入门#

安装 openYuanrong datasystem#

pip 方式安装#

  • 安装 openYuanrong datasystem 完整发行版(包含Python SDK、C++ SDK以及命令行工具):

    pip install https://openyuanrong.obs.cn-southwest-2.myhuaweicloud.com/openyuanrong_datasystem-0.5.0-cp39-cp39-manylinux_2_34_x86_64.whl
    
  • 仅安装 openYuanrong datasystem Python SDK(不包含C++ SDK以及命令行工具):

    pip install https://openyuanrong.obs.cn-southwest-2.myhuaweicloud.com/openyuanrong_datasystem_sdk-0.5.0-cp39-cp39-manylinux_2_34_x86_64.whl
    

源码编译方式安装#

使用源码编译方式安装 openYuanrong datasystem 可以参考文档:源码编译安装 openYuanrong datasystem

部署 openYuanrong datasystem#

进程部署#

  • 准备ETCD

    openYuanrong datasystem 的集群管理依赖 ETCD,请先在后台启动单节点 ETCD(示例端口 2379):

    etcd --listen-client-urls http://0.0.0.0:2379 \
         --advertise-client-urls http://localhost:2379 &
    
  • 一键部署

    安装 openYuanrong datasystem 完整发行版后,即可通过随包自带的 dscli 命令行工具一键完成集群部署。以下命令在当前节点启动一个监听端口为 31501 的服务端进程,并连接上一步启动的 ETCD:

    dscli start -w --worker_address "127.0.0.1:31501" --etcd_address "127.0.0.1:2379"
    
  • 一键卸载

    dscli stop --worker_address "127.0.0.1:31501"
    

更多进程部署参数与部署方式请参考文档:openYuanrong datasystem 进程部署

Kubernetes 部署#

openYuanrong datasystem 还提供了基于 Kubernetes 的容器化部署方式。部署前请确保部署环境中的 Kubernetes 集群与 Helm 已就绪,并且存在可访问的 ETCD 集群。

  • 获取 openYuanrong datasystem helm chart 包

    安装 openYuanrong datasystem 完整发行版后,即可通过随包自带的 dscli 命令行工具在当前路径下快速获取 helm chart 包:

    dscli generate_helm_chart -o ./
    
  • 编辑集群部署配置

    openYuanrong datasystem 通过 ./datasystem/values.yaml 文件进行集群相关配置,其中必配项如下:

    global:
      # 其他配置项...
    
      # 镜像仓地址
      imageRegistry: ""
      # 镜像名字和镜像tag
      images:
        datasystem: "openyuanrong-datasystem:0.5.0"
      
      etcd:
        # ETCD集群地址
        etcdAddress: "127.0.0.1:2379"
    
  • 集群部署

    Helm 会提交 DaemonSet,在各节点上拉起 openYuanrong datasystem 实例。注意 Helm release 名称必须符合 DNS 命名规范(小写字母、数字和连字符“-”),不能包含下划线:

    helm install openyuanrong-datasystem ./datasystem
    
  • 集群卸载

    helm uninstall openyuanrong-datasystem
    

更多 openYuanrong datasystem Kubernetes 高级参数配置请参考文档:openYuanrong datasystem Kubernetes 部署

开发指南#

异构对象#

异构对象实现对 HBM 内存的抽象管理,能够高效实现 D2D/H2D/D2H 的数据传输,加速 AI 训推场景数据读写。

主要应用场景

  • LLM 长序列推理 KV Cache:基于异构对象提供分布式多级缓存(HBM/DRAM/SSD)和高吞吐的 D2D/H2D/D2H 访问能力,构建分布式 KV Cache,实现 Prefill 阶段的 KV Cache 缓存以及 Prefill/Decode 实例间 KV Cache 的快速传递,提升推理吞吐。

  • 模型推理实例 M->N 快速弹性:利用异构对象的卡间直通及 P2P 数据分发能力实现模型参数快速复制。

  • 训练场景 Checkpoint 快速加载到 HBM:各节点将待恢复的 Checkpoint 分片加载到异构对象中,利用异构对象的卡间直通传输及 P2P 数据分发能力,快速将 Checkpoint 传递到各节点 HBM。

