数据流#

openYuanrong 提供了基于发布-订阅(pub/sub)模型的数据流,可实现函数间无界数据流的数据交换,支持复杂的数据交互关系。同时,数据流解耦了数据生产者和消费者,支持它们各自按需异步调度。

数据流中有四个关键概念:生产者(producer)、消费者(consumer)、流(stream)、数据项(element)。

  • 生产者:生产者是无界数据流的发起端,产生并发送数据。

  • 消费者:消费者是无界数据流的接收端,消费数据。

  • 数据项:数据以数据项的粒度在生产者和消费者间发送和接收。

  • 流:生产者和消费者间不相互感知,通过流进行关联。生产者向流中发送数据,消费者订阅流并从流中接收数据。一个应用中可以有多条流,通过流名称区分。

数据流支持多生产者和多消费者间的数据交互。在实际场景中,使用最多的是一个生产者一个消费者(one-to-one)、多个生产者一个消费者(many-to-one)及一个生产者多个消费者(one-to-many)。

使用限制#

  • 数据流无持久化能力,发送节点故障时,数据会丢失,应用需要考虑对应的故障处,比如重启业务等。

  • 数据流都是同步接口,没有类似 epoll 的多路复用能力。

  • 流解耦了生产者和消费者,彼此间不感知对方。当生产者关闭时,消费者无法感知,需要业务层进行处理。

创建流#

流代表了生产者和消费者间的发布订阅交互关系。流随着生产者或消费者的创建而隐式创建,无需应用显式创建流。

创建一个生产者或消费者时,需要指定它所关联的流,不同的流通过流名称区分。如果流已经存在,新创建的生产者或消费者会关联到这条流上。如流不存在,在创建生产者或消费者时,系统会隐式创建一个新流,并关联它到指定的流名称。

一条流上的生产者和消费者都关闭后,如果未操作接口删除流,流仍然存在,应用可以在这条流上继续关联新的生产者和消费者。您可以在创建生产者时指定流为自动删除(配置 autoCleanup 选项),当该流上所有生产者和消费者都关闭后,流将被系统自动删除。

import yr

yr.init()
stream_name = "this-stream"
try:
    # 配置流自动删除
    producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True)
    # 创建生产者将隐式创建流 this-stream
    producer = yr.create_stream_producer(stream_name, producer_config)
    # 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除
    producer.close()

    consumer_config = yr.SubscriptionConfig("local-consumer")
    # 创建消费者将再次隐式创建流 this-stream
    consumer = yr.create_stream_consumer(stream_name, consumer_config)
    consumer.close()

    # 消费者新创建的流需要显示删除
    yr.delete_stream(stream_name)
except RuntimeError as exp:
    print(exp)

yr.finalize()
#include <iostream>
#include "yr/yr.h"

int main(int argc, char *argv[])
{
    YR::Init(YR::Config{}, argc, argv);
    std::string streamName = "this-stream";
    try {
        // 配置流自动删除
        YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true};
        // 创建生产者将隐式创建流 this-stream
        std::shared_ptr<YR::Producer> producer = YR::CreateProducer(streamName, pConfig);
        // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除
        producer->Close();

        YR::SubscriptionConfig sConfig("local-consumer", YR::SubscriptionType::STREAM);
        // 创建消费者将再次隐式创建流 this-stream
        std::shared_ptr<YR::Consumer> consumer = YR::Subscribe(streamName, sConfig);
        consumer->Close();

        // 消费者新创建的流需要显示删除
        YR::DeleteStream(streamName);
    } catch (YR::Exception &e) {
        std::cout << e.what() << std::endl;
    }

    YR::Finalize();
    return 0;
}
package com.example;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;

import com.yuanrong.api.YR;
import com.yuanrong.Config;
import com.yuanrong.exception.YRException;
import com.yuanrong.stream.Producer;
import com.yuanrong.stream.ProducerConfig;
import com.yuanrong.stream.Consumer;
import com.yuanrong.stream.SubscriptionConfig;
import com.yuanrong.stream.SubscriptionType;
import com.yuanrong.stream.Element;

