异构对象#
基本概念#
openYuanrong datasystem (下文中称为数据系统)的 Hetero 语义中,基于 Device 侧的 HBM 内存抽象异构对象接口,实现昇腾 NPU 卡间数据高速直通传输。同时提供 H2D/D2H 高速迁移接口,实现数据在 DRAM 与 HBM 之间的快速传输。
注意:
数据系统并不直接负责 HBM 内存的申请及释放,用户调用 hetero 接口将 HBM 的指针注册关联到数据系统中并指定 key,数据系统将用户指定的 key 及 HBM 指针抽象为数据对象,控制在不同卡之间的流转。
因此将 HBM 的指针注册给数据系统后,上层业务需要保证指针的有效性,避免被释放或修改。
D2D(Device to Device) 数据传输#
异构对象针对 D2D 数据传输,提供了两种语义:
DevPublish / DevSubscribe:数据传输语义,数据生成端执行 DevPublish 将 HBM 上的数据发布为异构对象,数据接收端申请 HBM 内存后,执行 DevSubscribe 订阅接收数据,数据系统使用卡间直通传输数据并写入用户提供的 HBM 内存中。当数据被接收后,数据系统会自动删除对象,不再关联发布的 HBM 内存。
DevPublish / DevSubscribe 为异步接口,调用后会为每个 key 返回一个 Future,供用户获取执行结果。当 Future::Get 返回的结果为 OK 时,表示数据已经被对端成功接收。
注意:
DevPublish / DevSubscribe 传入的 Device 内存地址不能归属于同一张 NPU 卡。
在执行 DevSubscribe 过程中,执行了 DevPublish 的进程不能退出,否则 DevSubscribe 会失败。
在 key 与 devBlobList 内存地址的映射关系均一致的情况下,DevPublish 支持在同一进程内重试。DevSubscribe 订阅单个 key 的超时时间为 20s,订阅多个 key 的超时时间为 60s。
import acl
import random
from datasystem import DsClient, DeviceBlobList, Blob
def random_str(slen=10):
    """Return a string of ``slen`` characters picked at random from a fixed alphabet.

    The alphabet consists of the digits, the ASCII letters and the symbols
    ``!@#%^*()_+=-``. ``slen`` defaults to 10; a length of 0 yields ``''``.
    """
    seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#%^*()_+=-"
    return ''.join(random.choice(seed) for _ in range(slen))
# hetero_dev_publish and hetero_dev_subscribe must be executed in different processes
# because they need to be bound to different NPUs.
def hetero_dev_publish():
    """Publish HBM buffers on NPU 1 as heterogeneous objects and wait for delivery.

    For each of the three keys, four 1 MiB device buffers are allocated and
    filled with the same random payload. The buffers are published with
    ``dev_publish`` and every returned future is then waited on with ``get()``.

    Raises:
        RuntimeError: if a device allocation or a host-to-device copy reports
            a non-zero return code.
    """
    client = DsClient("127.0.0.1", 31501)
    client.init()
    acl.init()
    device_idx = 1
    acl.rt.set_device(device_idx)

    key_list = [ 'key1', 'key2', 'key3' ]
    data_size = 1024 * 1024
    # Encode once and keep the bytes object referenced until every copy is
    # done: bytes_to_ptr yields a raw address into that object, so the object
    # must stay alive for as long as the address is used.
    host_data = random_str(data_size).encode()
    host_ptr = acl.util.bytes_to_ptr(host_data)

    in_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            dev_ptr, ret = acl.rt.malloc(data_size, 0)
            if ret != 0:
                raise RuntimeError(f"acl.rt.malloc failed, ret={ret}")
            # Copy kind 1 is the host-to-device direction.
            ret = acl.rt.memcpy(dev_ptr, data_size, host_ptr, data_size, 1)
            if ret != 0:
                raise RuntimeError(f"acl.rt.memcpy failed, ret={ret}")
            tmp_batch_list.append(Blob(dev_ptr, data_size))
        in_data_blob_list.append(DeviceBlobList(device_idx, tmp_batch_list))

    pub_futures = client.hetero().dev_publish(key_list, in_data_blob_list)
    # One future per key; wait for each of them in turn.
    for future in pub_futures:
        future.get()