通过异构对象接口,将任意二进制数据以键值对形式写入 HBM:

import acl
import os
from datasystem import Blob, DsClient, DeviceBlobList

# hetero_dev_mset and hetero_dev_mget must be executed in different processes
# because they need to be bound to different NPUs.
def hetero_dev_mset():
    """Write three keys to the data system from device (NPU 1) memory.

    Each key is backed by four device buffers of ``data_size`` bytes. Every
    buffer is filled with ``test_value`` (zero-padded to ``data_size``) and
    the whole batch is published with ``dev_mset``. The matching reader is
    ``hetero_dev_mget``, which uses the same key names on device 2.
    """
    client = DsClient("127.0.0.1", 31501)
    client.init()

    acl.init()
    device_idx = 1
    acl.rt.set_device(device_idx)

    key_list = [ 'key1', 'key2', 'key3' ]
    data_size = 1024 * 1024
    test_value = "value"
    # memcpy below reads exactly data_size bytes from the host source, so the
    # source must be at least that long. The encoded test value is only a few
    # bytes, so pad it with zeros. Keep `host_data` referenced while its raw
    # pointer is in use.
    host_data = test_value.encode().ljust(data_size, b"\0")
    host_ptr = acl.util.bytes_to_ptr(host_data)

    in_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            # malloc policy 0 = ACL_MEM_MALLOC_HUGE_FIRST.
            dev_ptr, _ = acl.rt.malloc(data_size, 0)
            # memcpy kind 1 = ACL_MEMCPY_HOST_TO_DEVICE.
            acl.rt.memcpy(dev_ptr, data_size, host_ptr, data_size, 1)
            blob = Blob(dev_ptr, data_size)
            tmp_batch_list.append(blob)
        blob_list = DeviceBlobList(device_idx, tmp_batch_list)
        in_data_blob_list.append(blob_list)
    client.hetero().dev_mset(key_list, in_data_blob_list)

def hetero_dev_mget():
    """Fetch the keys written by ``hetero_dev_mset`` into NPU 2 buffers, then delete them.

    For each key, allocates four uninitialised device buffers of 1 MiB on
    device 2 as the destination. It then calls ``dev_mget`` with a wait
    argument of 60000 (presumably milliseconds; confirm against the SDK
    docs) and finally removes the keys with ``dev_delete``.
    """
    client = DsClient("127.0.0.1", 31501)
    client.init()

    acl.init()
    device_idx = 2
    acl.rt.set_device(device_idx)

    keys = [ 'key1', 'key2', 'key3' ]
    blob_bytes = 1024 * 1024
    blobs_per_key = 4

    def _alloc_device_blobs():
        # One destination DeviceBlobList per key, made of freshly allocated buffers.
        blobs = []
        for _ in range(blobs_per_key):
            ptr, _ret = acl.rt.malloc(blob_bytes, 0)
            blobs.append(Blob(ptr, blob_bytes))
        return DeviceBlobList(device_idx, blobs)

    out_data_blob_list = [_alloc_device_blobs() for _ in keys]
    client.hetero().dev_mget(keys, out_data_blob_list, 60000)
    client.hetero().dev_delete(keys)

import sys
import traceback

# Run the writer (hetero_dev_mset) in a forked child and the reader
# (hetero_dev_mget) in the parent, since each must bind to a different NPU.
pid = os.fork()
if pid == 0:
    # Child: always leave via os._exit so the parent's interpreter state is
    # not torn down twice. Report failure through a non-zero exit code.
    child_exit_code = 1
    try:
        hetero_dev_mset()
        child_exit_code = 0
    except Exception:
        traceback.print_exc()
    finally:
        # os._exit skips Python's buffer flushing, so flush explicitly.
        sys.stdout.flush()
        sys.stderr.flush()
        os._exit(child_exit_code)
