数据对象#

数据对象是可以在多个 openYuanrong 函数间跨节点分布式共享访问的内存数据,支持基于共享内存的高性能 put/get 访问和修改。openYuanrong 将远程对象缓存在分布式共享内存对象存储中,对象存储在集群的每个节点上都会创建一个。

对象引用本质上是一个指针或唯一的 ID,用于引用远程对象而无需获取它实际的值,它和 Future 在概念上有类似之处。openYuanrong 基于引用计数进行数据对象自动生命周期管理,当引用计数为 0 时自动删除数据对象。

使用限制#

  • 数据对象不能被修改,如果您需要修改,请创建新的数据对象。

  • 单个数据对象的大小除 Java 语言外没有限制,但是不能超出 openYuanrong 数据系统配置的共享内存大小。Java 语言的单个数据对象大小不能超过 Integer.MAX_VALUE - 8

  • 数据对象不保证数据可靠性,当发生故障数据可能会丢失。

  • 使用 get 接口批量获取数据时,一次获取的总数据大小不能超出 openYuanrong 数据系统的共享内存大小。

创建数据对象#

数据对象可通过以下两种方式创建。创建完成后返回对象引用并自动增加引用计数,当对象引用实例消亡时,自动减小引用计数。

  • 函数调用返回对象引用。

  • 直接调用 put 接口返回对象引用。

import yr

yr.init()
object_ref = yr.put(123)
yr.finalize()
import com.yuanrong.Config;
import com.yuanrong.api.YR;
import com.yuanrong.runtime.client.ObjectRef;
import com.yuanrong.exception.YRException;

public class Main {
    public static void main(String[] args) throws YRException {
        YR.init(new Config());
        ObjectRef objectRef = YR.put(123);
        YR.Finalize();
    }
}
#include <yr/yr.h>

int main(int argc, char *argv[])
{
    YR::Init(YR::Config{}, argc, argv);
    YR::ObjectRef<int> objectRef = YR::Put<int>(123);
    YR::Finalize();
    return 0;
}

获取数据对象#

使用 get 接口从对象引用中获取远程对象的结果。如果当前节点的 openYuanrong 数据系统中没有该对象,则自动从远端节点下载。get 接口支持并行获取多个对象结果,支持设置获取超时时间,超时时间内如果数据对象没有产生则抛出异常。

import time
import yr

yr.init()

# Get the value of one object ref.
object_ref = yr.put(123)
assert yr.get(object_ref) == 123

# Get the values of multiple object refs in parallel.
assert yr.get([yr.put(i) for i in range(3)]) == [0, 1, 2]

# Get timeout example.
@yr.invoke
def long_running_function():
    time.sleep(10)

object_ref = long_running_function.invoke()
try:
    yr.get(object_ref, 3)
except TimeoutError as e:
    print(e)

yr.finalize()
import com.yuanrong.Config;
import com.yuanrong.api.YR;
import com.yuanrong.runtime.client.ObjectRef;
import com.yuanrong.exception.YRException;
import com.yuanrong.storage.WaitResult;

import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;

public class Main {
    // Get timeout example.
    public static class MyApp {
        public static int longRunningFunction() throws InterruptedException {
            TimeUnit.SECONDS.sleep(10);
            return 1;
        }
    }

    public static void main(String[] args) throws YRException {
        YR.init(new Config());

        // Get the value of one object ref.
        ObjectRef objectRef = YR.put(123);
        System.out.println(YR.get(objectRef, 30));

        // Get the values of multiple object refs in parallel.
        List<ObjectRef> objectRefs = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            objectRefs.add(YR.put(i));
        }

        WaitResult waitResult = YR.wait(objectRefs, objectRefs.size(), -1);
        for (ObjectRef ref : waitResult.getReady()) {
            System.out.println(YR.get(ref, 30));
        }

        try {
            ObjectRef resultRef = YR.function(MyApp::longRunningFunction).invoke();
            YR.get(resultRef, 3);
        } catch (YRException e) {
            System.out.println(e);
        }

        YR.Finalize();
    }
}
#include <iostream>
#include <yr/yr.h>

// Get timeout example.
int LongRunningFunction()
{
    std::this_thread::sleep_for(std::chrono::seconds(10));
    return 1;
}
YR_INVOKE(LongRunningFunction);