def hetero_dev_subscribe():
    """Subscribe to three published keys and receive them into HBM buffers on NPU 2.

    For each key, four 1 MiB device buffers are allocated as receive targets.
    ``dev_subscribe`` is then called and every returned future is waited on
    with ``get()``.

    Raises:
        RuntimeError: if a device allocation reports a non-zero return code.
    """
    client = DsClient("127.0.0.1", 31501)
    client.init()
    acl.init()
    device_idx = 2
    acl.rt.set_device(device_idx)

    key_list = [ 'key1', 'key2', 'key3' ]
    data_size = 1024 * 1024

    out_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            dev_ptr, ret = acl.rt.malloc(data_size, 0)
            if ret != 0:
                raise RuntimeError(f"acl.rt.malloc failed, ret={ret}")
            tmp_batch_list.append(Blob(dev_ptr, data_size))
        out_data_blob_list.append(DeviceBlobList(device_idx, tmp_batch_list))

    sub_futures = client.hetero().dev_subscribe(key_list, out_data_blob_list)
    # One future per key; wait for each of them in turn.
    for future in sub_futures:
        future.get()
#include "datasystem/datasystem.h"
#include <acl/acl.h>
// HeteroDevPublish and HeteroDevSubscribe must be executed in different processes
// because they need to be bound to different NPUs.
void HeteroDevPublish()
{
ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
auto client = std::make_shared<DsClient>(connectOptions);
ASSERT_TRUE(client->Init().IsOk());
// Initialize the ACL interface.
int deviceId = 1;
aclInit(nullptr);
aclrtSetDevice(deviceId); // Bind the NPU card.
// The sender constructs data and publishes it to the data system.
std::vector<std::string> keys = { "test-key1" };
std::vector<uint64_t> blobSize = { 10, 20 };
int blobNum = blobSize.size();
std::vector<DeviceBlobList> inBlobList;
inBlobList.resize(keys.size());
// Enter the Device address information on the device to the devblob.
for (size_t i = 0; i < inBlobList.size(); i++) {
inBlobList[i].deviceIdx = deviceId;
for (int j = 0; j < blobNum; j++) {
void *devPtr = nullptr;
int code = aclrtMalloc(&devPtr, blobSize[j], ACL_MEM_MALLOC_HUGE_FIRST);
// Copying Data to the Device Memory.
// aclrtMemcpy(devPtr, blobSize[j], value.data(), size, aclrtMemcpyKind::ACL_MEMCPY_HOST_TO_DEVICE)
ASSERT_EQ(code, 0);
Blob blob = { .pointer = devPtr, .size = blobSize[j] };
inBlobList[i].blobs.emplace_back(std::move(blob));
}
}
std::vector<Future> inFutureVecEnque;
Status status = client->Hetero()->DevPublish(keys, inBlobList, inFutureVecEnque);
ASSERT_TRUE(status.IsOk());
// The sender checks the future.
// If OK is returned, it indicates that the receiver has received the HBM successfully
// and the HBM memory is disconnected from the data system.
for (size_t i = 0; i < inFutureVecEnque.size(); i++) {
ASSERT_TRUE(inFutureVecEnque[i].Get().IsOk());
}
}
void HeteroDevSubscribe()
{
ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
auto client = std::make_shared<DsClient>(connectOptions);
ASSERT_TRUE(client->Init().IsOk());
// Initialize the ACL interface.
int deviceId = 2;
aclInit(nullptr);
aclrtSetDevice(deviceId); // Bind the NPU card.
std::vector<std::string> keys = { "test-key1" };
std::vector<uint64_t> blobSize = { 10, 20 };
int blobNum = blobSize.size();
std::vector<DeviceBlobList> outBlobList;
outBlobList.resize(keys.size());
// The receiver allocates HBM memory and subscribes to receive data from the data system.
for (size_t i = 0; i < outBlobList.size(); i++) {
outBlobList[i].deviceIdx = deviceId;
for (int j = 0; j < blobNum; j++) {
void *devPtr = nullptr;
int code = aclrtMalloc(&devPtr, blobSize[j], ACL_MEM_MALLOC_HUGE_FIRST);
ASSERT_EQ(code, 0);
Blob blob = { .pointer = devPtr, .size = blobSize[j] };
outBlobList[i].blobs.emplace_back(std::move(blob));
}
}
std::vector<Future> outFutureVecDeque;
Status status = client->Hetero()->DevSubscribe(keys, outBlobList, outFutureVecDeque);
ASSERT_TRUE(status.IsOk());
// The receiver checks the future.
// If ok is returned, the data is received successfully.
for (size_t i = 0; i < outFutureVecDeque.size(); i++) {
ASSERT_TRUE(outFutureVecDeque[i].Get().IsOk());
}
}
DevMSet / DevMGet:数据缓存语义,数据生成端执行 DevMSet 将 HBM 数据发布到数据系统,数据接收端申请 HBM 内存后,执行 DevMGet 接口读取数据。当数据被读取后,数据系统不会删除对象,该数据可被反复读取。数据使用完成后需要调用 DevLocalDelete/DevDelete 删除对象。
注意:
DevMSet / DevMGet 传入的 Device 内存地址不能归属于同一张 NPU 卡。
在执行 DevMGet 过程中,执行了 DevMSet 的进程不能退出,否则 DevMGet 会失败。
在 key 与 devBlobList 内存地址的映射关系均一致的情况下,DevMGet 支持在同一进程内重试。
import acl
import random
from datasystem.ds_client import DsClient
def random_str(slen=10):
    """Return a string of ``slen`` characters picked at random from a fixed alphabet.

    The alphabet consists of the digits, the ASCII letters and the symbols
    ``!@#%^*()_+=-``. ``slen`` defaults to 10; a length of 0 yields ``''``.
    """
    seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#%^*()_+=-"
    return ''.join(random.choice(seed) for _ in range(slen))