else:
    try:
        hetero_dev_mget()
    finally:
        # Reap the child even if the reader failed, to avoid leaving a zombie.
        _, wait_status = os.waitpid(pid, 0)
    child_exit_code = os.waitstatus_to_exitcode(wait_status)
    if child_exit_code != 0:
        raise RuntimeError(f"hetero_dev_mset child process failed with exit code {child_exit_code}")

更多异构对象使用方式请参考:异构对象开发指南

KV#

基于共享内存实现免拷贝的 KV 数据读写,支持通过对接外部组件提供数据可靠性语义,支持数据在 DRAM / SSD / 二级缓存之间置换,实现大容量高性能缓存。

主要应用场景

  • 训练场景 Checkpoint 快速保存及加载:基于 KV 接口快速读写 Checkpoint,并支持将数据持久化到二级缓存保证数据可靠性。

通过 KV 接口,将任意二进制数据以键值对形式写入 DDR:

from datasystem.ds_client import DsClient

# Connect to the local worker, then round-trip one key through the KV API.
client = DsClient("127.0.0.1", 31501)
client.init()

key = "key"
expected_val = b"value"
client.kv().set(key, expected_val)

# get() takes a list of keys and returns a list; index 0 holds the value for `key`.
fetched = client.kv().get([key])[0]
assert fetched == expected_val

# delete() likewise takes a list of keys.
client.kv().delete([key])
#include "datasystem/datasystem.h"

using namespace datasystem;