public class Main {
    public static void main(String[] args) throws YRException {
        YR.init(new Config());

        String streamName = "this-stream";
        try {
            // 配置流自动删除
            ProducerConfig pConfig = ProducerConfig.builder()
                                         .delayFlushTimeMs(5L)
                                         .pageSizeByte(1024 * 1024L)
                                         .maxStreamSize(1024 * 1024 * 1024L)
                                         .autoCleanup(true).build();
            // 创建生产者将隐式创建流 this-stream
            Producer producer = YR.createProducer(streamName, pConfig);
            // 关闭生产者,流 this-stream 已无生产者或消费者关联,将被自动删除
            producer.close();

            SubscriptionConfig sConfig = SubscriptionConfig.builder().subscriptionName("local-consumer").build();
            // 创建消费者将再次隐私创建流 this-stream
            Consumer consumer = YR.subscribe(streamName, sConfig);
            consumer.close();
           
            // 消费者新创建的流需要显示删除
            YR.deleteStream(streamName);
        } catch (YRException e) {
            e.printStackTrace();
        }

        YR.Finalize();
    }
}

生产流数据#

生产者(Producer)可向流中发送数据。生产者发送的数据会先放入缓冲区,系统根据生产者配置的 Flush 策略(发送间隔一段时间或者缓冲写满)刷新缓冲使其对消费者可见。生产者不再使用时,需要主动关闭。

import yr

yr.init()
stream_name = "this-stream"
try:
    producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True)
    producer = yr.create_stream_producer(stream_name, producer_config)

    # 生产数据
    element = yr.Element(value=b"hello", ele_id=0)
    producer.send(element)

    # 主动关闭生产者
    producer.close()
except RuntimeError as exp:
    print(exp)

yr.finalize()
#include <iostream>
#include "yr/yr.h"

int main(int argc, char *argv[])
{
    YR::Init(YR::Config{}, argc, argv);
    std::string streamName = "this-stream";
    try {
        YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true};
        std::shared_ptr<YR::Producer> producer = YR::CreateProducer(streamName, pConfig);

        // 生产数据
        std::string data = "hello";
        YR::Element element((uint8_t *)(data.c_str()), data.size());
        producer->Send(element);

        // 主动关闭生产者
        producer->Close();
    } catch (YR::Exception &e) {
        std::cout << e.what() << std::endl;
    }

    YR::Finalize();
    return 0;
}
package com.example;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;

import com.yuanrong.api.YR;
import com.yuanrong.Config;
import com.yuanrong.exception.YRException;
import com.yuanrong.stream.Producer;
import com.yuanrong.stream.ProducerConfig;
import com.yuanrong.stream.Consumer;
import com.yuanrong.stream.SubscriptionConfig;
import com.yuanrong.stream.SubscriptionType;
import com.yuanrong.stream.Element;

public class Main {
    public static void main(String[] args) throws YRException {
        YR.init(new Config());

        String streamName = "this-stream";
        try {
            ProducerConfig pConfig = ProducerConfig.builder()
                                         .delayFlushTimeMs(5L)
                                         .pageSizeByte(1024 * 1024L)
                                         .maxStreamSize(1024 * 1024 * 1024L)
                                         .autoCleanup(true).build();
            Producer producer = YR.createProducer(streamName, pConfig);

            // 生产数据
            String data = "hello";
            ByteBuffer buffer = ByteBuffer.wrap(data.getBytes());
            Element element = new Element(0L, buffer);
            producer.send(element);

            // 关闭生产者
            producer.close();
        } catch (YRException e) {
            e.printStackTrace();
        }

        YR.Finalize();
    }
}

消费流数据#

消费者(Consumer)可接收流中的数据,使用 Ack 方法确认数据接收。消费者不再使用时,需要主动关闭。

消费者接收数据后,需要对数据进行 ACK 操作,以确认该数据及之前收到的数据都已消费完。对确认消费完的数据,系统会回收内存资源。数据流提供了自动 ACK 功能,只需在创建消费者时配置 autoAcktrue。应用每次调用 Receive 操作后,系统会自动确认上一次接收的数据,应用无需再主动调用 Ack 方法。

Note

开启自动 ACK 后,用户需保证消费者每次调用 Receive 前,上一次接收到的数据已消费完。调用 Receive 后,继续消费上一次接收到的数据系统未定义。

Hint

