Core Concepts Overview


Purpose and Scope

This document explains the three fundamental architectural principles underpinning the IB-Robot system: **Single Source of Truth**, **Contract-Driven Design**, and **Control Mode Architecture**. These principles eliminate configuration redundancy, ensure consistency between training and deployment, and enable seamless switching between different robot control paradigms.

For the detailed implementation of each principle, see:

  • Single Source of Truth Pattern: a deep dive into the robot_config YAML structure

  • Contract System: comprehensive documentation of the contract abstraction

  • Control Mode Architecture: details of the control-mode switching mechanism

For an overview of the overall system architecture, see System Architecture.


The Three Architectural Pillars

IB-Robot addresses the fundamental integration challenge between a machine-learning ecosystem (LeRobot) and robotics middleware (ROS 2) through three core design principles:

| Principle | Problem Solved | Key Benefit |
|---|---|---|
| Single Source of Truth | Configuration duplicated across data collection, training, and inference | Eliminates inconsistency; change once, applied everywhere |
| Contract-Driven Design | Train-deploy skew (the model sees different data formats at training and inference time) | Guarantees a consistent processing pipeline across the data lifecycle |
| Control Mode Architecture | Hard-coded control logic; no way to switch between teleoperation, AI, and planning | Unified hardware interface; modes switched via a single parameter |

Together, these principles create a system in which:

  1. Hardware configuration is **defined once** in the robot_config YAML

  2. Data-processing contracts are automatically **synthesized** from that configuration

  3. Multiple control modes **converge** on the same hardware abstraction layer

Sources: README.md:1-192, docs/architecture.md:47-79


Principle 1: Single Source of Truth (robot_config YAML)

Overview

The robot_config YAML file serves as the **single authoritative source** for all robot specifications. Instead of maintaining separate configurations for ros2_control, cameras, ML contracts, and joint definitions, IB-Robot defines everything in one place and automatically propagates it to every subsystem.

Configuration Structure

The robot configuration consolidates four traditionally separate systems:

robot:
  name: so101_single_arm
  type: so101

  # 1. Joint definitions (used by ros2_control, MoveIt, and contracts)
  joints:
    arm: ["1", "2", "3", "4", "5"]
    gripper: ["6"]
    all: ["1", "2", "3", "4", "5", "6"]

  # 2. Hardware interface (ros2_control configuration)
  ros2_control:
    hardware_plugin: so101_hardware/SO101SystemHardware
    port: /dev/ttyACM0
    controllers_config: $(find so101_hardware)/config/so101_controllers.yaml

  # 3. Peripherals (cameras and sensors, with their transforms)
  peripherals:
    - type: camera
      name: top
      driver: opencv
      width: 640
      height: 480
      fps: 30
      transform:
        parent_frame: base
        x: 0.0
        y: 0.0
        z: 0.5

  # 4. ML contract (observations and actions for AI models)
  contract:
    rate_hz: 20
    max_duration_s: 90.0
    observations:
      - key: observation.images.top
        topic: /camera/top/image_raw
        peripheral: top  # references the camera defined above

Sources: src/robot_config/config/robots/so101_single_arm.yaml:1-329
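The "define once, consume everywhere" idea can be illustrated with a minimal sketch, using plain dictionaries to stand in for the parsed YAML. The helper names here are hypothetical, not the project's actual API (the real propagation happens in loader.py and robot.launch.py):

```python
# Sketch: one config dict feeding multiple consumers (hypothetical helpers).
ROBOT_CONFIG = {
    "name": "so101_single_arm",
    "joints": {"arm": ["1", "2", "3", "4", "5"], "gripper": ["6"]},
    "contract": {"rate_hz": 20},
}

def joint_names(cfg):
    # Both the ros2_control controllers and the ML contract read this
    # same list, so they can never disagree about the joint set.
    return cfg["joints"]["arm"] + cfg["joints"]["gripper"]

def state_vector_size(cfg):
    # The observation.state tensor shape follows directly from the joints.
    return len(joint_names(cfg))

print(joint_names(ROBOT_CONFIG))        # ['1', '2', '3', '4', '5', '6']
print(state_vector_size(ROBOT_CONFIG))  # 6
```

Because every consumer derives its values from the same dictionary, renaming or adding a joint in one place updates both the controller configuration and the ML tensor shapes.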

Diagram: Configuration Propagation Flow

        graph TB
    subgraph "Single Source of Truth"
        YAML["robot_config YAML<br/>(so101_single_arm.yaml)"]

        YAML --> JOINTS["joints:<br/>arm: [1,2,3,4,5]<br/>gripper: [6]"]
        YAML --> PERIPH["peripherals:<br/>- camera top (640x480@30fps)<br/>- camera wrist"]
        YAML --> CONTRACT["contract:<br/>observations + actions<br/>rate_hz: 20"]
        YAML --> ROS2C["ros2_control:<br/>hardware_plugin<br/>controllers"]
    end

    subgraph "Consumers: Runtime Systems"
        JOINTS --> URDF["robot_description<br/>URDF generation"]
        JOINTS --> MOVEIT["robot_moveit<br/>MoveItGateway.arm_group_name"]

        PERIPH --> CAM_LAUNCH["Camera Launch Nodes<br/>usb_cam / realsense2_camera"]
        PERIPH --> TF_PUB["TF Static Publishers<br/>camera transforms"]

        CONTRACT --> RECORDER["episode_recorder<br/>topic subscriptions"]
        CONTRACT --> BAG2LR["bag_to_lerobot<br/>tensor shapes + resampling"]
        CONTRACT --> INFERENCE["lerobot_policy_node<br/>observation filtering"]

        ROS2C --> CTRL_MGR["controller_manager<br/>spawner scripts"]
    end

    subgraph "Consistency Guarantee"
        RECORDER --> TRAIN_DATA["Training Dataset<br/>(LeRobot format)"]
        INFERENCE --> DEPLOY_OBS["Deployment Observations<br/>(runtime tensors)"]

        TRAIN_DATA --> ALIGN["Same Contract =<br/>Same Processing"]
        DEPLOY_OBS --> ALIGN
    end

    YAML -.->|"loaded by"| LOADER["RobotConfig.load()<br/>robot_config/loader.py"]
    LOADER -.->|"synthesizes"| CONTRACT_OBJ["Contract object<br/>contract_utils.py"]
    

Sources: src/robot_config/robot_config/loader.py:1-359, src/robot_config/robot_config/contract_utils.py:1-500, README.md:39-41

Key Code Entities

| Entity | File | Purpose |
|---|---|---|
| `load_robot_config()` | robot_config/loader.py:86-359 | Loads and validates the YAML configuration |
| `RobotConfig` | robot_config/config.py | Python dataclass representing the entire configuration |
| `to_contract()` | robot_config/config.py | Synthesizes a contract from the robot configuration |
| `robot.launch.py` | robot_config/launch/robot.launch.py:1-432 | Main entry point; propagates the configuration to all nodes |

Propagation Example: Camera Configuration

When you define a camera peripheral in the robot_config YAML:

peripherals:
  - type: camera
    name: top
    width: 640
    height: 480
    fps: 30

This single definition automatically propagates to:

  1. Camera driver launch: launch_builders/perception.py generates a usb_cam node with the correct parameters

  2. TF transforms: launch_builders/perception.py creates static transform publishers

  3. Contract observations: the peripheral metadata (width, height, fps) is injected into the observation specs

  4. Dataset conversion: bag_to_lerobot.py:369-370 uses this metadata to validate image shapes

Sources: src/robot_config/robot_config/launch_builders/perception.py, src/dataset_tools/dataset_tools/bag_to_lerobot.py:216-230
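The metadata injection in step 3 can be sketched as follows. This is a hypothetical helper, not the project's code; it only illustrates how a `peripheral` reference could be resolved into shape and rate metadata on an observation spec:

```python
# Sketch: resolving a peripheral reference into observation-spec metadata
# (hypothetical helper; the real synthesis happens inside the loader).
PERIPHERALS = [
    {"type": "camera", "name": "top", "width": 640, "height": 480, "fps": 30},
]

OBSERVATION = {
    "key": "observation.images.top",
    "topic": "/camera/top/image_raw",
    "peripheral": "top",  # reference resolved below
}

def inject_peripheral(obs, peripherals):
    cam = next(p for p in peripherals if p["name"] == obs["peripheral"])
    # Downstream tools (e.g. the dataset converter) can now validate
    # decoded image shapes against this expected (H, W, C) shape.
    return {**obs, "shape": (cam["height"], cam["width"], 3), "fps": cam["fps"]}

spec = inject_peripheral(OBSERVATION, PERIPHERALS)
print(spec["shape"])  # (480, 640, 3)
print(spec["fps"])    # 30
```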


Principle 2: Contract-Driven Design

Overview

A **contract** is a machine-readable specification that defines:

  • What observations the robot provides (cameras, joint states, task descriptions)

  • What actions the robot expects (joint commands, gripper commands)

  • How ROS messages map to ML tensors (and vice versa)

  • Timing parameters (rate_hz, alignment strategy, resampling policy)

The contract guarantees that the **same processing pipeline** is used during data collection, training, and deployment, eliminating the common "works in training, fails in deployment" problem.

Contract Data Model

The contract abstraction is defined in contract_utils.py:26-117:

@dataclass
class Contract:
    """Contract defining observations, actions, and processing parameters."""
    robot_type: str              # e.g., "so_101"
    rate_hz: float               # Recording/inference frequency
    max_duration_s: float        # Episode duration limit
    observations: List[ObservationSpec]  # Input specs
    actions: List[ActionSpec]            # Output specs

Each ObservationSpec contains:

| Field | Purpose | Example |
|---|---|---|
| `key` | Tensor name in the dataset | "observation.images.top" |
| `topic` | ROS 2 topic to subscribe to | "/camera/top/image_raw" |
| `ros_type` | Message type | "sensor_msgs/msg/Image" |
| `peripheral` | Reference to a peripheral configuration | "top" (links to camera metadata) |
| `resample_policy` | Alignment strategy | "hold" (hold the last value) |
| `asof_tol_ms` | Timestamp tolerance | 1500 (1.5 seconds) |

Sources: src/robot_config/robot_config/contract_utils.py:26-117
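A minimal dataclass mirroring the table above might look like this. The field names follow the documented contract; the defaults and the exact type annotations are assumptions, not the project's real `ObservationSpec`:

```python
# Sketch of an ObservationSpec mirroring the fields in the table above
# (field names from the documentation; defaults are assumptions).
from dataclasses import dataclass
from typing import Optional

@dataclass
class ObservationSpec:
    key: str                           # tensor name in the dataset
    topic: str                         # ROS 2 topic to subscribe to
    ros_type: str                      # message type
    peripheral: Optional[str] = None   # link to peripheral metadata
    resample_policy: str = "hold"      # alignment strategy
    asof_tol_ms: int = 1500            # timestamp tolerance

spec = ObservationSpec(
    key="observation.images.top",
    topic="/camera/top/image_raw",
    ros_type="sensor_msgs/msg/Image",
    peripheral="top",
)
print(spec.resample_policy)  # hold
```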

Diagram: Contract Lifecycle and Propagation

        graph TB
    subgraph "Phase 1: Contract Definition"
        RC["robot_config YAML<br/>contract section"]
        RC --> SYNTH["RobotConfig.to_contract()"]
        SYNTH --> CONTRACT["Contract object<br/>(observations + actions)"]
    end

    subgraph "Phase 2: Data Collection"
        CONTRACT --> RECORDER["episode_recorder node"]
        RECORDER --> SUBS["Subscribe to topics:<br/>/camera/top/image_raw<br/>/joint_states"]
        SUBS --> BAGMETA["Write to ROS2 bag<br/>+ embed contract in metadata"]
    end

    subgraph "Phase 3: Dataset Conversion"
        BAGMETA --> BAG2LR["bag_to_lerobot script"]
        CONTRACT --> BAG2LR
        BAG2LR --> DECODE["decode_value()<br/>(shared with live inference)"]
        BAG2LR --> RESAMPLE["resample() at rate_hz<br/>(shared with live inference)"]
        BAG2LR --> LRDS["LeRobot Dataset<br/>(parquet + videos)"]
    end

    subgraph "Phase 4: Training"
        LRDS --> TRAIN["lerobot.train()<br/>(external library)"]
        TRAIN --> MODEL["Policy Checkpoint<br/>(.pt file)"]
    end

    subgraph "Phase 5: Deployment"
        CONTRACT --> INFNODE["lerobot_policy_node"]
        MODEL --> INFNODE
        INFNODE --> FILTER["Filter observations by<br/>model's input_features"]
        INFNODE --> STREAMBUF["StreamBuffer.sample()<br/>(same resampling logic)"]
        STREAMBUF --> TENSORPREP["TensorPreprocessor<br/>(same decode_value)"]
    end

    DECODE -.->|"shared code"| TENSORPREP
    RESAMPLE -.->|"shared code"| STREAMBUF
    

Sources: src/dataset_tools/dataset_tools/bag_to_lerobot.py:1-718, src/inference_service/inference_service/lerobot_policy_node.py:1-700, src/robot_config/robot_config/contract_utils.py:267-450

Shared Processing Functions

The following functions are used **identically** across data collection, dataset conversion, and live inference:

| Function | File | Purpose |
|---|---|---|
| `decode_value()` | contract_utils.py:267-332 | ROS message → numpy array conversion |
| `resample()` | contract_utils.py:372-450 | Temporal alignment at a fixed rate_hz |
| `qos_profile_from_dict()` | contract_utils.py:229-265 | QoS settings synthesis |
| `StreamBuffer.sample()` | contract_utils.py:335-370 | Timestamp-based data retrieval |
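The "hold" resampling behavior these shared functions provide can be sketched in a few lines. This is a deliberately simplified stand-in for the real `resample()`, using toy integer timestamps: for each fixed-rate output tick, take the most recent sample at or before the tick, but only if it falls within the tolerance.

```python
# Sketch of "hold" resampling at a fixed rate (simplified; real logic
# lives in contract_utils.py): hold the last value within tolerance.
from bisect import bisect_right

def resample_hold(stamps_ns, values, ticks_ns, tol_ns):
    out = []
    for t in ticks_ns:
        i = bisect_right(stamps_ns, t) - 1  # last sample with stamp <= t
        if i >= 0 and t - stamps_ns[i] <= tol_ns:
            out.append(values[i])
        else:
            out.append(None)  # no sample within tolerance: gap
    return out

stamps = [0, 100, 250]   # source timestamps (toy nanosecond scale)
vals = ["a", "b", "c"]
ticks = [50, 150, 400]   # fixed-rate output ticks
print(resample_hold(stamps, vals, ticks, tol_ns=100))  # ['a', 'b', None]
```

Because the same alignment logic runs during dataset conversion and inside the live inference buffer, training frames and deployment frames are sampled identically.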

Example: Image Observation Processing

Given this contract observation:

- key: observation.images.top
  topic: /camera/top/image_raw
  type: sensor_msgs/msg/Image
  peripheral: top
  image:
    resize: [480, 640]
  align:
    strategy: hold
    stamp: header
    tol_ms: 1500

During recording (episode_recorder.py:161-600):

# Subscribe to topic with QoS from contract
sub = node.create_subscription(
    sensor_msgs.msg.Image,
    "/camera/top/image_raw",
    callback,
    qos_profile_from_dict(spec.qos)
)
# Write raw message to bag (no processing)
writer.write(topic, serialize_message(msg), timestamp_ns)

During dataset conversion (bag_to_lerobot.py:454-485):

# Decode using contract metadata
val = decode_value(spec.ros_type, msg, spec)  # Returns HWC numpy array
# Resample at contract rate_hz
resampled = resample(spec.resample_policy, timestamps, values,
                     ticks_ns, step_ns, spec.asof_tol_ms)
# Apply resize from contract
resized = nearest_resize_rgb(resampled, spec.image_resize)

During live inference (lerobot_policy_node.py:381-420):

# Same decode function
val = decode_value(spec.ros_type, msg, spec)
# Same buffer sampling
obs_frame[spec.key] = stream_buffer.sample(timestamp_ns)
# Same preprocessing (inside TensorPreprocessor)
preprocessor.process(obs_frame)  # Uses same resize logic

Sources: src/dataset_tools/dataset_tools/episode_recorder.py:161-600, src/dataset_tools/dataset_tools/bag_to_lerobot.py:454-632, src/inference_service/inference_service/lerobot_policy_node.py:381-420

Contract Fingerprinting

To detect contract mismatches between training and deployment, IB-Robot computes a **contract fingerprint**:

def contract_fingerprint(contract: Contract) -> str:
    """Generate deterministic hash of contract structure."""
    # Include: observation keys, action keys, tensor shapes, rate_hz
    # Exclude: topics, QoS (deployment-specific)
    data = {
        "observations": {obs.key: obs.shape for obs in contract.observations},
        "actions": {act.key: act.shape for act in contract.actions},
        "rate_hz": contract.rate_hz
    }
    return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()

This fingerprint is:

  • Stored in info.json during dataset conversion (bag_to_lerobot.py:385-389)

  • Validated at inference startup (an optional check)

Sources: src/robot_config/robot_config/contract_utils.py:120-150, src/dataset_tools/dataset_tools/bag_to_lerobot.py:385-389
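The startup check this enables can be sketched as follows. This is a self-contained re-implementation of the fingerprint idea shown above; the helper name and the exact payload layout are illustrative:

```python
# Sketch of a fingerprint comparison at inference startup (hypothetical
# helper mirroring contract_fingerprint(): structural fields only).
import hashlib
import json

def fingerprint(obs_shapes, act_shapes, rate_hz):
    # Deterministic hash over structure; topics/QoS deliberately excluded.
    data = {"observations": obs_shapes, "actions": act_shapes, "rate_hz": rate_hz}
    return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()

train_fp = fingerprint({"observation.images.top": [480, 640, 3]}, {"action": [6]}, 20)
deploy_fp = fingerprint({"observation.images.top": [480, 640, 3]}, {"action": [6]}, 20)
print(train_fp == deploy_fp)  # True: same structure, same hash

changed_fp = fingerprint({"observation.images.top": [240, 320, 3]}, {"action": [6]}, 20)
print(train_fp == changed_fp)  # False: a shape change is detected
```

Excluding deployment-specific fields (topics, QoS) means the same model can be deployed on a differently wired robot, as long as the tensor structure matches.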


Principle 3: Control Mode Architecture

Overview

IB-Robot supports **three control modes**, representing fundamentally different robot-control paradigms:

  1. teleop: human teleoperation (VR, Xbox, leader arm)

  2. model_inference: high-frequency AI control (ACT, Diffusion Policy)

  3. moveit_planning: motion planning with trajectory execution (VoxPoser, VLM)

All three modes **converge on the same hardware abstraction layer** (ros2_control), enabling seamless mode switching through a single launch argument.

Mode Configuration Structure

Each mode is defined in the control_modes section of the robot_config YAML:

control_modes:
  teleop:
    description: "Human teleoperation mode (direct control)"
    controllers:
      - joint_state_broadcaster
      - arm_position_controller
      - gripper_position_controller
    inference:
      enabled: false
      force_disable: true
    executor:
      type: topic
      mode: teleop
      control_frequency: 50.0

  model_inference:
    description: "High-frequency end-to-end control mode (ACT/pi0)"
    controllers:
      - joint_state_broadcaster
      - arm_position_controller
      - gripper_position_controller
    inference:
      enabled: true
      execution_mode: "distributed"  # or "monolithic"
      model: so101_act
    executor:
      type: topic
      mode: model_inference
      control_frequency: 50.0
      queue_size: 100

  moveit_planning:
    description: "MoveIt trajectory planning mode (VoxPoser/VLM)"
    controllers:
      - joint_state_broadcaster
      - arm_trajectory_controller
      - gripper_trajectory_controller
    inference:
      enabled: false
    executor:
      type: action
      mode: moveit_planning

Sources: src/robot_config/config/robots/so101_single_arm.yaml:45-103, src/robot_config/README.en.md:88-213
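Pre-launch validation of such a mode entry can be sketched like this. The checks shown are hypothetical and much simpler than the real `validate_control_mode_config()` in contract_builder.py; they only illustrate the idea of failing fast on a malformed mode:

```python
# Sketch of validating a control-mode entry before launch
# (hypothetical checks; the real validator is more thorough).
REQUIRED_KEYS = {"controllers", "executor"}
VALID_EXECUTOR_TYPES = {"topic", "action"}

def validate_mode(name, mode_cfg):
    missing = REQUIRED_KEYS - mode_cfg.keys()
    if missing:
        raise ValueError(f"mode '{name}' is missing keys: {sorted(missing)}")
    if mode_cfg["executor"]["type"] not in VALID_EXECUTOR_TYPES:
        raise ValueError(f"mode '{name}': unknown executor type")
    return True

teleop = {
    "controllers": ["joint_state_broadcaster", "arm_position_controller"],
    "executor": {"type": "topic", "mode": "teleop"},
}
print(validate_mode("teleop", teleop))  # True
```

Validating before launch means a typo in a mode definition surfaces as an immediate error rather than a half-started node graph.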

Diagram: Control Mode Convergence Architecture

        graph TB
    subgraph "Control Mode Selection"
        LAUNCH["robot.launch.py<br/>--control_mode parameter"]
        DEFAULT["default_control_mode<br/>from robot_config YAML"]

        LAUNCH -.->|"overrides if specified"| ROUTER["Control Mode Router<br/>validate_control_mode_config()"]
        DEFAULT --> ROUTER
    end

    subgraph "Mode 1: Teleoperation"
        TELEOP_DEV["Teleoperation Device<br/>(VR/Xbox/LeaderArm)"]
        TELEOP_NODE["robot_teleop node<br/>leader_follower.py"]
        TELEOP_EXEC["TopicExecutor<br/>(direct passthrough)"]

        TELEOP_DEV --> TELEOP_NODE
        TELEOP_NODE --> TELEOP_EXEC
    end

    subgraph "Mode 2: Model Inference"
        INF_SVC["lerobot_policy_node<br/>(ACT/Diffusion Policy)"]
        DISP_NODE["action_dispatcher_node<br/>(queue + smoother)"]
        TOPIC_EXEC["TopicExecutor<br/>(100Hz streaming)"]

        INF_SVC -->|"action chunks"| DISP_NODE
        DISP_NODE --> TOPIC_EXEC
    end

    subgraph "Mode 3: MoveIt Planning"
        POSE_CMD["Pose Commands<br/>/cmd_pose topic"]
        MOVEIT_GW["MoveItGateway<br/>moveit_gateway.py"]
        MOVEIT_CORE["MoveIt2 Core<br/>(OMPL/Pilz planners)"]
        ACTION_EXEC["ActionExecutor<br/>(FollowJointTrajectory)"]

        POSE_CMD --> MOVEIT_GW
        MOVEIT_GW --> MOVEIT_CORE
        MOVEIT_CORE --> ACTION_EXEC
    end

    subgraph "Unified Hardware Layer"
        ROS2_CTRL["ros2_control<br/>controller_manager"]

        TELEOP_EXEC -->|"/arm_position_controller/commands"| ROS2_CTRL
        TOPIC_EXEC -->|"/arm_position_controller/commands"| ROS2_CTRL
        ACTION_EXEC -->|"/arm_trajectory_controller/follow_joint_trajectory"| ROS2_CTRL

        HW_CHOICE{"use_sim parameter"}

        ROS2_CTRL --> HW_CHOICE

        HW_CHOICE -->|"false"| REAL["SO101SystemHardware<br/>Feetech SDK"]
        HW_CHOICE -->|"true"| SIM["gz_ros2_control<br/>Gazebo simulation"]
    end

    ROUTER -->|"mode=teleop"| TELEOP_NODE
    ROUTER -->|"mode=model_inference"| INF_SVC
    ROUTER -->|"mode=moveit_planning"| MOVEIT_GW
    

Sources: src/robot_config/launch/robot.launch.py:87-432, src/robot_config/robot_config/contract_builder.py:9-104, src/robot_moveit/scripts/moveit_gateway.py

Controller Types per Mode

Different control modes require different ros2_control controllers:

| Mode | Controller Type | Interface | Frequency | Purpose |
|---|---|---|---|---|
| teleop | JointGroupPositionController | Topic (Float64MultiArray) | 50 Hz | Direct position commands from human input |
| model_inference | JointGroupPositionController | Topic (Float64MultiArray) | 100 Hz | High-frequency streamed commands from AI models |
| moveit_planning | JointTrajectoryController | Action (FollowJointTrajectory) | Variable | Time-parameterized trajectories from planners |

Sources: src/robot_config/README.en.md:164-206

Key Code Entities for Mode Switching

| Entity | File | Purpose |
|---|---|---|
| `validate_control_mode_config()` | contract_builder.py:9-104 | Validates the mode configuration before launch |
| `generate_execution_nodes()` | launch_builders/execution.py:113-350 | Generates inference/dispatcher nodes based on the mode |
| `generate_ros2_control_nodes()` | launch_builders/control.py | Generates the correct controllers for the mode |
| `TopicExecutor` | action_dispatch/topic_executor.py | Publishes position commands to topics |
| `ActionExecutor` | action_dispatch/action_executor.py | Sends trajectories via actions |

Mode Switching Example

Switching from model_inference to moveit_planning:

# Original launch (model_inference mode)
ros2 launch robot_config robot.launch.py \
  robot_config:=so101_single_arm \
  control_mode:=model_inference

# Switch to moveit_planning mode
ros2 launch robot_config robot.launch.py \
  robot_config:=so101_single_arm \
  control_mode:=moveit_planning

Internal changes:

  1. Controller generation: arm_trajectory_controller replaces arm_position_controller

  2. Node generation: a MoveItGateway node starts instead of lerobot_policy_node

  3. Executor type: ActionExecutor is used instead of TopicExecutor

  4. Hardware interface: unchanged; the same SO101SystemHardware plugin

Sources: src/robot_config/launch/robot.launch.py:154-179, src/robot_config/README.md:88-356

Automatic Mode Detection

The launch system can automatically detect which nodes to start based on the mode configuration:

# From robot.launch.py
def should_launch_inference(control_mode_config, with_inference_override):
    """Auto-detect if inference service should launch."""
    if with_inference_override is not None:
        return with_inference_override  # Explicit override

    # Auto-detect from mode config
    inference_config = control_mode_config.get('inference', {})
    return inference_config.get('enabled', False)

def should_launch_moveit(control_mode_name, with_moveit_override):
    """Auto-detect if MoveIt should launch."""
    if with_moveit_override is not None:
        return with_moveit_override  # Explicit override

    # Auto-detect: launch MoveIt if mode name contains "moveit"
    return 'moveit' in control_mode_name.lower()

Sources: src/robot_config/launch/robot.launch.py:112-179
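For example, the two helpers above can be exercised directly (definitions repeated here so the snippet runs standalone):

```python
# Usage sketch for the auto-detection helpers shown above.
def should_launch_inference(control_mode_config, with_inference_override):
    if with_inference_override is not None:
        return with_inference_override  # explicit override wins
    return control_mode_config.get('inference', {}).get('enabled', False)

def should_launch_moveit(control_mode_name, with_moveit_override):
    if with_moveit_override is not None:
        return with_moveit_override  # explicit override wins
    return 'moveit' in control_mode_name.lower()

mode_cfg = {"inference": {"enabled": True, "model": "so101_act"}}
print(should_launch_inference(mode_cfg, None))        # True (from mode config)
print(should_launch_inference(mode_cfg, False))       # False (override wins)
print(should_launch_moveit("moveit_planning", None))  # True (name match)
```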


Summary: How the Three Principles Work Together

The three core concepts form one integrated architecture:

  1. **Single Source of Truth** provides the foundation:

    • All robot specifications live in a single YAML file

    • Eliminates configuration drift between subsystems

  2. **Contract-Driven Design** ensures consistency:

    • Contracts are synthesized from the robot_config

    • The same processing pipeline is used from data collection → training → deployment

  3. **Control Mode Architecture** enables flexibility:

    • Multiple control paradigms (human, AI, planning)

    • All modes use the **same contract** and the **same hardware interface**

    • Modes are switched via a single parameter

Together, these principles deliver:

  • Development efficiency: change the configuration once and it applies everywhere

  • Deployment reliability: training and inference see identical data formats

  • System flexibility: switch control modes without modifying code

Sources: README.md:9-17, docs/architecture.md:179-185, README.en.md:9-17