推理架构

相关源文件

以下文件用于生成此文档页面:

目的与范围

本文档介绍 inference_service 包的三组件架构,该架构为在物理机器人上运行端到端机器学习策略提供核心 AI 执行框架。该架构将推理管道解耦为纯 Python、与 ROS 无关的组件,实现灵活的部署策略。

有关此架构启用的特定执行模式(单体与分布式)信息,请参阅 单体执行模式分布式执行模式。有关节点实现细节,请参阅 策略节点。有关为此管道提供数据的协议转换层,请参阅 协议转换 (tensormsg)

源码: src/inference_service/README.en.md:1-13, src/inference_service/README.md:1-15


设计原则

组合优于继承

推理管道遵循 基于组合的架构,而非继承层次结构。系统被分解为三个独立、松耦合的组件,可根据部署需求以不同方式组合。每个组件有单一、明确定义的职责,可独立测试、优化和部署。

源码: src/inference_service/README.en.md:5-6, src/inference_service/README.md:7-8

与 ROS 无关的核心

所有核心推理组件位于 inference_service.core 模块,零 ROS 依赖。它们仅操作 PyTorch 张量和标准 Python 数据结构。这种分离提供了几个关键优势:

优势

描述

离线测试

组件可在无任何 ROS 环境的情况下 通过 pytest 验证

部署灵活性

相同代码可在单体或分布式配置中运行

框架独立性

核心逻辑可移植到其他机器人框架 或独立应用程序

性能隔离

纯张量操作不受 ROS 通信开销影响

源码: src/inference_service/README.en.md:7-12, src/inference_service/README.md:9-14


三组件架构

组件概述

        graph LR
    subgraph "ROS 层"
        SENSOR["ROS 传感器数据<br/>(图像, JointState)"]
        CTRL["ROS 控制命令<br/>(JointTrajectory)"]
    end

    subgraph "inference_service.core (纯 Python)"
        PREPROC["TensorPreprocessor"]
        ENGINE["PureInferenceEngine"]
        POSTPROC["TensorPostprocessor"]
    end

    SENSOR -->|"ROS → Python"| PREPROC
    PREPROC -->|"Dict[str, Tensor]"| ENGINE
    ENGINE -->|"InferenceResult"| POSTPROC
    POSTPROC -->|"Python → ROS"| CTRL

    PREPROC -.->|"裁剪、归一化"| PREPROC
    ENGINE -.->|"policy.select_action()"| ENGINE
    POSTPROC -.->|"反归一化"| POSTPROC
    

图:三组件管道与 ROS 边界分离

三个组件形成顺序处理管道:

  1. TensorPreprocessor — 将原始 ROS 传感器数据转换为归一化的 PyTorch 张量

  2. PureInferenceEngine — 执行策略网络以生成动作

  3. TensorPostprocessor — 将网络输出反归一化为物理控制命令

源码: src/inference_service/README.en.md:7-10, src/inference_service/README.md:9-12


组件 1:TensorPreprocessor

职责

TensorPreprocessor 组件处理将异构 ROS 传感器数据转换为适合策略网络输入的标准化张量批次。此组件是 CPU 密集型的,执行:

操作

描述

数据提取

从 ROS 消息缓冲区读取值 (图像、关节位置)

图像裁剪

应用外设配置中的裁剪参数

归一化

缩放像素值(0-255 → 0.0-1.0) 和关节位置到 [-1, 1]

批次组装

构建符合策略期望的命名张量字典

与契约系统集成

预处理器根据 robot_config 中的 契约定义见契约系统)运行。它:

  • 仅订阅契约中声明的观测

  • 应用为每种模态指定的归一化统计信息

  • 构建与策略 input_features 匹配的张量键

数据流

        graph TB
    subgraph "输入: ROS 消息缓冲区"
        IMG_TOP["StreamBuffer:<br/>images.top"]
        IMG_WRIST["StreamBuffer:<br/>images.wrist"]
        JOINTS["StreamBuffer:<br/>observation.state"]
    end

    subgraph "TensorPreprocessor 操作"
        READ["从缓冲区读取最新数据"]
        CROP["裁剪图像<br/>(外设配置)"]
        NORM_IMG["归一化像素<br/>[0,255] → [0.0,1.0]"]
        NORM_JOINT["归一化关节<br/>契约中的统计信息"]
        BATCH["组装批次字典"]
    end

    subgraph "输出: 张量批次"
        OUT["Dict[str, Tensor]:<br/>- observation.images.top<br/>- observation.images.wrist<br/>- observation.state"]
    end

    IMG_TOP --> READ
    IMG_WRIST --> READ
    JOINTS --> READ
    READ --> CROP
    CROP --> NORM_IMG
    READ --> NORM_JOINT
    NORM_IMG --> BATCH
    NORM_JOINT --> BATCH
    BATCH --> OUT
    

图:TensorPreprocessor 数据转换管道

源码: src/inference_service/README.en.md:8, src/inference_service/README.md:10


组件 2:PureInferenceEngine

职责

PureInferenceEngine 是一个 完全无状态、与 ROS 无关的 GPU 执行引擎。它封装策略网络并实现纯函数接口:

batch: Dict[str, Tensor] → InferenceResult(action: Tensor, ...)

无状态设计

引擎在推理调用之间 不维护任何内部状态。每次调用独立:

  • 无 episode 历史跟踪

  • 无超出策略自身管理的时间依赖

  • 无副作用

