IB-Robot Overview

Purpose and Scope

This page provides a high-level introduction to the IB-Robot system architecture, design principles, and core components. It explains how IB-Robot connects the Hugging Face LeRobot machine-learning ecosystem with the ROS 2 robotics middleware to enable end-to-end embodied-intelligence workflows.

For detailed documentation of the configuration system, see Configuration System (robot_config). For details on the inference pipeline, see Inference Pipeline. For the data-collection workflow, see Data Pipeline.

Sources: README.md:1-192, docs/architecture.md:1-313

What is IB-Robot?

IB-Robot (Intelligence Boom Robot) is an integrated development framework that connects two distinct ecosystems:

  • LeRobot: Hugging Face's robot-learning framework, used to train AI policies (ACT, Diffusion Policy, VLA models)

  • ROS 2 Humble: the industry-standard robotics middleware, used for hardware control, perception, and motion planning

The system provides a complete toolchain from expert-demonstration collection through policy training to real-world deployment, closing the traditional gap between machine-learning experiments and production robot systems.

Main repository structure:

IB_Robot/
├── libs/lerobot/          # [submodule] LeRobot training framework
├── src/                   # [submodule] Core ROS 2 packages
│   ├── robot_config/      # Configuration hub (single source of truth)
│   ├── tensormsg/         # ROS↔Tensor protocol converter
│   ├── inference_service/ # Policy inference node
│   ├── action_dispatch/   # Action execution and smoothing
│   ├── dataset_tools/     # Data collection and conversion
│   └── so101_hardware/    # Hardware drivers
├── scripts/setup.sh       # Environment initialization
└── scripts/build.sh       # Build system with mixin support

Sources: README.md:1-47, src/README.md:1-103


Integration Challenges

IB-Robot resolves fundamental incompatibilities between the machine-learning and robot-control paradigms:

Conceptual Differences

| Dimension   | LeRobot (ML world) | ROS 2 (control world) | IB-Robot solution            |
|-------------|--------------------|-----------------------|------------------------------|
| Data unit   | Episode            | Topic stream          | Contract-driven conversion   |
| Time model  | Discrete steps     | Continuous real time  | StreamBuffer with resampling |
| Control     | End-to-end policy  | Hierarchical planning | Dual-mode architecture       |
| Deployment  | Python scripts     | Distributed nodes     | Action Server integration    |

Sources: README.md:11-16, docs/architecture.md:53-77

Data Format Bridging

        graph LR
    subgraph "LeRobot Training Space"
        EP["Episode<br/>(parquet + video)"]
        FEAT["Features Dict<br/>observation.state: [N,7]<br/>observation.images.top: [N,H,W,3]<br/>action: [N,6]"]
    end

    subgraph "IB-Robot Bridge Layer"
        CONTRACT["robot_config YAML<br/>Contract Definition"]
        BAG2LR["bag_to_lerobot.py"]
        TENSORMSG["tensormsg Converter"]
    end

    subgraph "ROS 2 Runtime Space"
        BAG["ROS2 Bag<br/>(MCAP format)"]
        TOPICS["Topic Streams<br/>/joint_states<br/>/camera/top/image_raw<br/>/arm_controller/commands"]
    end

    TOPICS -->|"record"| BAG
    BAG -->|"decode_value()"| BAG2LR
    CONTRACT -.->|"defines mappings"| BAG2LR
    BAG2LR -->|"feature_from_spec()"| FEAT
    FEAT --> EP

    EP -.->|"trained policy"| POLICY["Policy Checkpoint"]
    POLICY --> TENSORMSG
    TOPICS -->|"subscribe"| TENSORMSG
    CONTRACT -.->|"defines mappings"| TENSORMSG
    TENSORMSG -->|"from_variant()"| FEAT
    

The tensormsg package implements bidirectional conversion through a registry-based decoder system. The robot_config contract serves as the single source of truth for all mapping specifications.

Sources: README.md:29-41, src/dataset_tools/dataset_tools/bag_to_lerobot.py:1-73, src/robot_config/robot_config/contract_utils.py:1-100


Architecture Principles

1. Single Source of Truth Pattern

All system configuration derives from the robot_config YAML files. This eliminates configuration drift between data collection, training, and deployment:

        graph TB
    YAML["robot_config YAML<br/>so101_single_arm.yaml"]

    subgraph "Configuration Propagation"
        YAML --> CONTRACT["Contract Section<br/>observations + actions"]
        YAML --> JOINTS["Joint Definitions<br/>limits + controllers"]
        YAML --> PERIPH["Peripheral Config<br/>cameras + transforms"]
        YAML --> MODES["Control Modes<br/>teleop | model_inference | moveit"]
    end

    subgraph "Runtime Consumers"
        CONTRACT --> RECORDER["episode_recorder<br/>creates subscriptions"]
        CONTRACT --> CONVERTER["bag_to_lerobot<br/>decode + resample"]
        CONTRACT --> INFERENCE["lerobot_policy_node<br/>filter observations"]

        JOINTS --> CONTROL["ros2_control<br/>hardware interface"]
        PERIPH --> CAMERAS["Camera Drivers<br/>usb_cam/realsense"]
        MODES --> LAUNCH["robot.launch.py<br/>node selection"]
    end

    RECORDER --> BAG["rosbag2 files"]
    BAG --> CONVERTER
    CONVERTER --> DS["LeRobot Dataset"]
    DS -.->|"training"| MODEL["Policy .pt"]
    MODEL --> INFERENCE
    

Key implementation: robot_config.loader.load_robot_config() loads the YAML, and RobotConfig.to_contract() synthesizes the contract dataclass.

Sources: src/robot_config/config/robots/so101_single_arm.yaml:1-100, README.md:39-41, src/robot_config/robot_config/contract_builder.py:1-104

2. Contract-Driven Design

The Contract dataclass defines the observation-action interface between the robot and the policy:

@dataclass(frozen=True, slots=True)
class Contract:
    name: str
    version: int
    rate_hz: float                          # recording/inference rate
    max_duration_s: float                   # episode timeout
    observations: List[ObservationSpec]     # input topics
    actions: List[ActionSpec]               # output topics
    tasks: List[TaskSpec]                   # optional task prompts
    recording: Dict[str, Any]
    robot_type: Optional[str]

Each ObservationSpec and ActionSpec includes:

  • ROS topic name and message type

  • Tensor shape and dtype specification

  • QoS configuration settings

  • Alignment strategy (for resampling)

This contract is consumed by three key systems with identical processing logic:

  1. episode_recorder (src/dataset_tools/dataset_tools/episode_recorder.py:159-274): subscribes to all contract topics

  2. bag_to_lerobot (src/dataset_tools/dataset_tools/bag_to_lerobot.py:213-231): decodes messages with decode_value()

  3. lerobot_policy_node (src/inference_service/inference_service/lerobot_policy_node.py:205-240): filters observations against the model's input_features
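A minimal sketch of how such a consumer walks the contract. The simplified dataclasses and their field names (`key`, `topic`, `msg_type`, `shape`) are assumptions for illustration and may differ from the real robot_config spec classes.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Simplified stand-ins for the contract spec dataclasses (illustrative).
@dataclass(frozen=True)
class ObservationSpec:
    key: str              # feature name, e.g. "observation.state"
    topic: str            # ROS topic to subscribe to
    msg_type: str         # ROS message type name
    shape: Tuple[int, ...]

@dataclass(frozen=True)
class Contract:
    name: str
    rate_hz: float
    observations: List[ObservationSpec] = field(default_factory=list)

def subscription_plan(contract: Contract) -> List[Tuple[str, str]]:
    """Return the (topic, msg_type) pairs a recorder would subscribe to."""
    return [(o.topic, o.msg_type) for o in contract.observations]

contract = Contract(
    name="so101_single_arm",
    rate_hz=30.0,
    observations=[
        ObservationSpec("observation.state", "/joint_states",
                        "sensor_msgs/msg/JointState", (7,)),
        ObservationSpec("observation.images.top", "/camera/top/image_raw",
                        "sensor_msgs/msg/Image", (480, 640, 3)),
    ],
)
plan = subscription_plan(contract)
```

Because all three consumers derive their behavior from the same contract object, the topics recorded, converted, and fed to the policy cannot drift apart.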

Sources: src/robot_config/robot_config/contract_utils.py:83-97, src/dataset_tools/dataset_tools/bag_to_lerobot.py:1-73

3. Dual-Mode Control Architecture

IB-Robot supports three control modes, all of which converge on the same ros2_control hardware interface:

        graph TB
    subgraph "Mode Selection"
        LAUNCH["robot.launch.py<br/>--control_mode parameter"]
        DEFAULT["default_control_mode<br/>from robot_config"]

        LAUNCH -.->|"overrides"| DEFAULT
    end

    subgraph "Mode 1: Teleoperation"
        TELEOP["robot_teleop node<br/>VR/Xbox/IMU input"]
        TELEOP_EXEC["TopicExecutor<br/>position_controllers"]

        TELEOP --> TELEOP_EXEC
    end

    subgraph "Mode 2: Model Inference"
        INF["lerobot_policy_node<br/>ACT/Diffusion Policy"]
        DISP["action_dispatcher_node<br/>TemporalSmoother"]
        TOPIC_EXEC["TopicExecutor<br/>100Hz streaming"]

        INF -->|"action chunks"| DISP
        DISP --> TOPIC_EXEC
    end

    subgraph "Mode 3: MoveIt Planning"
        POSE["Pose Commands<br/>/cmd_pose"]
        GATEWAY["moveit_gateway.py<br/>IK solver"]
        MOVEIT["MoveIt2 Core<br/>OMPL planner"]
        ACTION_EXEC["ActionExecutor<br/>FollowJointTrajectory"]

        POSE --> GATEWAY
        GATEWAY --> MOVEIT
        MOVEIT --> ACTION_EXEC
    end

    subgraph "Unified Hardware Layer"
        ROS2CTRL["ros2_control<br/>controller_manager"]

        TELEOP_EXEC --> ROS2CTRL
        TOPIC_EXEC --> ROS2CTRL
        ACTION_EXEC --> ROS2CTRL

        ROS2CTRL --> HW{"use_sim?"}
        HW -->|"false"| REAL["so101_hardware<br/>Feetech SDK"]
        HW -->|"true"| SIM["Gazebo<br/>gz_ros2_control"]
    end

    LAUNCH -->|"teleop"| TELEOP
    LAUNCH -->|"model_inference"| INF
    LAUNCH -->|"moveit_planning"| GATEWAY
    

action_dispatcher_node implements pull-based action execution with temporal smoothing for action-chunking models, triggering inference whenever the queue falls below its watermark.
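The watermark-triggered, pull-based loop can be sketched as follows; the class and parameter names here are illustrative, not the actual action_dispatch API.

```python
from collections import deque
from typing import Callable, Deque, List, Optional

class ActionQueue:
    """Minimal sketch of watermark-triggered inference (illustrative).

    When the buffered action count drops below `watermark`, `request_infer`
    is invoked once to fetch a fresh action chunk asynchronously; the
    control loop keeps popping one action per tick in the meantime.
    """

    def __init__(self, watermark: int, request_infer: Callable[[], None]):
        self.watermark = watermark
        self.request_infer = request_infer
        self._queue: Deque[List[float]] = deque()
        self._pending = False    # True while an inference request is in flight

    def extend(self, chunk: List[List[float]]) -> None:
        # Called when an inference result (an action chunk) arrives.
        self._queue.extend(chunk)
        self._pending = False

    def tick(self) -> Optional[List[float]]:
        # One control-loop iteration: maybe refill, then pop one action.
        if len(self._queue) < self.watermark and not self._pending:
            self._pending = True
            self.request_infer()
        return self._queue.popleft() if self._queue else None

requests = []
q = ActionQueue(watermark=20, request_infer=lambda: requests.append("infer"))
first = q.tick()                            # empty queue: triggers inference, returns None
q.extend([[0.0] * 6 for _ in range(100)])   # a 100-step chunk arrives later
second = q.tick()                           # now pops the next action
```

The `_pending` flag prevents duplicate requests while a chunk is still being computed.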

Sources: README.md:121-154, src/action_dispatch/action_dispatch/action_dispatcher_node.py:1-319, src/robot_moveit/scripts/moveit_gateway.py:1-100


System Components and Data Flow

Core Package Responsibilities

        graph TB
    subgraph "Layer 1: Configuration"
        RC["robot_config<br/>- load_robot_config()<br/>- to_contract()<br/>- launch builders"]
    end

    subgraph "Layer 2: Protocol Conversion"
        TM["tensormsg<br/>- TensorMsgConverter<br/>- register_encoder/decoder<br/>- decode_value()"]
    end

    subgraph "Layer 3: Inference & Execution"
        INF["inference_service<br/>- lerobot_policy_node<br/>- InferenceCoordinator<br/>- TensorPreprocessor"]
        DISP["action_dispatch<br/>- action_dispatcher_node<br/>- TemporalSmoother<br/>- TopicExecutor"]

        INF -->|"DispatchInfer Action"| DISP
    end

    subgraph "Layer 4: Data Pipeline"
        RECORDER["dataset_tools<br/>- episode_recorder<br/>- bag_to_lerobot<br/>- record_cli"]
        TELEOP["robot_teleop<br/>- VR/Xbox/IMU drivers<br/>- Leader-Follower"]
    end

    subgraph "Layer 5: Hardware Abstraction"
        CTRL["ros2_control<br/>- controller_manager<br/>- hardware_interface"]
        HW["so101_hardware<br/>- SO101Hardware class<br/>- Feetech SDK"]

        CTRL --> HW
    end

    subgraph "Layer 6: Motion Planning"
        MOVEIT["robot_moveit<br/>- MoveItGateway<br/>- kinematics.yaml"]
    end

    RC -.->|"provides Contract"| TM
    RC -.->|"provides Contract"| RECORDER
    RC -.->|"provides Contract"| INF

    TM --> INF
    TM --> DISP

    TELEOP --> RECORDER

    DISP --> CTRL
    MOVEIT --> CTRL
    

Sources: src/README.md:20-98, README.md:44-71

Component Interaction: The Inference Loop

        sequenceDiagram
    participant Sensors as "/joint_states<br/>/camera/top/image_raw"
    participant PolicyNode as "lerobot_policy_node"
    participant Dispatcher as "action_dispatcher_node"
    participant Controller as "ros2_control"
    participant Hardware as "so101_hardware"

    Note over Sensors,Hardware: Steady-state operation at 100Hz

    loop Every 10ms (100Hz control loop)
        Dispatcher->>Dispatcher: Check queue watermark

        alt Queue below watermark (< 20 actions)
            Dispatcher->>PolicyNode: DispatchInfer.Goal(obs_timestamp)
            PolicyNode->>Sensors: Sample latest observations
            Sensors-->>PolicyNode: /joint_states, images
            PolicyNode->>PolicyNode: TensorPreprocessor<br/>PureInferenceEngine (GPU)<br/>TensorPostprocessor
            PolicyNode-->>Dispatcher: Result(action_chunk[100,6])
            Dispatcher->>Dispatcher: TemporalSmoother.update()<br/>blend overlapping actions
        end

        Dispatcher->>Dispatcher: pop next action from queue
        Dispatcher->>Controller: /arm_position_controller/commands
        Controller->>Hardware: write() joint positions
        Hardware-->>Sensors: read() joint states
    end
    

action_dispatcher_node maintains an action queue and triggers asynchronous inference through the DispatchInfer Action interface. The TemporalSmoother blends overlapping action chunks with exponential weighting to ensure smooth transitions.

Sources: src/action_dispatch/README.en.md:1-447, src/action_dispatch/action_dispatch/action_dispatcher_node.py:38-302, src/inference_service/inference_service/lerobot_policy_node.py:422-489


Data Lifecycle: From Collection to Deployment

End-to-End Workflow

        graph LR
    subgraph "Phase 1: Data Collection"
        HUMAN["Human Expert<br/>VR/Xbox controller"]
        TELEOP_NODE["robot_teleop node"]
        ROBOT1["Physical Robot"]

        HUMAN --> TELEOP_NODE
        TELEOP_NODE --> ROBOT1
    end

    subgraph "Phase 2: Recording"
        RECORDER_NODE["episode_recorder<br/>Action Server"]
        CLI["record_cli<br/>trigger interface"]
        MCAP["ROS2 Bag<br/>MCAP format"]

        CLI -->|"RecordEpisode.Goal"| RECORDER_NODE
        ROBOT1 -.->|"sensors"| RECORDER_NODE
        RECORDER_NODE --> MCAP
    end

    subgraph "Phase 3: Conversion"
        BAG2LR["bag_to_lerobot.py<br/>--robot-config"]
        LRDS["LeRobot v3 Dataset<br/>parquet + mp4"]

        MCAP --> BAG2LR
        BAG2LR --> LRDS
    end

    subgraph "Phase 4: Training"
        TRAIN["lerobot library<br/>ACT/Diffusion training"]
        MODEL["Policy Checkpoint<br/>config.json + model.safetensors"]

        LRDS --> TRAIN
        TRAIN --> MODEL
    end

    subgraph "Phase 5: Deployment"
        POLICY_NODE["lerobot_policy_node"]
        ROBOT2["Physical Robot<br/>Autonomous execution"]

        MODEL --> POLICY_NODE
        POLICY_NODE --> ROBOT2
    end

    ROBOT2 -.->|"online evaluation"| RECORDER_NODE
    

Key scripts and commands:

  1. Launch the recording server: ros2 launch robot_config robot.launch.py control_mode:=teleop record:=true record_mode:=episodic

  2. Trigger an episode: ros2 run dataset_tools record_cli (interactive prompt)

  3. Convert to LeRobot: python bag_to_lerobot.py --robot-config so101_single_arm.yaml --bag /path/to/episode --out /path/to/dataset

  4. Train a policy: python -m lerobot.train policy=act dataset_repo_id=local/dataset

  5. Deploy: ros2 launch robot_config robot.launch.py control_mode:=model_inference

Sources: src/dataset_tools/dataset_tools/episode_recorder.py:1-60, src/dataset_tools/dataset_tools/bag_to_lerobot.py:27-73, src/robot_config/robot_config/launch_builders/recording.py:102-168


Environment Setup and Build System

Initialization Workflow

IB-Robot uses a Python virtual environment (venv) to isolate ML dependencies from the system ROS 2 packages:

        graph TB
    SETUP["./scripts/setup.sh"]

    SETUP --> SUB["git submodule update<br/>--init --recursive"]
    SETUP --> SYSDEPS["Install system packages<br/>nlohmann-json, build tools"]
    SETUP --> VENV["Create venv/<br/>Python 3.11 isolated environment"]

    VENV --> PIP["pip install<br/>torch, numpy<2.0, lerobot"]
    PIP --> SHRC["Generate .shrc_local<br/>source venv + ROS 2"]

    SHRC --> BUILD["./scripts/build.sh<br/>colcon build with mixins"]

    BUILD --> LEROBOT_EDITABLE["pip install -e libs/lerobot"]
    BUILD --> COLCON["colcon build<br/>--mixin dev"]
    

The build.sh script supports mixin-based configuration for different build profiles:

| Mixin   | Configuration                                      |
|---------|----------------------------------------------------|
| dev     | Debug symbols, symlink install, no tests (default) |
| release | Optimized build, -O3                               |
| test    | Tests enabled                                      |
| asan    | AddressSanitizer for memory debugging              |

Usage: ./scripts/build.sh --mixin release test

Sources: scripts/build.sh:1-229, README.md:75-118

Launch System Architecture

The robot.launch.py entry point uses a builder pattern to generate nodes dynamically from the robot_config:

# Simplified launch flow
def launch_setup(context):
    robot_config = load_robot_config(robot_config_name)
    control_mode = robot_config['default_control_mode']

    actions = []

    # Generate nodes from launch_builders modules
    actions += generate_ros2_control_nodes(robot_config, use_sim)
    actions += generate_camera_nodes(robot_config)
    actions += generate_tf_nodes(robot_config)

    if use_sim:
        actions += generate_gazebo_nodes(robot_config)

    if control_mode == 'model_inference':
        actions += generate_inference_node(robot_config, control_mode)
        actions += generate_dispatcher_node(robot_config)
    elif control_mode == 'moveit_planning':
        actions += generate_moveit_nodes(robot_config)

    return actions

Key launch arguments:

| Parameter      | Purpose                                  | Default         |
|----------------|------------------------------------------|-----------------|
| robot_config   | Configuration name (maps to a YAML file) | test_cam        |
| use_sim        | Enable Gazebo simulation                 | false           |
| control_mode   | Override the control mode                | (from YAML)     |
| with_inference | Force-enable/disable inference           | (auto-detected) |
| moveit_display | Launch RViz for MoveIt                   | true            |

Sources: src/robot_config/launch/robot.launch.py:1-300, src/robot_config/robot_config/launch_builders/execution.py:1-200


Distributed Inference Support

IB-Robot supports two execution modes for policy inference:

Monolithic Mode (default)

All inference components run in a single lerobot_policy_node process with zero-copy tensor passing:

lerobot_policy_node process:
  ├─ TensorPreprocessor (CPU)
  ├─ PureInferenceEngine (GPU)
  └─ TensorPostprocessor (CPU)
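A toy illustration of the zero-copy hand-off: each stage returns the very object it received, so no serialization or copying occurs between stages. The stage functions below are stand-ins for the real TensorPreprocessor/PureInferenceEngine/TensorPostprocessor interfaces, which are not documented here.

```python
# Illustrative only: within one process, stages share the same batch object
# by reference instead of serializing tensors between nodes.
def preprocess(batch: dict) -> dict:
    batch["normalized"] = [x / 10.0 for x in batch["raw"]]
    return batch              # same object, no copy

def infer(batch: dict) -> dict:
    batch["action"] = [v * 2 for v in batch["normalized"]]
    return batch

def postprocess(batch: dict) -> dict:
    batch["command"] = batch["action"]
    return batch

batch = {"raw": [10.0, 20.0]}
out = postprocess(infer(preprocess(batch)))
same_object = out is batch    # passed by reference throughout the pipeline
```

In the distributed mode below, the same stage boundaries become topic boundaries, and the tensors must be serialized across them.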

Distributed Mode (Cloud-Edge)

The edge node performs preprocessing, the cloud node runs GPU inference, and the edge node performs postprocessing:

        graph LR
    subgraph "Edge Node (Robot)"
        SENSORS["Sensors"]
        EDGE["lerobot_policy_node<br/>(edge proxy)"]
        PRE["TensorPreprocessor"]
        POST["TensorPostprocessor"]
        DISPATCH["action_dispatcher_node"]

        SENSORS --> EDGE
        EDGE --> PRE
        POST --> DISPATCH
    end

    subgraph "Cloud Node (GPU Server)"
        CLOUD["pure_inference_node"]
        ENGINE["PureInferenceEngine<br/>(GPU)"]

        CLOUD --> ENGINE
    end

    PRE -->|"/preprocessed/batch<br/>VariantsList"| CLOUD
    ENGINE -->|"/inference/action<br/>VariantsList"| POST
    

While waiting for the cloud response, the edge node suspends the DispatchInfer Action callback, using a threading.Event for synchronization.
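The Event-based suspension can be sketched as follows; the topic plumbing and method names are assumptions, and only the threading.Event wait-with-timeout pattern mirrors the described design.

```python
import threading
from typing import Callable, List, Optional

class CloudInferenceBridge:
    """Sketch of edge-side request/response sync via threading.Event."""

    def __init__(self, timeout_s: float = 5.0):
        self.timeout_s = timeout_s
        self._event = threading.Event()
        self._result: Optional[List[List[float]]] = None

    def on_cloud_result(self, actions: List[List[float]]) -> None:
        # Subscriber callback for the cloud result topic.
        self._result = actions
        self._event.set()

    def request(self, publish: Callable[[], None]) -> Optional[List[List[float]]]:
        # Action callback: publish the batch, then block until the reply.
        self._event.clear()
        self._result = None
        publish()                         # e.g. publish the preprocessed batch
        if not self._event.wait(self.timeout_s):
            return None                   # timed out waiting for the cloud
        return self._result

bridge = CloudInferenceBridge(timeout_s=1.0)
# Simulate the cloud replying from another thread shortly after the request.
t = threading.Timer(0.05, bridge.on_cloud_result, args=([[0.1] * 6],))
t.start()
result = bridge.request(publish=lambda: None)
t.join()
```

The timeout corresponds to the `request_timeout` setting in the configuration below; a `None` return would be surfaced as an aborted Action goal.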

Configuration:

inference:
  enabled: true
  execution_mode: distributed  # or 'monolithic'
  cloud_inference_topic: /preprocessed/batch
  cloud_result_topic: /inference/action
  request_timeout: 5.0

Sources: src/inference_service/inference_service/lerobot_policy_node.py:1-34, src/inference_service/inference_service/lerobot_policy_node.py:306-351, src/inference_service/inference_service/lerobot_policy_node.py:495-554


Cross-Cutting Concerns

Error Handling and Diagnostics

The system publishes health diagnostics via DiagnosticStatus messages:

  • lerobot_policy_node monitors inference latency and publishes to ~/health

  • action_dispatcher_node monitors queue size and publishes to ~/queue_size

  • Hardware drivers report communication errors to /diagnostics

Thread Safety

  • StreamBuffer: lock-free ring buffer for observation sampling

  • episode_recorder: a threading.Lock guards SequentialWriter access

  • action_dispatcher_node: uses a ROS 2 MutuallyExclusiveCallbackGroup to isolate the control loop
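The latest-sample semantics a StreamBuffer-style ring buffer provides can be illustrated with a minimal single-writer sketch; StreamBuffer's real implementation is not shown on this page, so treat this purely as an illustration of the semantics, not of its lock-free machinery.

```python
from typing import Any, Optional, Tuple

class RingBuffer:
    """Fixed-capacity ring buffer keeping the latest N stamped samples."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._buf: list = [None] * capacity
        self._count = 0          # monotonically increasing write counter

    def push(self, stamp: float, value: Any) -> None:
        # Overwrite the oldest slot once the buffer wraps around.
        self._buf[self._count % self.capacity] = (stamp, value)
        self._count += 1

    def latest(self) -> Optional[Tuple[float, Any]]:
        if self._count == 0:
            return None
        return self._buf[(self._count - 1) % self.capacity]

rb = RingBuffer(capacity=4)
for i in range(6):               # wraps around after 4 pushes
    rb.push(float(i), i * 10)
newest = rb.latest()
```

A reader that only ever calls `latest()` observes the freshest observation without blocking the sensor callback that writes.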

Performance Optimizations

  1. Zero-copy inference (monolithic mode): tensors are passed by reference between the preprocessor, engine, and postprocessor

  2. Symlink install (development): colcon build --symlink-install so Python-only changes need no rebuild

  3. Action chunking: reduces inference frequency by predicting 100 steps at 10Hz (a 10-second horizon)

  4. Temporal smoothing: TemporalSmoother blends overlapping action chunks with exponential weighting exp(-coeff * k) (default coeff=0.01)
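The exponential weighting from item 4 can be sketched as follows. Only the weight exp(-coeff * k) comes from the text above; how TemporalSmoother actually combines the weighted terms is an assumption of this sketch.

```python
import math
from typing import List

def blend_chunks(old: List[float], new: List[float], coeff: float = 0.01) -> List[float]:
    """Blend the overlapping region of two action chunks.

    Step k of the overlap weights the *new* chunk by w_k = exp(-coeff * k),
    so the freshest predictions dominate near the present and the old chunk
    regains influence further into the overlap (illustrative combination).
    """
    out = []
    for k, (a_old, a_new) in enumerate(zip(old, new)):
        w = math.exp(-coeff * k)
        out.append(w * a_new + (1.0 - w) * a_old)
    return out

# Overlap of 3 steps between the tail of the old chunk and head of the new.
blended = blend_chunks([1.0, 1.0, 1.0], [0.0, 0.0, 0.0], coeff=0.01)
```

With the default coeff=0.01 the weights decay slowly, so consecutive chunks transition without visible jerks at the hand-over point.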

Sources: src/action_dispatch/README.en.md:212-331, scripts/build.sh:117-229, src/inference_service/inference_service/lerobot_policy_node.py:286-304


Next Steps

This overview introduced the IB-Robot system architecture, design principles, and core workflows. For details on specific subsystems:

Sources: README.md:1-192, docs/architecture.md:1-313