推理服务概述
相关源文件
以下文件用作生成此 wiki 页面的上下文:
推理服务是 IB-Robot 的 AI 执行子系统,负责加载 LeRobot 策略模型并将实时传感器观测转换为机器人动作。它实现了契约驱动架构,通过与数据集转换流水线相同的数据处理逻辑,确保训练-部署的一致性。
关于训练数据准备,请参阅 数据集转换 (bag_to_lerobot)。关于动作执行和时间平滑,请参阅 动作分发。
来源:src/inference_service/README.en.md:1-27, README.md:33-35
系统目的与范围
推理服务包(inference_service)连接训练好的 LeRobot 策略与实时机器人控制。其核心职责包括:
模型加载:加载 ACT、Diffusion Policy、VLA 及其他 LeRobot 兼容的检查点文件
观测处理:使用契约定义的转换规则,将 ROS 2 传感器流(相机、关节状态)转换为模型就绪的张量
GPU 推理:以最小延迟执行策略前向传播
动作生成:将输出张量转换为
action_dispatch可执行的动作命令部署灵活性:支持单机(零拷贝)和分布式(边-云)执行
该流水线**不**处理动作执行、时间平滑或电机控制——这些是 动作分发 系统的职责。
来源:src/inference_service/README.en.md:1-13, src/inference_service/inference_service/lerobot_policy_node.py:1-34
三组件架构
推理服务遵循严格的关注点分离设计,将 AI 执行工作流分解为三个独立、可组合的组件:
组件图
graph TB
subgraph "Core Components (inference_service.core)"
Preprocessor["TensorPreprocessor<br/>(CPU)"]
Engine["PureInferenceEngine<br/>(GPU)"]
Postprocessor["TensorPostprocessor<br/>(CPU)"]
end
subgraph "ROS Integration Layer"
PolicyNode["LeRobotPolicyNode"]
Coordinator["InferenceCoordinator"]
end
subgraph "External Systems"
Contract["robot_config.yaml<br/>(Contract)"]
Dispatch["action_dispatcher_node"]
Sensors["ROS Sensors<br/>(cameras, joint_states)"]
end
Contract --> PolicyNode
Sensors --> PolicyNode
PolicyNode --> Coordinator
Coordinator --> Preprocessor
Preprocessor --> Engine
Engine --> Postprocessor
Postprocessor --> PolicyNode
PolicyNode --> Dispatch
来源:src/inference_service/README.en.md:8-13, src/inference_service/inference_service/lerobot_policy_node.py:69-77
TensorPreprocessor
将原始 ROS 观测字典转换为模型就绪的 PyTorch 张量。此组件是无状态的,运行在 CPU 上。
主要职责:
应用契约定义的图像缩放、归一化和编码转换
堆叠时间历史(用于需要观测窗口的模型)
应用模型
config.json中的归一化统计量与训练一致地处理灰度、深度和 RGB 图像编码
实现:inference_service/core/preprocessor.py
来源: src/dataset_tools/dataset_tools/bag_to_lerobot.py:9-19, src/inference_service/README.en.md:8
PureInferenceEngine
完全独立于 ROS 的无状态 GPU 执行引擎。它加载策略检查点并执行前向传播。
主要特性:
零 ROS 依赖:可在纯 Python 脚本中导入使用
设备无关:自动处理 CPU/CUDA 张量放置
策略类型检测:读取
config.json确定块大小、动作维度和策略架构(ACT、Diffusion 等)
实现:inference_service/core/engine.py
来源:src/inference_service/README.en.md:9, src/inference_service/inference_service/lerobot_policy_node.py:69-74
TensorPostprocessor
使用模型的归一化统计量,将输出动作张量反归一化为物理关节位置/速度。
主要职责:
应用逆归一化:
action_physical = action_normalized * std + mean根据契约动作规范验证输出张量形状
如已配置,将值钳位到安全范围
实现:inference_service/core/postprocessor.py
来源:src/inference_service/README.en.md:10, src/inference_service/inference_service/lerobot_policy_node.py:76
执行模式
推理服务支持两种部署架构,可通过 robot_config.yaml 中的 execution_mode 参数选择。两种模式向下游系统暴露**相同**的 DispatchInfer Action 接口。
模式对比表
方面 |
单体模式 |
分布式模式 |
|---|---|---|
目标硬件 |
配备板载高性能 GPU 的机器人 |
轻量级机器人 (仅 CPU 的边缘设备) |
组件位置 |
所有三个组件 在同一进程中 |
Preprocessor+Postprocessor 在边缘端,Engine 在云端 |
张量传递 |
零拷贝(进程内引用) |
通过 ROS 2 话题序列化 |
延迟 |
最小(总计约 10-50ms) |
较高(由于网络, 约 50-200ms) |
配置 |
|
|
需要云端节点 |
否 |
是 ( |
来源:src/inference_service/README.en.md:14-48, README.md:15-16
单体模式(单进程零拷贝)
在单体模式下,LeRobotPolicyNode 实例化一个 InferenceCoordinator,在单个进程中串联所有三个组件。
graph TB
subgraph "lerobot_policy_node Process"
ActionServer["DispatchInfer<br/>Action Server"]
Coord["InferenceCoordinator"]
Pre["TensorPreprocessor"]
Eng["PureInferenceEngine"]
Post["TensorPostprocessor"]
ActionServer -->|"obs_frame dict"| Coord
Coord -->|"zero-copy"| Pre
Pre -->|"batch tensor"| Eng
Eng -->|"action tensor"| Post
Post -->|"denormalized"| Coord
Coord -->|"VariantsList"| ActionServer
end
Dispatch["action_dispatcher_node"] -->|"Goal"| ActionServer
ActionServer -->|"Result"| Dispatch
关键代码流程:
LeRobotPolicyNode._dispatch_infer_callback()接收DispatchInfer.Goallerobot_policy_node.py:422-457调用
self._execute_monolithic(obs_frame)lerobot_policy_node.py:491-493委托给
self._coordinator(obs_frame)``(一个 ``InferenceCoordinator实例)`lerobot_policy_node.py:291-299 <https://gitcode.com/openeuler/IB_Robot/blob/9e382ea2320c3260b03e9c838696f8ac89eb8944/lerobot_policy_node.py#L291-L299>`__协调器串联:
preprocessor(obs) → engine(batch) → postprocessor(action)inference_service/core/init.py返回包含动作张量和延迟指标的
CoordinatorResult
来源:src/inference_service/README.en.md:18-24, src/inference_service/inference_service/lerobot_policy_node.py:285-299, src/inference_service/inference_service/lerobot_policy_node.py:491-493
分布式模式(设备-边缘-云端)
在分布式模式下,LeRobotPolicyNode 在机器人(边缘端)充当**异步代理**,而单独的 pure_inference_node 运行在云端 GPU 服务器上。
graph TB
subgraph "Edge Device (Robot CPU)"
EdgeNode["lerobot_policy_node<br/>(Asynchronous Proxy)"]
EdgePre["TensorPreprocessor"]
EdgePost["TensorPostprocessor"]
EdgeNode --> EdgePre
EdgePost --> EdgeNode
end
subgraph "Cloud Server (GPU)"
CloudNode["pure_inference_node"]
CloudEngine["PureInferenceEngine"]
CloudNode --> CloudEngine
CloudEngine --> CloudNode
end
Dispatch["action_dispatcher_node"] -->|"DispatchInfer Goal"| EdgeNode
EdgeNode -->|"Result"| Dispatch
EdgePre -->|"/preprocessed/batch<br/>(VariantsList)"| CloudNode
CloudNode -->|"/inference/action<br/>(VariantsList)"| EdgePost
分布式执行流程:
边缘节点接收
DispatchInfer.Goallerobot_policy_node.py:422-457调用
self._execute_distributed(obs_frame, inference_id)lerobot_policy_node.py:495-554预处理器本地运行:
batch = self._preprocessor(obs_frame)lerobot_policy_node.py:511-513边缘节点将批次发布到
/preprocessed/batch,带有唯一的request_idlerobot_policy_node.py:516-524边缘节点**阻塞在 threading.Event** 上等待云端响应 lerobot_policy_node.py:526-530
云端节点(
pure_inference_node)接收批次,运行推理,发布到/inference/action边缘节点的
_cloud_result_callback()接收结果,按request_id匹配,设置 Event lerobot_policy_node.py:556-584边缘后处理器运行:
action = self._postprocessor(cloud_result["action"])lerobot_policy_node.py:540-542返回
CoordinatorResult给 action server
关键架构说明:边缘节点的 action 回调在等待云端时**挂起其线程**。这是有意为之——它允许 action server 自然地序列化推理请求,防止多个并发云端调用。
来源:src/inference_service/README.en.md:25-48, src/inference_service/inference_service/lerobot_policy_node.py:306-351, src/inference_service/inference_service/lerobot_policy_node.py:495-554
契约驱动的观测过滤
IB-Robot 的一个关键架构原则是,robot_config.yaml 作为所有硬件能力的**唯一事实来源**,而各个模型定义其特定的观测需求。推理服务协调这两个来源。
观测过滤逻辑
graph TB
subgraph "robot_config.yaml"
AllObs["contract.observations:<br/>- images.top<br/>- images.wrist<br/>- images.front<br/>- observation.state"]
end
subgraph "Model config.json"
ModelInputs["input_features:<br/>- images.top<br/>- observation.state"]
end
subgraph "lerobot_policy_node"
Filter["Observation Filter"]
Subs["ROS Subscriptions"]
end
AllObs --> Filter
ModelInputs --> Filter
Filter -->|"Required only"| Subs
Subs -->|"Subscribe to:<br/>/camera/top/image_raw<br/>/joint_states"| Topics["ROS Topics"]
实现:lerobot_policy_node.py:156-159 加载模型的 config.json 提取 input_features,然后 lerobot_policy_node.py:205-230 过滤契约观测:
# Filter by model's required inputs
if self._required_inputs:
self._obs_specs = [
s for s in all_obs_specs
if s.key in self._required_inputs
]
此设计允许:
单一 robot_config.yaml 支持具有不同观测需求的多个模型
无需手动配置话题 每个模型
自动错误检测 如果模型需要机器人未提供的观测
来源: src/inference_service/inference_service/lerobot_policy_node.py:180-203, src/inference_service/inference_service/lerobot_policy_node.py:205-240
ROS 接口规范
推理服务向动作分发器暴露一个**稳定的 Action 接口**。此接口在单体模式和分布式模式下完全相同。
DispatchInfer Action 接口
Action 定义:ibrobot_msgs/action/DispatchInfer
Goal 字段:
builtin_interfaces/Time obs_timestamp # 采样观测的时间戳
string inference_id # 可选的请求标识符
Result 字段:
VariantsList action_chunk # 序列化的动作张量
int32 chunk_size # 块中的动作数量
bool success # 推理是否成功
string message # 失败时的错误消息
float64 inference_latency_ms # 总延迟(毫秒)
Feedback:未使用(推理通常太快,无法提供有意义的进度更新)
来源:src/action_dispatch/README.en.md:344-351, src/inference_service/inference_service/lerobot_policy_node.py:56
与动作分发器的通信流程
sequenceDiagram
participant Dispatch as action_dispatcher_node
participant Policy as lerobot_policy_node
participant Smoother as TemporalSmoother
Note over Dispatch: Queue length < watermark (20)
Dispatch->>Policy: DispatchInfer.Goal<br/>(obs_timestamp)
activate Policy
Policy->>Policy: Sample obs_frame at timestamp
Policy->>Policy: Execute inference<br/>(monolithic or distributed)
Policy-->>Dispatch: DispatchInfer.Result<br/>(action_chunk, chunk_size)
deactivate Policy
Dispatch->>Smoother: Update plan with new chunk
Smoother->>Smoother: Apply temporal smoothing
Note over Dispatch: Pop actions at 100Hz
关键时间戳:
obs_timestamp:用于从StreamBuffer实例采样观测 lerobot_policy_node.py:399-420确保时间一致性,即使推理耗时不同
来源:src/action_dispatch/README.en.md:79-130, src/inference_service/inference_service/lerobot_policy_node.py:422-489
观测采样与 StreamBuffer
推理节点使用来自 robot_config.contract_utils 的 StreamBuffer 实例维护最近传感器观测的**滑动窗口**。这允许在动作分发器请求的精确时间戳处采样观测。
StreamBuffer 架构
graph TB
subgraph "Per-Observation StreamBuffer"
Buffer["Circular Buffer<br/>(recent messages)"]
Policy["Resample Policy<br/>(hold/asof/drop)"]
Tol["Tolerance (asof_tol_ms)"]
end
subgraph "Observation Sources"
Camera["/camera/top/image_raw"]
Joints["/joint_states"]
end
Camera -->|"Push with timestamp"| Buffer
Joints -->|"Push with timestamp"| Buffer
Dispatcher["action_dispatcher_node"] -->|"obs_timestamp_ns"| Sample["sample(timestamp)"]
Sample --> Policy
Policy --> Buffer
Buffer -->|"Interpolated value"| Frame["obs_frame dict"]
重采样策略(来自契约):
hold:返回时间戳前最近的值(零阶保持)asof:如果在asof_tol_ms内则返回值,否则返回 Nonedrop:仅返回精确匹配,否则返回 None
实现:lerobot_policy_node.py:381-398 处理观测回调,将值推入缓冲区。lerobot_policy_node.py:399-420 在请求的时间戳从所有缓冲区采样。
来源: src/robot_config/robot_config/contract_utils.py:74-90, src/inference_service/inference_service/lerobot_policy_node.py:381-420
启动集成
推理服务由 robot_config 启动系统根据控制模式配置动态启动。
启动参数流程
graph TB
subgraph "robot_config.yaml"
ControlMode["control_modes.model_inference:<br/> inference.enabled: true<br/> inference.execution_mode: monolithic<br/> inference.model: act_policy_1"]
ModelDef["models.act_policy_1:<br/> repo_id: /path/to/checkpoint<br/> device: cuda<br/> frequency: 10.0"]
end
subgraph "Launch System"
LaunchFile["robot.launch.py"]
ExecBuilder["execution.py<br/>generate_inference_node()"]
end
subgraph "Generated Nodes"
PolicyNode["lerobot_policy_node<br/>(parameters from config)"]
end
ControlMode --> LaunchFile
ModelDef --> LaunchFile
LaunchFile --> ExecBuilder
ExecBuilder --> PolicyNode
PolicyNode -->|"loads"| Contract["robot_config.yaml<br/>(for observations)"]
PolicyNode -->|"loads"| Checkpoint["Policy checkpoint<br/>(for model config)"]
关键启动构建函数:robot_config/launch_builders/execution.py:20-58 中的 generate_inference_node()
自动检测逻辑:
检查控制模式是否有
inference.enabled: true从
inference.model字段解析模型引用提取模型配置(repo_id、device、frequency)
生成带有契约路径的节点用于观测过滤
如果
execution_mode: distributed,还在云端启动pure_inference_node
来源: src/robot_config/robot_config/launch_builders/execution.py:20-58, src/robot_config/launch/robot.launch.py:183-196
健康监控与诊断
LeRobotPolicyNode 发布周期性健康状态,以支持运行时监控和调试。
发布话题
话题 |
消息类型 |
频率 |
用途 |
|---|---|---|---|
|
|
1 Hz |
节点健康状态、 错误消息、 推理计数 |
` /actions /{name}` |
|
按需 |
发布动作块 用于调试 |
健康状态级别:
OK (0):推理正常运行
WARN (1):超过预期周期 2 倍时间未推理(可能的分发器问题)
ERROR (2):致命错误(推理失败、超时)
诊断键值对 lerobot_policy_node.py:632-638:
inference_count:已完成的推理总数model_type:策略类型(act、diffusion 等)policy_type:架构变体chunk_size:动作块大小execution_mode:单体或分布式
来源: src/inference_service/inference_service/lerobot_policy_node.py:615-640
配置参数
LeRobotPolicyNode 接受以下 ROS 参数,通常通过 robot_config.yaml 设置:
参数 |
类型 |
默认值 |
描述 |
|---|---|---|---|
|
string |
|
节点名称后缀 |
|
string |
|
完整节点名称 |
|
string |
|
模型类型标识符 |
|
string |
必需:策略检查点 目录路径 |
|
|
string |
|
|
|
string |
必需:用于契约的 robot_config.yaml 路径 |
|
|
string |
|
计算设备 ( |
|
float |
10.0 |
预期推理频率(Hz) |
`` use_header_time`` |
bool |
true |
使用消息头时间戳 vs. 接收时间 |
` execution_mode` |
string |
` monolithic` |
|
`` request_timeout`` |
float |
5.0 |
分布式云端响应超时 (秒) |
|
string |
|
分布式模式下发布批次 的话题 |
|
string |
|
接收云端结果的话题 |
来源: src/inference_service/inference_service/lerobot_policy_node.py:88-103, src/inference_service/inference_service/lerobot_policy_node.py:648-669
总结
推理服务是一个契约驱动、执行模式无关的 AI 运行时,具有以下特点:
过滤观测:根据全局契约中的模型需求过滤观测
采样时间戳数据:使用 StreamBuffer 确保时间一致性
支持双重部署:板载 GPU 的零拷贝单体模式,轻量级机器人的分布式边-云模式
暴露稳定 Action 接口:与 action_dispatch 无缝集成
复用数据集转换逻辑:消除训练-服务偏差
三组件架构(Preprocessor、Engine、Postprocessor)允许相同代码同时服务于研究(纯 Python 脚本)和部署(ROS 2 节点)场景。