此设计支持:- 分布式模式下的并发推理请求 - 简单的错误恢复(失败的推理不会破坏状态) - 测试的确定性行为

策略加载

引擎使用 LeRobot 库的标准格式加载预训练策略检查点。它从检查点元数据解析策略类型和配置。

        graph TB
    subgraph "PureInferenceEngine 初始化"
        PATH["policy_path:<br/>/path/to/checkpoint.pt"]
        RESOLVE["resolve_device()<br/>cuda/cpu 检测"]
        LOAD["torch.load()<br/>策略检查点"]
        EXTRACT["提取:<br/>- policy_type<br/>- chunk_size<br/>- input_features"]
    end

    subgraph "运行时: __call__ 方法"
        INPUT["batch:<br/>Dict[str, Tensor]"]
        SELECT["policy.select_action(batch)"]
        RESULT["InferenceResult:<br/>- action: Tensor<br/>- chunk_size: int"]
    end

    PATH --> RESOLVE
    RESOLVE --> LOAD
    LOAD --> EXTRACT

    INPUT --> SELECT
    SELECT --> RESULT
    

图:PureInferenceEngine 生命周期与调用

设备管理

引擎包含自动选择适当计算设备的设备解析逻辑:

设备参数

行为

"auto"

优先使用 CUDA(如可用),回退到 CPU

"cuda"

强制 GPU,如不可用则抛出错误

"cpu"

强制 CPU 执行

源码: src/inference_service/README.en.md:9, src/inference_service/README.md:11, src/inference_service/inference_service/pure_inference_node.py:30


组件 3:TensorPostprocessor

职责

TensorPostprocessor 执行预处理器的逆变换,将策略网络的归一化动作张量转换回物理控制命令:

操作

描述

反归一化

将动作值从 [-1, 1] 缩放到 物理关节限制

动作块提取

提取完整动作块 (通常为 100 个时间步)

ROS 消息构建

构建适合动作分发的 ROS 消息

输出格式

后处理器输出与 action_dispatcher_node 兼容的数据(见动作分发):

  • 动作块:用于时序平滑的多时间步动作序列

  • 物理单位:以弧度或米为单位的关节位置

  • 契约对齐:动作名称与契约的动作规格匹配

源码: src/inference_service/README.en.md:10, src/inference_service/README.md:12


管道数据流

张量传递语义

三个组件通过定义良好的张量字典通信。在 单体模式 下,张量通过引用传递(零拷贝)。在 分布式模式 下,张量通过 tensormsg 协议序列化(见协议转换)。

        graph TB
    subgraph "各阶段数据结构"
        direction TB

        IN_DATA["输入: ROS 消息<br/>- sensor_msgs/Image<br/>- sensor_msgs/JointState"]

        PREP_OUT["预处理后<br/>Dict[str, Tensor]:<br/>- 'observation.images.top': [1,3,480,640]<br/>- 'observation.state': [1,7]"]

        ENG_OUT["推理后<br/>InferenceResult:<br/>- action: [chunk_size, action_dim]<br/>- chunk_size: int"]

        POST_OUT["后处理后<br/>- 物理关节位置<br/>- 准备用于 action_dispatcher"]
    end

    IN_DATA -->|"TensorPreprocessor"| PREP_OUT
    PREP_OUT -->|"PureInferenceEngine"| ENG_OUT
    ENG_OUT -->|"TensorPostprocessor"| POST_OUT
    

图:管道中的数据结构转换

请求-响应匹配

在分布式模式下,系统使用 _request_id 机制将异步推理响应与其原始请求匹配。pure_inference_node 透传此标识符:

# 从输入批次提取请求 ID
request_id = batch.pop("task.request_id", None)

# ... 执行推理 ...

# 在输出中包含请求 ID
if request_id is not None:
    out_batch["action.request_id"] = [request_id]

源码: src/inference_service/inference_service/pure_inference_node.py:88-98


与 TensorMsg 协议集成

双向转换

tensormsg 包(见协议转换)提供启用分布式模式操作的序列化层。TensorMsgConverter 类处理:

方法

用途

to_variant(batch: Dict)

将 Python 张量字典转换为 ROS VariantsList 消息

from_varian t(msg: VariantsList, device)

将 ROS 消息反序列化回 指定设备上的张量字典

在纯推理节点中的使用

pure_inference_node 演示了此集成:

# 反序列化输入批次
batch = TensorMsgConverter.from_variant(msg, self._engine._device)

# 运行推理
result = self._engine(batch)

# 序列化输出
out_batch = {"action": result.action}
out_msg = TensorMsgConverter.to_variant(out_batch)
self._pub.publish(out_msg)

此抽象允许核心组件保持与 ROS 无关,同时启用网络透明的张量通信。

源码: src/inference_service/inference_service/pure_inference_node.py:86-103, src/tensormsg/package.xml:1-26


总结

三组件架构实现了严格的关注点分离:

组件

依赖

主要职责

` TensorPreprocessor`

无(纯 Python)

ROS 数据 → 归一化张量

`` PureInferenceEngine``

仅 PyTorch

无状态策略执行

`` TensorPostprocessor``

无(纯 Python)

反归一化张量 → ROS 命令

此设计使相同的核心逻辑能够通过不同的组合策略支持单体(单机)和分布式(设备-边缘-云端)部署,详见 单体执行模式分布式执行模式

源码: src/inference_service/README.en.md:1-74, src/inference_service/README.md:1-76