单体执行模式

相关源文件

以下文件用于生成此文档页面:

本文档介绍 IB-Robot 推理服务的 单体执行模式。此模式在单个进程内运行所有推理组件(预处理、推理和后处理),适用于配备板载高性能 GPU 的机器人。

范围:本文档涵盖单体模式特有的架构、数据流、配置和实现细节。有关整体推理管道架构和三组件设计,请参阅 推理架构。有关将计算卸载到云端节点的分布式模式,请参阅 分布式执行模式


概述

单体执行模式专为具有足够板载计算资源(如 NVIDIA RTX 4060 或更高)在本地运行整个推理管道的机器人设计。主要特点如下:

特性

描述

部署

单机、单进程执行

数据流

通过 Python 引用的零拷贝张量传递

延迟

绝对最小延迟(无序列化开销)

内存

所有数据保留在进程 RAM/VRAM 中

用例

具有板载高性能 GPU 的机器人

配置

robot_config YAML 中设置 execution_mode: "monolithic"

主要优势是 零序列化开销:张量通过引用在预处理器、推理引擎和后处理器之间传递,完全消除了网络和编组成本。

源码src/inference_service/README.en.md:18-24, src/inference_service/inference_service/lerobot_policy_node.py:8-13


架构

组件组合

在单体模式下,lerobot_policy_node 实例化一个 InferenceCoordinator,在单个进程中将三个核心组件链接在一起:

        graph TB
    subgraph "lerobot_policy_node 进程"
        direction TB

        ActionServer["ActionServer<br/>DispatchInfer"]

        Coordinator["InferenceCoordinator<br/>(coordinator.py)"]

        Pre["TensorPreprocessor<br/>(preprocessor.py)"]
        Infer["PureInferenceEngine<br/>(engine.py)"]
        Post["TensorPostprocessor<br/>(postprocessor.py)"]

        ActionServer -->|"execute_callback()"| Coordinator
        Coordinator -->|"1. preprocess(obs_frame)"| Pre
        Pre -->|"2. batch (torch.Tensor)<br/>零拷贝"| Infer
        Infer -->|"3. action (torch.Tensor)<br/>零拷贝"| Post
        Post -->|"4. action_np (numpy.ndarray)"| Coordinator
        Coordinator -->|"CoordinatorResult"| ActionServer
    end

    subgraph "外部通信"
        Dispatcher["action_dispatcher_node"]
        Sensors["ROS Topics<br/>/joint_states, /camera/*"]
    end

    Sensors -.->|"观测数据"| ActionServer
    ActionServer -->|"VariantsList"| Dispatcher
    

关键类与函数:- InferenceCoordinator.__call__() [inference_service/core/coordinator.py] - 编排三阶段管道 - TensorPreprocessor.__call__() - 将原始观测转换为归一化张量 - PureInferenceEngine.__call__() - 执行 GPU 推理 - TensorPostprocessor.__call__() - 将动作张量反归一化为物理命令

源码src/inference_service/inference_service/lerobot_policy_node.py:285-304, src/inference_service/README.en.md:6-13


零拷贝数据流

单体模式通过 零拷贝张量传递 实现最小延迟。所有张量在整个管道中保持为 GPU 内存中的 PyTorch 对象:

        sequenceDiagram
    participant AD as action_dispatcher_node
    participant AS as ActionServer<br/>(DispatchInfer)
    participant EC as _execute_monolithic()
    participant IC as InferenceCoordinator
    participant Pre as TensorPreprocessor
    participant Eng as PureInferenceEngine
    participant Post as TensorPostprocessor

    AD->>AS: DispatchInfer.Goal<br/>(obs_timestamp)
    AS->>EC: _dispatch_infer_callback()

    Note over EC: 从 StreamBuffers<br/>采样观测
    EC->>EC: _sample_obs_frame()

    EC->>IC: coordinator(obs_frame)

    IC->>Pre: preprocess(obs_frame)
    Note over Pre: 将图像/状态归一化<br/>为 torch.Tensor
    Pre-->>IC: batch (Dict[str, Tensor])

    IC->>Eng: forward(batch)
    Note over Eng: GPU 推理<br/>同一设备上零拷贝
    Eng-->>IC: action (Tensor)

    IC->>Post: postprocess(action)
    Note over Post: 反归一化为<br/>物理关节值
    Post-->>IC: action_np (ndarray)

    IC-->>EC: CoordinatorResult

    Note over EC: 从 action_np<br/>创建 VariantsList
    EC-->>AS: DispatchInfer.Result
    AS-->>AD: action_chunk
    