// Minimal stand-in for a test-framework assertion. If `condition` is false,
// print the failing expression and its source location to stderr, then
// terminate the process with exit status 1. The do { ... } while (0) wrapper
// makes the macro behave as a single statement (safe inside if/else).
#define ASSERT_TRUE(condition) \
    do { \
        if ((condition)) { \
            break; \
        } \
        fprintf(stderr, "Assertion failed: %s, file %s, line %d\n", \
                #condition, __FILE__, __LINE__); \
        exit(1); \
    } while (0)

int main()
{
    // Connect to the worker listening on 127.0.0.1:31501 and initialize the client.
    ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
    auto client = std::make_shared<DsClient>(connectOptions);
    ASSERT_TRUE(client->Init().IsOk());

    std::string key = "testKey";
    std::string getValue;

    // Step 1: store an initial value and check that it reads back unchanged.
    std::string value = "Hello kv client";
    Status status = client->KV()->Set(key, value);
    ASSERT_TRUE(status.IsOk());
    status = client->KV()->Get(key, getValue);
    ASSERT_TRUE(status.IsOk());
    ASSERT_TRUE(getValue == value);

    // Step 2: overwrite the same key and check that the new value is returned.
    std::string value2 = "Hello modify";
    status = client->KV()->Set(key, value2);
    ASSERT_TRUE(status.IsOk());
    status = client->KV()->Get(key, getValue);
    ASSERT_TRUE(status.IsOk());
    ASSERT_TRUE(getValue == value2);

    // Step 3: delete the key; a subsequent Get is expected to report an error.
    status = client->KV()->Del(key);
    ASSERT_TRUE(status.IsOk());
    status = client->KV()->Get(key, getValue);
    ASSERT_TRUE(status.IsError());
    return 0;
}

更多KV使用方式请参考:KV开发指南

Object#

基于共享内存实现 Object 语义读写,提供基于引用计数管理生命周期,将共享内存抽象为 buffer,直接映射共享内存指针,提供更底层灵活的编程接口。

import random
from datasystem.ds_client import DsClient

def random_str(slen=10):
    """Return a random string of ``slen`` characters.

    Each character is picked independently with ``random.choice`` from a fixed
    alphabet of digits, ASCII letters and some punctuation. A non-positive
    ``slen`` yields an empty string.
    """
    alphabet = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#%^*()_+=-"
    return ''.join(random.choice(alphabet) for _ in range(slen))

def object_test():
    """Walk one object through its lifecycle using the Object API.

    Steps:
    1. Create a buffer and pin it with a global reference (ref count becomes 1).
    2. Fill and seal it under the write latch.
    3. Read it back and compare.
    4. Drop the reference (ref count becomes 0).
    5. Check that a further ``get`` fails.

    Raises:
        AssertionError: if any check fails, including when ``get`` still
            succeeds after the last global reference was released.
    """
    client = DsClient("127.0.0.1", 31501)
    client.init()
    object_key = "test_key"
    value = bytes(random_str(100), encoding='utf8')
    buffer = client.object().create(object_key, len(value))
    client.object().g_increase_ref([object_key])
    assert client.object().query_global_ref_num(object_key) == 1
    # Release the write latch even if the copy or seal raises.
    buffer.wlatch()
    try:
        buffer.memory_copy(value)
        buffer.seal()
    finally:
        buffer.unwlatch()
    buffer_list = client.object().get([object_key], 0)
    assert buffer_list[0].immutable_data().tobytes() == value
    client.object().g_decrease_ref([object_key])
    assert client.object().query_global_ref_num(object_key) == 0
    # With no global references left, get() is expected to fail.
    # Treat success as a test failure rather than passing silently.
    try:
        client.object().get([object_key], 0)
    except RuntimeError as e:
        print("get error:", e)
    else:
        raise AssertionError("get succeeded after the last global reference was released")

object_test()
#include "datasystem/datasystem.h"

using namespace datasystem;

// Minimal stand-in for a test-framework assertion. If `condition` is false,
// print the failing expression and its source location to stderr, then
// terminate the process with exit status 1. The do { ... } while (0) wrapper
// makes the macro behave as a single statement (safe inside if/else).
#define ASSERT_TRUE(condition) \
    do { \
        if ((condition)) { \
            break; \
        } \
        fprintf(stderr, "Assertion failed: %s, file %s, line %d\n", \
                #condition, __FILE__, __LINE__); \
        exit(1); \
    } while (0)

// Object API walkthrough:
// 1. Create a buffer and pin it with a global reference.
// 2. Fill and seal it under the write latch.
// 3. Read it back under the read latch and compare.
// 4. Release the reference and check that the object can no longer be fetched.
// Only ASSERT_TRUE is defined in this example, so every check uses it
// (ASSERT_EQ is a gtest macro and is not available here).
int main()
{
    ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
    auto client = std::make_shared<DsClient>(connectOptions);
    ASSERT_TRUE(client->Init().IsOk());

    std::string objectKey = "testKey";
    std::string data = "Hello object client";
    // Keep the size unsigned, matching data.size(), instead of narrowing to int.
    const std::string::size_type size = data.size();
    std::shared_ptr<Buffer> buffer;
    Status status = client->Object()->Create(objectKey, size, CreateParam{}, buffer);
    ASSERT_TRUE(status.IsOk());
    ASSERT_TRUE(static_cast<std::string::size_type>(buffer->GetSize()) == size);
    std::vector<std::string> failedObjectKeys;
    ASSERT_TRUE(client->Object()->GIncreaseRef({ objectKey }, failedObjectKeys).IsOk());
    buffer->WLatch();
    buffer->MemoryCopy(const_cast<char *>(data.data()), size);
    buffer->Seal();
    buffer->UnWLatch();

    std::vector<Optional<Buffer>> buffers;
    ASSERT_TRUE(client->Object()->Get({ objectKey }, 0, buffers).IsOk());
    // Guard the buffers[0] accesses below.
    ASSERT_TRUE(buffers.size() == 1);
    ASSERT_TRUE(static_cast<std::string::size_type>(buffers[0]->GetSize()) == size);
    buffers[0]->RLatch();
    ASSERT_TRUE(memcmp(data.data(), buffers[0]->MutableData(), size) == 0);
    buffers[0]->UnRLatch();
    ASSERT_TRUE(client->Object()->GDecreaseRef({ objectKey }, failedObjectKeys).IsOk());
    // With no global references left, Get is expected to fail.
    ASSERT_TRUE(client->Object()->Get({ objectKey }, 0, buffers).IsError());
    return 0;
}

更多对象缓存使用方式请参考:Object开发指南