# hetero_dev_mset and hetero_dev_mget must be executed in different processes
# because they need to be bound to different NPUs.
def hetero_dev_mset():
    """Write HBM buffers on NPU 1 to the data system under three keys via ``dev_mset``.

    For each key, four 1 MiB device buffers are allocated and filled with the
    same random payload before the whole batch is passed to ``dev_mset``.

    Raises:
        RuntimeError: if a device allocation or a host-to-device copy reports
            a non-zero return code.
    """
    # Blob and DeviceBlobList are not covered by this snippet's module-level
    # import (which only brings in DsClient), so import them here.
    from datasystem import Blob, DeviceBlobList

    client = DsClient("127.0.0.1", 31501)
    client.init()
    acl.init()
    device_idx = 1
    acl.rt.set_device(device_idx)

    key_list = [ 'key1', 'key2', 'key3' ]
    data_size = 1024 * 1024
    # Encode once and keep the bytes object referenced until every copy is
    # done: bytes_to_ptr yields a raw address into that object, so the object
    # must stay alive for as long as the address is used.
    host_data = random_str(data_size).encode()
    host_ptr = acl.util.bytes_to_ptr(host_data)

    in_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            dev_ptr, ret = acl.rt.malloc(data_size, 0)
            if ret != 0:
                raise RuntimeError(f"acl.rt.malloc failed, ret={ret}")
            # Copy kind 1 is the host-to-device direction.
            ret = acl.rt.memcpy(dev_ptr, data_size, host_ptr, data_size, 1)
            if ret != 0:
                raise RuntimeError(f"acl.rt.memcpy failed, ret={ret}")
            tmp_batch_list.append(Blob(dev_ptr, data_size))
        in_data_blob_list.append(DeviceBlobList(device_idx, tmp_batch_list))

    client.hetero().dev_mset(key_list, in_data_blob_list)