关键性能细节:在 TensorPreprocessorPureInferenceEngine 之间,张量在同一设备(GPU)上 通过引用传递。不发生 .cpu().to() 调用,消除了内存拷贝。

源码src/inference_service/inference_service/lerobot_policy_node.py:491-493, src/inference_service/README.en.md:22-24


与动作分发的集成

单体模式向 action_dispatcher_node 呈现 与分布式模式相同的 Action Server 接口,确保透明性:

        graph LR
    subgraph "action_dispatcher_node"
        Loop["控制循环<br/>(100Hz 定时器)"]
        Client["ActionClient<br/>DispatchInfer"]
        Queue["动作队列"]
    end

    subgraph "lerobot_policy_node (单体)"
        Server["ActionServer<br/>~/DispatchInfer"]
        Coord["InferenceCoordinator"]
    end

    subgraph "硬件"
        Control["ros2_control"]
    end

    Loop -->|"queue < watermark"| Client
    Client -->|"Goal"| Server
    Server -->|"_execute_monolithic()"| Coord
    Coord -->|"Result"| Server
    Server -->|"action_chunk"| Client
    Client -->|"入队"| Queue
    Queue -->|"100Hz pop"| Control
    

接口契约:- Actionibrobot_msgs/action/DispatchInfer - Goal 字段obs_timestamp (Time) - Result 字段action_chunk (VariantsList)、chunk_size (int)、success (bool)、 inference_latency_ms (float)

分发器无需知道推理节点是在单体模式还是分布式模式下运行。两种模式返回相同的 DispatchInfer.Result 结构。

源码src/action_dispatch/action_dispatch/action_dispatcher_node.py:205-220, src/inference_service/inference_service/lerobot_policy_node.py:422-489


配置

机器人配置 YAML

单体模式通过 execution_mode 参数在机器人配置文件中启用:

# src/robot_config/config/robots/so101_single_arm.yaml
control_modes:
  model_inference:
    inference:
      enabled: true
      execution_mode: "monolithic"  # 关键参数
      model: so101_act
      request_timeout: 5.0  # 单体模式下未使用,保留以保持一致性

启动参数

generate_monolithic_inference_node() 函数提取配置并构建节点:

参数

类型

描述

来源

checkpoint

str

策略检查点路径 (.pt 文件)

models[mod el_name].path

robo t_config_path

str

robot_config YAML 的路径 (契约源)

robot_config ._config_path

e xecution_mode

str

必须为 “monolithic”

inference.e xecution_mode

device

str

PyTorch 设备 (“auto”、 “cuda”、”cpu”)

硬编码 “auto”

` use_sim_time`

bool

ROS 仿真时间标志

启动参数 use_sim

源码src/robot_config/robot_config/launch_builders/execution.py:61-127


实现细节

节点初始化

lerobot_policy_node 单体模式的初始化序列:

        graph TB
    Init["__init__()"]

    Init --> LoadConfig["_load_policy_config()<br/>从 config.json 提取 input_features"]
    LoadConfig --> LoadContract["_load_contract()<br/>加载 robot_config YAML<br/>根据 input_features 过滤观测"]
    LoadContract --> SetupObs["_setup_observation_subscriptions()<br/>为过滤后的观测创建 StreamBuffers"]
    SetupObs --> CheckMode{"execution_mode?"}

    CheckMode -->|"monolithic"| SetupMono["_setup_monolithic_mode()"]
    CheckMode -->|"distributed"| SetupDist["_setup_distributed_mode()"]

    SetupMono --> CreateCoord["InferenceCoordinator(policy_path, device)"]
    CreateCoord --> ExtractMeta["从协调器提取<br/>policy_type, chunk_size"]
    ExtractMeta --> SetupAction["_setup_action_server()<br/>创建 DispatchInfer server"]

    SetupAction --> Ready["节点就绪"]
    

