函数中使用流

函数中使用流#

本节通过简单的 Python 示例介绍如何在函数中使用流。

准备工作#

参考在主机上部署完成openYuanrong部署。

在无状态函数中使用#

我们在主程序中创建消费者 local_consumer,该操作会隐式完成流 exp-stream 的创建。生产者为无状态函数,实例在远端运行。生产者和消费者协商使用字符串 ::END:: 作为流结束标志,处理完流后需要主动调用接口 yr.delete_stream 删除流,释放资源。

import subprocess
import yr
import time

@yr.invoke
def send_stream(stream_name, end_marker):
    try:
        # 创建生产者,配置自动 ACK
        # 流发送会进行缓存,对于实时性要求较高的任务,可调低 delay_flush_time 的值,默认 5ms
        producer_config = yr.ProducerConfig(delay_flush_time=5, page_size=1024 * 1024, max_stream_size=1024 * 1024 * 1024, auto_clean_up=True)
        stream_producer = yr.create_stream_producer(stream_name, producer_config)

        corpus = subprocess.check_output(["python", "-c", "import this"])
        lines = corpus.decode().split("\n")

        i = 0
        for line in lines:
            if len(line) > 0:
                # 发送流
                stream_producer.send(yr.Element(line.encode(), i))
                print("send:" + line)
                i += 1

        # 发送业务约定的结束符号,关闭生产者
        stream_producer.send(yr.Element(end_marker.encode(), i))
        stream_producer.close()
        print("stream producer is closed")
    except RuntimeError as exp:
        print("unexpected exp: ", exp)


if __name__ == '__main__':
    yr.init()

    stream_name = "exp-stream"
    end_marker = "::END::"
    # 创建消费者,隐式创建流
    config = yr.SubscriptionConfig("local_consumer")
    consumer = yr.create_stream_consumer(stream_name, config)
    send_stream.invoke(stream_name, end_marker)

    end = False
    while not end:
        # 经过 1000ms 或收到 10 个 elements 就返回
        elements = consumer.receive(1000, 10)
        for e in elements:
            data_str = e.data.decode()
            print("receive:" + data_str)
            # 收到约定的结束符后,关闭消费者
            if data_str == end_marker:
                consumer.close()
                print("stream consumer is closed")
                end = True

    # 需要显示删除流,否则流一直存在
    yr.delete_stream(stream_name)
    yr.finalize()