def hetero_dev_mget():
    """Read three keys into HBM buffers on NPU 2 via ``dev_mget``, then delete them.

    For each key, four 1 MiB device buffers are allocated as receive targets.
    ``dev_mget`` is called with ``60000`` as its third argument, and the keys
    are removed afterwards with ``dev_delete``.

    Raises:
        RuntimeError: if a device allocation reports a non-zero return code.
    """
    # Blob and DeviceBlobList are not covered by this snippet's module-level
    # import (which only brings in DsClient), so import them here.
    from datasystem import Blob, DeviceBlobList

    client = DsClient("127.0.0.1", 31501)
    client.init()
    acl.init()
    device_idx = 2
    acl.rt.set_device(device_idx)

    key_list = [ 'key1', 'key2', 'key3' ]
    data_size = 1024 * 1024

    out_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            dev_ptr, ret = acl.rt.malloc(data_size, 0)
            if ret != 0:
                raise RuntimeError(f"acl.rt.malloc failed, ret={ret}")
            tmp_batch_list.append(Blob(dev_ptr, data_size))
        out_data_blob_list.append(DeviceBlobList(device_idx, tmp_batch_list))

    client.hetero().dev_mget(key_list, out_data_blob_list, 60000)
    # The objects are not removed by reading them; delete them explicitly.
    client.hetero().dev_delete(key_list)
#include "datasystem/datasystem.h"
#include <acl/acl.h>
// HeteroDevMSet and HeteroDevMGet must be executed in different processes
// because they need to be bound to different NPUs.
void HeteroDevMSet()
{
ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
auto client = std::make_shared<DsClient>(connectOptions);
ASSERT_TRUE(client->Init().IsOk());
// Initialize the ACL interface.
int deviceId = 1;
aclInit(nullptr);
aclrtSetDevice(deviceId); // Bind the NPU card.
// The data generator constructs data and writes it to the data system.
std::vector<std::string> keys = { "test-key1" };
std::vector<uint64_t> blobSize = { 10, 20 };
int blobNum = blobSize.size();
std::vector<DeviceBlobList> inBlobList;
inBlobList.resize(keys.size());
for (size_t i = 0; i < inBlobList.size(); i++) {
inBlobList[i].deviceIdx = deviceId;
for (int j = 0; j < blobNum; j++) {
void *devPtr = nullptr;
int code = aclrtMalloc(&devPtr, blobSize[j], ACL_MEM_MALLOC_HUGE_FIRST);
// Copying Data to the Device Memory.
// aclrtMemcpy(devPtr, blobSize[j], value.data(), size, aclrtMemcpyKind::ACL_MEMCPY_HOST_TO_DEVICE)
ASSERT_EQ(code, 0);
Blob blob = { .pointer = devPtr, .size = blobSize[j] };
inBlobList[i].blobs.emplace_back(std::move(blob));
}
}
std::vector<std::string> setFailedKeys;
Status status = client->Hetero()->DevMSet(keys, inBlobList, setFailedKeys);
ASSERT_TRUE(status.IsOk());
ASSERT_TRUE(setFailedKeys.size() == 0);
}
void HeteroDevMGet()
{
ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
auto client = std::make_shared<DsClient>(connectOptions);
ASSERT_TRUE(client->Init().IsOk());
// Initialize the ACL interface.
int deviceId = 2;
aclInit(nullptr);
aclrtSetDevice(deviceId); // Bind the NPU card.
// The data obtainer allocates the HBM memory and invokes the DevMGet interface to receive data.
std::vector<std::string> keys = { "test-key1" };
std::vector<uint64_t> blobSize = { 10, 20 };
int blobNum = blobSize.size();
std::vector<DeviceBlobList> outBlobList;
outBlobList.resize(keys.size());
// Allocate the HBM memory and fill it in the outBlobList.
for (size_t i = 0; i < outBlobList.size(); i++) {
outBlobList[i].deviceIdx = deviceId;
for (int j = 0; j < blobNum; j++) {
void *devPtr = nullptr;
int code = aclrtMalloc(&devPtr, blobSize[j], ACL_MEM_MALLOC_HUGE_FIRST);
ASSERT_EQ(code, 0);
Blob blob = { .pointer = devPtr, .size = blobSize[j] };
outBlobList[i].blobs.emplace_back(std::move(blob));
}
}
std::vector<std::string> getFailedKeys;
Status status = client->Hetero()->DevMGet(keys, outBlobList, getFailedKeys);
ASSERT_TRUE(status.IsOk());
ASSERT_TRUE(getFailedKeys.size() == 0);
std::vector<std::string> delFailedKeys;
status = client->Hetero()->DevDelete(keys, delFailedKeys);
ASSERT_TRUE(status.IsOk());
ASSERT_TRUE(delFailedKeys.size() == 0);
}
注意:
这里提到的删除对象,是指解除数据系统与 HBM 内存的绑定,并不会直接影响 HBM 中的数据,HBM 的内存由上层应用自己释放。
H2D(Host to Device)/D2H(Device to Host) 数据传输#
异构对象接口提供了 MGetH2D 和 MSetD2H 接口,实现数据在 HBM 和 DRAM 之间快速 swap。
MSetD2H 接口:将 HBM 上的数据写入到 host 的指定 key 中。当数据在 HBM 中是零散的多个小块内存时,MSetD2H 接口会自动将这些小块数据拼接后写入到 host 内存中。
MGetH2D 接口:从 host 的指定 key 中读取数据,并写入到 Device 的 HBM 内存中。MGetH2D 接口需要用户提前申请 HBM 内存将地址写入到 devBlobList 参数中。
MGetH2D / MSetD2H 需配套使用。当 MSetD2H 对 HBM 的小数据块合并后,在 MGetH2D 中会重新还原为小数据块。
若 host 的 key 不再使用,可调用 Delete 接口删除。
import acl
import random
from datasystem.ds_client import DsClient
def random_str(slen=10):
    """Return a string of ``slen`` characters picked at random from a fixed alphabet.

    The alphabet consists of the digits, the ASCII letters and the symbols
    ``!@#%^*()_+=-``. ``slen`` defaults to 10; a length of 0 yields ``''``.
    """
    seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#%^*()_+=-"
    return ''.join(random.choice(seed) for _ in range(slen))
