系统架构
相关源文件
以下文件用作生成此 wiki 页面的上下文:
本文档全面概述了 IB-Robot 系统架构,解释了组件如何组织成层级、数据如何在系统中流动,以及各个子系统如何交互以实现从数据采集到部署的端到端具身 AI 开发。
有关详细的配置规范,请参阅 配置系统 (robot_config)。有关此设计背后的基本架构原则,请参阅 核心概念。有关各个子系统的详细信息,请参阅 推理服务、动作分发 和 数据流水线。
架构概述
IB-Robot 实现了分层架构,每一层为其上层提供明确定义的抽象。系统围绕**单一数据源**原则组织,其中 robot_config YAML 文件通过契约驱动的合成来驱动所有子系统的行为。
系统层级
来源: README.md:18-43, docs/architecture.md:86-177, src/README.md:20-44
包架构与依赖
系统由 10 个核心 ROS 2 包组成,具有明确定义的职责和依赖关系。
核心包层次结构
包职责矩阵:
包 |
主要职责 |
关键类/节点 |
依赖 |
|---|---|---|---|
|
配置管理、启动编排 |
|
|
|
接口定义 |
|
None |
|
ROS↔张量协议转换 |
|
|
|
策略推理(单体/分布式) |
|
|
|
带时间平滑的动作执行 |
|
|
|
Episode 录制和数据集转换 |
|
|
|
遥操作接口 |
遥操作节点 VR/Xbox/IMU |
|
|
运动规划集成 |
|
|
|
机器人模型定义 |
URDF, SRDF, 网格 |
None |
|
硬件驱动 |
|
None(仅 C++) |
来源: README.md:44-71, src/README.md:49-84, docs/architecture.md:216-265
配置驱动架构
整个系统由 robot_config YAML 文件驱动,该文件作为所有系统规范的单一数据源。
配置流程
关键配置文件:
配置到代码映射:
配置节 |
代码消费者 |
关键函数 |
|---|---|---|
|
|
|
|
|
|
|
`` robot.launch.py`` |
` launch_setup()` src/robot_config/launch/robot.launch.py:123-300 |
|
|
src/robot_config/robot_config/launch_builders/execution.py:61-157 |
|
|
src/robot_conf ig/robot_config/l aunch_builders/pe rception.py |
` ros2_control.controllers` |
|
来源: src/robot_config/config/robots/so101_single_arm.yaml, src/robot_config/launch/robot.launch.py:87-175, src/robot_config/robot_config/launch_builders/execution.py:20-157
数据流架构
观测流(传感器 → 推理)
关键代码路径:
观测回调: src/inference_service/inference_service/lerobot_policy_node.py:381-397
_obs_cb(msg, spec)接收 ROS 消息调用来自 src/robot_config/robot_config/contract_utils.py 的
decode_value()推送到
StreamBuffer进行时间对齐
观测采样: src/inference_service/inference_service/lerobot_policy_node.py:399-420
_sample_obs_frame(sample_t_ns)在特定时间戳查询所有缓冲区处理多个
observation.state流(拼接)对缺失数据返回零填充值
预处理: src/inference_service/inference_service/core/preprocessor.py
TensorPreprocessor.__call__(obs_frame)准备模型输入调整图像大小、归一化值、堆叠时间历史
来源: src/inference_service/inference_service/lerobot_policy_node.py:381-420, src/robot_config/robot_config/contract_utils.py, src/inference_service/inference_service/core/preprocessor.py
动作流(推理 → 硬件)
关键代码路径:
动作解码: src/action_dispatch/action_dispatch/action_dispatcher_node.py:232-278
_result_cb()接收DispatchInfer.Result通过
TensorMsgConverter.from_variant()将VariantsList转换为 numpy/tensor
时间平滑: src/action_dispatch/action_dispatch/temporal_smoother.py
TemporalSmoother.update(action_chunk, actions_executed)对齐并混合计算
actions_executed = plan_length_at_start - current_plan_length应用指数加权:
weight[k] = exp(-coeff * k)
动作执行: src/action_dispatch/action_dispatch/action_dispatcher_node.py:172-201
_control_loop()以 100 Hz 运行从队列/平滑器弹出下一个动作
TopicExecutor.execute(action)发布到控制器主题
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:172-278, src/action_dispatch/action_dispatch/temporal_smoother.py, src/action_dispatch/action_dispatch/topic_executor.py
控制模式架构
IB-Robot 支持三种不同的控制模式,它们汇聚到同一个 ros2_control 硬件接口。
控制模式选择与路由
控制模式配置:
每种模式在机器人配置 YAML 的 control_modes 节中定义:
control_modes:
teleop:
controllers: [arm_position_controller, gripper_position_controller]
inference:
enabled: false
model_inference:
controllers: [arm_position_controller, gripper_position_controller]
inference:
enabled: true
model: act_policy
execution_mode: monolithic # or distributed
executor:
type: topic
moveit_planning:
controllers: [arm_trajectory_controller, gripper_trajectory_controller]
inference:
enabled: false
executor:
type: action
模式选择逻辑: src/robot_config/launch/robot.launch.py:176-196
来源: src/robot_config/launch/robot.launch.py:123-300, src/robot_config/config/robots/so101_single_arm.yaml, README.md:121-154
推理执行模式
推理服务支持两种执行架构:单体(一体化进程)和**分布式**(边缘-云端分离)。
单体模式架构
关键实现: src/inference_service/inference_service/lerobot_policy_node.py:285-304
_setup_monolithic_mode()创建InferenceCoordinator所有组件在同一进程中运行,共享内存
预处理、推理和后处理之间零序列化开销
_execute_monolithic()位于 src/inference_service/inference_service/lerobot_policy_node.py:491-493
分布式模式架构
关键实现: src/inference_service/inference_service/lerobot_policy_node.py:306-351
边缘节点设置:
_setup_distributed_mode()创建
TensorPreprocessor和TensorPostprocessor(仅 CPU)在
/preprocessed/batch上发布,订阅/inference/action维护
_pending_requests字典,使用 threading.Event 进行同步
分布式执行: src/inference_service/inference_service/lerobot_policy_node.py:495-554
_execute_distributed()在本地预处理生成 UUID
request_id,嵌入批次中发布到云端,阻塞等待
threading.Event.wait(timeout)云端结果回调
_cloud_result_callback()通过request_id匹配
云端节点: src/inference_service/inference_service/pure_inference_node.py
纯 GPU 推理服务
订阅
/preprocessed/batch,发布到/inference/action在结果中添加
_latency_ms
配置:
inference:
execution_mode: distributed # or monolithic
request_timeout: 5.0
cloud_inference_topic: /preprocessed/batch
cloud_result_topic: /inference/action
来源: src/inference_service/inference_service/lerobot_policy_node.py:285-584, src/inference_service/inference_service/pure_inference_node.py, src/inference_service/README.en.md:1-130
数据采集与训练流水线
录制架构
录制实现: src/dataset_tools/dataset_tools/episode_recorder.py:161-274
持久订阅: 在节点启动时为所有契约主题创建
_make_sub()使用契约驱动的 QoS 创建订阅除非
_flags.is_recording,否则回调为空操作
Episode 生命周期:
execute_callback()开始录制 src/dataset_tools/dataset_tools/episode_recorder.py:351-493使用唯一目录打开
rosbag2_py.SequentialWriter从
contract.max_duration_s创建超时定时器按接收顺序直接写入消息(无缓冲)
元数据注入: src/dataset_tools/dataset_tools/episode_recorder.py:494-547
关闭写入器后,修改
metadata.yaml在
custom_data字段中嵌入operator_prompt
数据集转换流水线
转换实现: src/dataset_tools/dataset_tools/bag_to_lerobot.py:233-648
契约加载: src/dataset_tools/dataset_tools/bag_to_lerobot.py:216-230
_load_contract_from_robot_config()加载用于录制的相同契约确保训练-部署对齐
流规划: src/dataset_tools/dataset_tools/bag_to_lerobot.py:151-210
_plan_streams()将契约规范映射到 bag 主题为每个主题创建
_Stream缓冲区
重采样: src/dataset_tools/dataset_tools/bag_to_lerobot.py:521-531
应用来自 contract_utils 的
resample()使用每个规范的
resample_policy(hold/asof/drop)按
contract.rate_hz对齐到统一的ticks_ns
特征生成: src/dataset_tools/dataset_tools/bag_to_lerobot.py:288-362
feature_from_spec()创建 LeRobot 兼容的特征字典处理图像/视频编码、浮点数组、任务字符串
来源: src/dataset_tools/dataset_tools/episode_recorder.py:161-547, src/dataset_tools/dataset_tools/bag_to_lerobot.py:216-648, src/dataset_tools/README.md
动作分发与时间平滑
action_dispatcher_node 实现了一个复杂的拉取式执行系统,具有跨帧时间平滑功能用于动作块。
分发架构
关键实现细节:
控制循环: src/action_dispatch/action_dispatch/action_dispatcher_node.py:172-201
以 100 Hz 运行(通过
control_frequency参数配置)发布队列大小和平滑状态诊断
当
plan_length < watermark时触发推理
推理请求: src/action_dispatch/action_dispatch/action_dispatcher_node.py:203-220
在发送目标前记录
_plan_length_at_inference_start异步 action client 模式,带 future 回调
结果处理: src/action_dispatch/action_dispatch/action_dispatcher_node.py:232-278
计算
actions_executed = plan_at_start - current_plan路由到
TemporalSmoother.update()或简单队列替换
时间平滑算法
时间平滑器解决了在执行前一个动作块时新推理完成时的动作块对齐问题:
平滑实现: src/action_dispatch/action_dispatch/temporal_smoother.py
更新方法:
def update(self, new_actions: torch.Tensor, actions_executed: int) -> int: # 对齐:跳过已执行的动作 relevant_new = new_actions[actions_executed:] # 确定重叠和新尾部 overlap_len = min(len(self._plan), len(relevant_new)) # 用指数权重混合重叠区域 for i in range(overlap_len): self._count[i] += 1 weight = self._temporal_weights[self._count[i]] cumsum = self._cumulative_sum[self._count[i]] blended = (self._plan[i] * cumsum_prev + relevant_new[i] * weight) / cumsum self._plan[i] = blended # 追加新尾部 self._plan.extend(relevant_new[overlap_len:])
权重预计算: src/action_dispatch/action_dispatch/temporal_smoother.py
预计算
weight[k] = exp(-temporal_ensemble_coeff * k)对于 k=0..chunk_size预计算
cumulative_sum[k]用于快速混合默认
temporal_ensemble_coeff = 0.01(来自 ACT 论文)
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:49-301, src/action_dispatch/action_dispatch/temporal_smoother.py, src/action_dispatch/README.en.md:212-342
启动系统与节点生成
启动系统使用模块化构建器模式,每个子系统都有专用的构建器模块。
启动构建器架构
启动构建器模块:
构建器模块 |
职责 |
关键函数 |
|---|---|---|
|
ros2_control 设置、控制器启动 |
|
|
相机驱动、TF 树 |
|
|
Gazebo 启动 |
|
|
推理和分发节点 |
|
|
遥操作节点 |
|
|
Episode 录制 |
|
参数流:
启动参数 → context.launch_configurations → 构建器函数 → 节点参数
来自执行构建器的示例 src/robot_config/robot_config/launch_builders/execution.py:81-138:
robot_config_path = robot_config.get('_config_path', '')
model_name = inference_config["model"]
models = robot_config.get("models", {})
model_config = models[model_name]
node_params = {
'robot_config_path': robot_config_path,
'name': model_config.get('name', 'lerobot_policy'),
'repo_id': model_config.get('repo_id'),
'checkpoint': model_config.get('checkpoint'),
'device': model_config.get('device', 'auto'),
'execution_mode': inference_config.get('execution_mode', 'monolithic'),
}
来源: src/robot_config/launch/robot.launch.py:123-300, src/robot_config/robot_config/launch_builders/execution.py:20-281, src/robot_config/robot_config/launch_builders/control.py
硬件抽象与 ros2_control 集成
系统使用 ros2_control 作为硬件抽象层,支持真实硬件和仿真使用相同的控制器。
ros2_control 架构
控制器配置:
控制器在机器人配置的 ros2_control 节中定义:
ros2_control:
hardware:
plugin: so101_hardware/SO101Hardware # or gz_ros2_control/GazeboSystem
parameters:
serial_port: /dev/ttyUSB0
baudrate: 1000000
controllers:
- name: arm_position_controller
type: position_controllers/JointGroupPositionController
joints: [joint1, joint2, joint3, joint4, joint5]
- name: arm_trajectory_controller
type: joint_trajectory_controller/JointTrajectoryController
joints: [joint1, joint2, joint3, joint4, joint5]
控制器启动: src/robot_config/robot_config/launch_builders/control.py
控制构建器生成通过 controller_manager 激活控制器的 spawner 节点:
spawner_nodes = []
for controller_name in active_controllers:
spawner = Node(
package='controller_manager',
executable='spawner',
arguments=[controller_name],
output='screen',
)
spawner_nodes.append(spawner)
硬件接口实现: src/so101_hardware/
SO101Hardware 插件实现
hardware_interface::SystemInterface: - on_init():打开串口,初始化 Feetech SDK - read():从电机读取关节位置/速度 - write():向电机写入位置命令
来源: src/robot_config/robot_config/launch_builders/control.py, src/so101_hardware/, src/robot_config/config/robots/so101_single_arm.yaml
构建系统与环境管理
构建架构
构建 Mixin: scripts/build.sh:19-67
构建系统支持不同构建配置的 mixin:
Mixin |
描述 |
用例 |
|---|---|---|
|
调试符号,无测试,符号链接安装 |
默认开发 |
|
带符号的完整调试构建 |
调试 |
|
优化构建,无调试 |
生产部署 |
|
启用测试 |
CI/CD |
|
AddressSanitizer |
内存调试 |
|
ThreadSanitizer |
并发调试 |
虚拟环境管理: scripts/build.sh:125-157
构建脚本确保正确的 venv 激活和依赖安装:
setup_venv() {
local venv_paths=(
"${WORKSPACE}/venv"
"/home/ros/colcon_venv/venv"
"${VIRTUAL_ENV:-}"
)
for venv in "${venv_paths[@]}"; do
if [[ -n "${venv}" && -f "${venv}/bin/activate" ]]; then
source "${venv}/bin/activate"
return 0
fi
done
}
LeRobot 集成: scripts/build.sh:162-178
构建脚本以可编辑模式安装 LeRobot 并确保 NumPy 兼容性:
PIP_BIN="${WORKSPACE}/venv/bin/pip"
"${PIP_BIN}" install -e "${WORKSPACE}/libs/lerobot" --quiet
"${PIP_BIN}" install "numpy<2" "opencv-python-headless<4.12" --quiet
来源: scripts/build.sh:1-229, scripts/setup.sh, README.md:75-118
总结:关键架构模式
1. 单一数据源
所有系统行为由 robot_config YAML 文件驱动。Contract 抽象确保录制、训练和推理使用相同的数据处理流水线。
文件: src/robot_config/config/robots/so101_single_arm.yaml, src/robot_config/robot_config/loader.py
2. 契约驱动设计
Contract 数据类定义观测、动作及其 ROS 到张量的映射。消费者使用共享工具(decode_value、resample、feature_from_spec)确保一致性。
文件: src/robot_config/robot_config/contract_utils.py, src/tensormsg/tensormsg/converter.py
3. 模块化启动构建器
启动文件生成被拆分为特定领域的构建器(控制、感知、执行等),可根据控制模式和配置进行组合。
4. 双执行模式
推理服务支持单体(零拷贝、单机)和分布式(边缘-云端)模式,使用相同的 Action Server 接口。
文件: src/inference_service/inference_service/lerobot_policy_node.py, src/inference_service/inference_service/pure_inference_node.py
5. 时间平滑
跨帧动作块平滑消除了在前一个块执行期间新推理完成时的运动不连续性。
文件: src/action_dispatch/action_dispatch/temporal_smoother.py
6. 硬件抽象
ros2_control 框架为真实硬件和仿真提供统一接口,实现无缝模式切换。
文件: src/so101_hardware/, src/robot_config/robot_config/launch_builders/control.py