关键代码路径src/inference_service/inference_service/lerobot_policy_node.py:128-179

观测过滤

一个关键优化:节点 仅订阅模型所需的观测

# lerobot_policy_node.py:216-227
all_obs_specs = [s for s in iter_specs(self._contract) if not s.is_action]

if self._required_inputs:
    self._obs_specs = [
        s for s in all_obs_specs
        if s.key in self._required_inputs
    ]

这允许单个 robot_config.yaml``(包含所有可能的观测)支持具有不同观测需求的多个模型。例如:- **ACT 模型**:需要 ``images.topimages.wristobservation.state - Diffusion 模型:仅需要 images.topobservation.state

节点读取模型的 config.json line 189-203,提取 input_features,并相应过滤订阅。

源码src/inference_service/inference_service/lerobot_policy_node.py:180-241

推理执行

由于协调器抽象,_execute_monolithic() 函数非常简单:

# lerobot_policy_node.py:491-493
def _execute_monolithic(self, obs_frame: Dict[str, Any]) -> CoordinatorResult:
    """在单体模式下执行推理(零拷贝)。"""
    return self._coordinator(obs_frame)

obs_frame 是从 StreamBuffers 采样的原始观测字典(图像为 numpy 数组,状态为列表)。协调器处理整个管道并返回 CoordinatorResult dataclass:

@dataclass
class CoordinatorResult:
    action: torch.Tensor  # 最终动作张量 (chunk_size, action_dim)
    chunk_size: int  # 块中的动作数量
    total_latency_ms: float  # 端到端延迟
    preprocess_latency_ms: float  # 预处理时间
    inference_latency_ms: float  # GPU 前向传播时间
    postprocess_latency_ms: float  # 后处理时间
    policy_type: str  # "ACT"、"Diffusion" 等

源码src/inference_service/inference_service/lerobot_policy_node.py:491-493


性能特征

延迟分解

RTX 4060 上 ACT 策略的典型延迟(chunk_size=100):

阶段

延迟

备注

预处理

~8-12ms

图像调整大小、归一化、拼接

推理

~15-25ms

GPU 上的模型前向传播

后处理

~2-5ms

反归一化、裁剪

总计

~25-40ms

组件间通信零开销

内存效率

所有张量在整个管道中驻留在 GPU 上:

观测数据 (CPU)
    ↓ (拷贝到 GPU)
TensorPreprocessor (GPU)
    ↓ (引用)
PureInferenceEngine (GPU)
    ↓ (引用)
TensorPostprocessor (GPU)
    ↓ (拷贝到 CPU 用于 ROS 发布)
动作输出 (CPU)

仅发生 两次设备传输:输入到 GPU 和输出到 CPU。无中间 .cpu() 调用。

源码src/inference_service/README.en.md:22-24


与分布式模式比较

方面

单体

分布式

进程数

1 (lerobot_policy_node)

2(边缘代理 + 云端 推理)

网络流量

每次推理约 500KB (预处理张量)

延迟

25-40ms

40-80ms(增加网络 + 序列化)

GPU 位置

板载机器人

远程云端服务器

硬件要求

板载高端 GPU

低功耗 CPU + LAN 到 GPU 服务器

复杂度

简单(单节点)

较高(分布式协调)

故障模式

本地进程崩溃

网络超时、 云端节点故障

何时使用单体模式:- 机器人有板载 GPU(RTX 3060 Ti 或更高)- 需要绝对最小延迟(<30ms)- 部署简单性是优先考虑 - 网络可靠性不确定

何时使用分布式模式:见 分布式执行模式

源码src/inference_service/README.en.md:18-37, src/inference_service/inference_service/lerobot_policy_node.py:8-27