def hetero_mset_d2h_mget_h2d():
    """Swap HBM data on NPU 0 out to host keys with ``mset_d2h`` and back with ``mget_h2d``.

    For each of the three keys, four 1 MiB source buffers are allocated and
    filled with the same random payload, and four 1 MiB destination buffers
    are allocated. The source buffers are written with ``mset_d2h`` and then
    read back into the destination buffers with ``mget_h2d``, which is called
    with ``60000`` as its third argument.

    Raises:
        RuntimeError: if a device allocation or a host-to-device copy reports
            a non-zero return code.
    """
    # Blob and DeviceBlobList are not covered by this snippet's module-level
    # import (which only brings in DsClient), so import them here.
    from datasystem import Blob, DeviceBlobList

    client = DsClient("127.0.0.1", 31501)
    client.init()
    acl.init()
    device_idx = 0
    acl.rt.set_device(device_idx)

    key_list = [ 'key1', 'key2', 'key3' ]
    data_size = 1024 * 1024
    # Encode once and keep the bytes object referenced until every copy is
    # done: bytes_to_ptr yields a raw address into that object, so the object
    # must stay alive for as long as the address is used.
    host_data = random_str(data_size).encode()
    host_ptr = acl.util.bytes_to_ptr(host_data)

    # Source buffers: device memory holding the payload to swap out.
    in_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            dev_ptr, ret = acl.rt.malloc(data_size, 0)
            if ret != 0:
                raise RuntimeError(f"acl.rt.malloc failed, ret={ret}")
            # Copy kind 1 is the host-to-device direction.
            ret = acl.rt.memcpy(dev_ptr, data_size, host_ptr, data_size, 1)
            if ret != 0:
                raise RuntimeError(f"acl.rt.memcpy failed, ret={ret}")
            tmp_batch_list.append(Blob(dev_ptr, data_size))
        in_data_blob_list.append(DeviceBlobList(device_idx, tmp_batch_list))

    # Destination buffers: device memory that the swapped-in data is written to.
    out_data_blob_list = []
    for _ in key_list:
        tmp_batch_list = []
        for _ in range(4):
            dev_ptr, ret = acl.rt.malloc(data_size, 0)
            if ret != 0:
                raise RuntimeError(f"acl.rt.malloc failed, ret={ret}")
            tmp_batch_list.append(Blob(dev_ptr, data_size))
        out_data_blob_list.append(DeviceBlobList(device_idx, tmp_batch_list))

    client.hetero().mset_d2h(key_list, in_data_blob_list)
    client.hetero().mget_h2d(key_list, out_data_blob_list, 60000)