int main(int argc, char *argv[])
{
    YR::Init(YR::Config{}, argc, argv);

    // Get the value of one object ref.
    YR::ObjectRef<int> objectRef = YR::Put<int>(123);
    std::cout << *YR::Get(objectRef) << std::endl;

    // Get the values of multiple object refs in parallel.
    std::vector<YR::ObjectRef<int>> object_refs;
    for (int i = 0; i < 3; i++) {
        object_refs.emplace_back(YR::Put(i));
    }

    auto waitResults = YR::Wait(object_refs, object_refs.size());
    for (auto ref : waitResults.first) {
        std::cout << *YR::Get(ref) << std::endl;
    }

    YR::ObjectRef<int> object_ref = YR::Function(LongRunningFunction).Invoke();
    try {
        int result = *YR::Get(object_ref, 3);
    } catch (YR::Exception &e) {
        std::cout << e.what() << std::endl;
    }

    YR::Finalize();
    return 0;
}

传递数据对象#

对象引用可以在应用程序之间自由传递,对象通过分布式引用计数自动进行生命周期管理。

  • 将对象引用作为函数的顶层参数传递: 将对象引用作为顶层参数传递时,openYuanrong 会自动调用 get 接口将数据对象的值获取出来传入被调函数,这意味着当对象数据完全可用时才会执行任务。

    import yr
    
    yr.init()
    
    @yr.invoke
    def get_nums():
        return [1, 2, 3]
    
    @yr.invoke
    def dis_sum(args): # dis_sum 被调用时,传入的是值为 [1, 2, 3]
        return sum(args)
    
    ref = dis_sum.invoke(get_nums.invoke()) # 传递给 dis_sum 的参数是一个对象引用,在运行时 openYuanrong 将自动获取对象的值并传递
    num = yr.get(ref, 30)
    assert num == 6
    
  • 将对象引用作为嵌套参数传递: 当数据对象嵌套在其他数据对象中传递时,在任务中需要调用 get 接口来获取具体的值。数据对象嵌套传递时,外层的对象引用会自动增加内层对象引用的引用计数,从而保证当外层对象引用存在时,内层对象引用一定不会被释放。

    import yr
    
    yr.init()
    
    @yr.invoke
    def get_num(x):
        return x
    
    @yr.invoke
    def dis_sum(args): # dis_sum 被调用时,传入的值为 [objref1, objref2, objref3]
        return sum(yr.get(args)) # 调用 get 时,objref1/objref2/objref3 的值才会传输到当前机器。
    
    objref1 = get_num.invoke(1) # 此处返回 object ref,发出调用请求后即可返回,无需等待 get_sum 执行完成
    objref2 = get_num.invoke(2)
    objref3 = get_num.invoke(3)
    objref = yr.put([objref1, objref2, objref3]) # 嵌套传递 object ref
    ref = dis_sum.invoke(objref)
    num = yr.get(ref, 30)
    assert num == 6
    

数据对象溢出到磁盘#

数据对象数据存储在 openYuanrong 数据系统的共享内存中,当内存不足时,支持自动将数据溢出到磁盘并从内存中删除数据。当数据需要读取时,自动从磁盘中加载到共享内存。需要注意,当磁盘空间也不足时,数据对象会写入失败。

使用数据对象溢出功能,需要在部署时指定相关参数,默认为关闭。

# 指定 object 溢出到磁盘的路径,如果配置为空,表示禁止使用 object 溢出功能。
spillDirectory: ""

数据对象溢出有以下参数,可用于设置磁盘空间上限、溢出的并发线程、文件大小等参数,用于性能调优。

# 指定溢出到磁盘时占用的最大磁盘空间,单位是字节。当配置为 0 时,openYuanrong 会采用openYuanrong 集群拉起时的磁盘剩余空间的 95% 作为最大可占用空间。
# 当 spillDirectory 配置目录是独占磁盘时,建议将该值配置为 0,由系统自动计算可用值。否则,建议按实际的磁盘空间规划配置该值。
spillSizeLimit: "0"
# 指定向磁盘写数据时的最大并行度,当磁盘性能非常高时,可尝试将该值调高提升性能。
spillThreadNum: 8
# 指定溢出到磁盘上的单个文件大小。单位为 MB,取值范围时 200-10240。
# 当对象较小时,多个对象合并写入到一个文件中。当对象很大且超出该值时,每个文件一个对象。
spillFileMaxSizeMb: 200
# 用于指定对象溢出最大打开的文件句柄数。当该值被调小时,可能会降低性能。
spillFileOpenLimit: 512
# 是否允许文件系统的预读功能。当设置为 false 时,关闭文件系统的预读功能,可减少读放大,但可能会影响性能。
spillEnableReadahead: true

更多信息#

openYuanrong 也提供了近计算 KV 缓存能力,基于共享内存实现免拷贝的 KV 数据读写。查看KV 缓存开发指南