当消费者调用 Receive 方法时,会获取 Element 对象,对象的内部指针指向实际的数据,这些数据处于应用函数和数据系统间的共享内存之中。应用需要通过 ACK 操作确认该数据已经消费完,此时数据系统才能回收该数据所占的内存资源。如应用不调用 ACK 操作,数据系统无法判断数据是否被消费,则无法回收内存资源,最终导致内存资源耗尽,系统异常。

import yr

yr.init()
stream_name = "this-stream"
try:
    producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True)
    producer = yr.create_stream_producer(stream_name, producer_config)

    consumer_config = yr.SubscriptionConfig("local-consumer")
    consumer = yr.create_stream_consumer(stream_name, consumer_config)

    element = yr.Element(value=b"hello", ele_id=0)
    producer.send(element)

    # 消费数据,等待到一条数据或者1秒超时
    elements = consumer.receive(1000, 1)
    for e in elements:
        print("receive:" + e.data.decode())

    producer.close()
    # 主动关闭消费者
    consumer.close()
except RuntimeError as exp:
    print(exp)

yr.finalize()
#include <iostream>
#include "yr/yr.h"

int main(int argc, char *argv[])
{
    YR::Init(YR::Config{}, argc, argv);
    std::string streamName = "this-stream";
    try {
        YR::ProducerConf pConfig{.delayFlushTime=5, .pageSize=1024 * 1024ul, .maxStreamSize=1024 * 1024 * 1024ul, .autoCleanup=true};
        std::shared_ptr<YR::Producer> producer = YR::CreateProducer(streamName, pConfig);

        YR::SubscriptionConfig sConfig("local-consumer", YR::SubscriptionType::STREAM);
        std::shared_ptr<YR::Consumer> consumer = YR::Subscribe(streamName, sConfig);

        std::string data = "hello";
        YR::Element element((uint8_t *)(data.c_str()), data.size());
        producer->Send(element);

        // 消费数据,等待到一条数据或者1秒超时
        std::vector<YR::Element> elements;
        consumer->Receive(1, 1000, elements);
        for (auto e : elements) {
            std::string str(reinterpret_cast<char *>(e.ptr), e.size);
            // 手动ACK
            consumer->Ack(e.id);
            std::cout << "receive: " << str << std::endl;
        }

        producer->Close();
        // 主动关闭消费者
        consumer->Close();
    } catch (YR::Exception &e) {
        std::cout << e.what() << std::endl;
    }

    YR::Finalize();
    return 0;
}
package com.example;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;

import com.yuanrong.api.YR;
import com.yuanrong.Config;
import com.yuanrong.exception.YRException;
import com.yuanrong.stream.Producer;
import com.yuanrong.stream.ProducerConfig;
import com.yuanrong.stream.Consumer;
import com.yuanrong.stream.SubscriptionConfig;
import com.yuanrong.stream.SubscriptionType;
import com.yuanrong.stream.Element;

public class Main {
    public static void main(String[] args) throws YRException {
        YR.init(new Config());

        String streamName = "this-stream";
        try {
            ProducerConfig pConfig = ProducerConfig.builder()
                                         .delayFlushTimeMs(5L)
                                         .pageSizeByte(1024 * 1024L)
                                         .maxStreamSize(1024 * 1024 * 1024L)
                                         .autoCleanup(true).build();
            Producer producer = YR.createProducer(streamName, pConfig);

            SubscriptionConfig sConfig = SubscriptionConfig.builder().subscriptionName("local-consumer").build();
            Consumer consumer = YR.subscribe(streamName, sConfig);

            String data = "hello";
            ByteBuffer buffer = ByteBuffer.wrap(data.getBytes());
            Element element = new Element(0L, buffer);
            producer.send(element);

            // 消费数据,等待到一条数据或者3秒超时
            Charset charset = Charset.forName("UTF-8");
            List<Element> elements = consumer.receive(1, 3000);
            for (Element e : elements) {
                String str = charset.decode(e.getBuffer()).toString();
                // 手动ACK
                consumer.ack(e.getId());
                System.out.println("receive: " + str);
            }

            producer.close();
            // 主动关闭消费者
            consumer.close();
        } catch (YRException e) {
            e.printStackTrace();
        }

        YR.Finalize();
    }
}