#include "datasystem/hetero_client.h"
#include <acl/acl.h>
// Sample: swap HBM data out to host memory with MSetD2H, then back into HBM with MGetH2D.
// Connect to the data system.
ConnectOptions connectOptions = { .host = "127.0.0.1", .port = 31501 };
auto client = std::make_shared<DsClient>(connectOptions);
ASSERT_TRUE(client->Init().IsOk());

// Initialize the ACL interface and bind NPU card 1.
int deviceId = 1;
aclInit(nullptr);
aclrtSetDevice(deviceId);

// One key whose object consists of two HBM blobs (10 and 20 bytes).
std::vector<std::string> keys = { "test-key1" };
std::vector<uint64_t> blobSize = { 10, 20 };

// Allocate the source HBM buffers and record them in swapOutBlobList.
std::vector<DeviceBlobList> swapOutBlobList;
swapOutBlobList.resize(keys.size());
for (auto &blobList : swapOutBlobList) {
    blobList.deviceIdx = deviceId;
    for (uint64_t bytes : blobSize) {
        void *devPtr = nullptr;
        int code = aclrtMalloc(&devPtr, bytes, ACL_MEM_MALLOC_HUGE_FIRST);
        // Real payload would be copied to the device here, e.g.
        // aclrtMemcpy(devPtr, bytes, value.data(), size, aclrtMemcpyKind::ACL_MEMCPY_HOST_TO_DEVICE)
        ASSERT_EQ(code, 0);
        Blob blob = { .pointer = devPtr, .size = bytes };
        blobList.blobs.emplace_back(std::move(blob));
    }
}
Status status = client->Hetero()->MSetD2H(keys, swapOutBlobList);
ASSERT_TRUE(status.IsOk());

// Allocate the destination HBM buffers and record them in swapInBlobList.
std::vector<DeviceBlobList> swapInBlobList;
swapInBlobList.resize(keys.size());
for (auto &blobList : swapInBlobList) {
    blobList.deviceIdx = deviceId;
    for (uint64_t bytes : blobSize) {
        void *devPtr = nullptr;
        int code = aclrtMalloc(&devPtr, bytes, ACL_MEM_MALLOC_HUGE_FIRST);
        ASSERT_EQ(code, 0);
        Blob blob = { .pointer = devPtr, .size = bytes };
        blobList.blobs.emplace_back(std::move(blob));
    }
}

// NOTE(review): the last argument is 1 here, while the Python sample passes 60000
// to mget_h2d; confirm the intended timeout value and unit.
std::vector<std::string> failedList;
status = client->Hetero()->MGetH2D(keys, swapInBlobList, failedList, 1);
ASSERT_TRUE(status.IsOk());
// Check the per-key failure list as well as the overall status, as the other samples do.
ASSERT_TRUE(failedList.empty());
使用限制#
key 仅支持大写字母、小写字母、数字以及如下特定字符:
-_!@#%^*()+=:;。key 的最大长度为 255 字节。
DevPublish 和 DevSubscribe 需配套使用,不能和 DevMSet 及 DevMGet 混合使用。
DevMSet 及 DevMGet 配套使用,写入的 key 使用 DevDelete 及 DevLocalDelete 删除。
MSetD2H 和 MGetH2D 需要配套使用,MGetH2D 接口中只能传入 MSetD2H 接口写入的 key。写入的 key 使用 Delete 接口删除。