动作调度器节点
相关源文件
以下文件用于生成此文档页面:
目的与范围
ActionDispatcherNode 是 IB-Robot 动作调度流水线中的核心执行协调器。它实现了基于拉取的架构,维护一个动作队列,当队列不足时触发推理请求,并以固定频率(默认 100Hz)向 ros2_control 发布动作。本文档介绍该节点的控制循环、队列管理和基于水位线的推理触发机制。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:1-319, src/action_dispatch/README.en.md:1-447
概述
ActionDispatcherNode 充当机器人的”小脑”,将推理延迟与控制频率解耦。它解决了 AI 策略推理(通常 50-200ms)远慢于所需控制速率(100Hz = 10ms)的根本问题。
关键职责
职责 |
实现 |
|---|---|
队列管理 |
维护 |
异步推理 |
当队列低于水位线时,通过 |
高频控制 |
通过 |
时间对齐 |
跟踪推理期间执行的动作数量, 以正确对齐分块 |
安全回退 |
当队列为空时保持最后一个动作 |
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:38-48, src/action_dispatch/README.en.md:133-148
系统架构
执行流水线中的位置
来源:src/action_dispatch/README.en.md:13-42, src/action_dispatch/action_dispatch/action_dispatcher_node.py:165-201
通信接口
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:125-154, src/action_dispatch/README.en.md:344-378
控制循环实现
基于定时器的执行
ActionDispatcherNode 的核心是 _control_loop() 方法,以固定频率(默认 100Hz)执行:
关键实现细节:
方面 |
实现 |
|---|---|
定时器创建 |
|
队列长度 计算 |
|
动作获取 |
Smoother: |
保持行为 |
当队列为空时使用 |
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:142-146, src/action_dispatch/action_dispatch/action_dispatcher_node.py:165-201
队列管理
双模式架构
该节点支持两种队列管理模式:
1. 简单队列模式(temporal_smoothing_enabled=false)- 使用 collections.deque,设置 maxlen=queue_size - 推理完成时直接替换动作 - 适用于单步策略或调试
2. 时间平滑模式(temporal_smoothing_enabled=true)- 使用 TemporalSmootherManager 包装器 - 跨帧指数加权混合 - 动作分块模型(ACT、Diffusion Policy)必需
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:89-103, src/action_dispatch/action_dispatch/action_dispatcher_node.py:165-178
动作执行跟踪
为了正确对齐新的推理结果与现有队列,节点跟踪推理期间执行了多少动作:
# At inference request time (line 209)
self._plan_length_at_inference_start = self._get_plan_length()
# At inference result time (line 259-260)
current_plan_length = self._get_plan_length()
actions_executed = max(0, self._plan_length_at_inference_start - current_plan_length)
这个 actions_executed 值对于时间平滑对齐至关重要。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:208-216, src/action_dispatch/action_dispatch/action_dispatcher_node.py:232-278
基于水位线的推理触发
触发逻辑
节点使用水位线阈值异步触发推理:
参数配置:
参数 |
默认值 |
用途 |
|---|---|---|
|
20 |
当队列低于此数值时 触发推理 |
|
100 |
最大队列容量 |
|
100.0 |
Hz - 控制循环速率 |
触发策略:- 提前触发:当 watermark=20 时,在队列消耗 80% 时开始推理 - 重叠:新推理在旧动作仍在执行时完成 - 连续运行:确保动作流无间隙
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:54-68, src/action_dispatch/action_dispatch/action_dispatcher_node.py:203-220
推理请求流程
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:203-230, src/action_dispatch/README.en.md:84-130
结果处理与队列更新
推理结果回调
当推理完成时,_result_cb() 处理结果并更新队列:
关键代码段:
# Decode inference result (lines 241-256)
batch = TensorMsgConverter.from_variant(result.action_chunk)
action_chunk = batch['action']
# Handle both Tensor and NumPy (lines 246-256)
if hasattr(action_chunk, 'detach'):
action_chunk_tensor = action_chunk
action_chunk_np = action_chunk.detach().cpu().numpy()
else:
action_chunk_tensor = torch.from_numpy(action_chunk)
action_chunk_np = action_chunk
# Calculate temporal alignment (lines 259-260)
current_plan_length = self._get_plan_length()
actions_executed = max(0, self._plan_length_at_inference_start - current_plan_length)
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:232-278
简单队列模式更新
对于简单队列模式,更新逻辑跳过已执行的动作:
# Skip actions that were executed during inference (line 272)
relevant_actions = action_chunk_np[actions_executed:]
# Replace entire queue with relevant actions (lines 273-274)
self._queue.clear()
self._queue.extend(relevant_actions)
这确保队列只包含未来的动作,而非过时的动作。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:270-278
状态管理
状态变量
节点维护多个状态变量以协调推理和执行:
状态转换:
事件 |
状态变化 |
|---|---|
推理请求 |
|
推理完成 |
|
动作执行 |
队列大小减 1 |
切换平滑 |
|
重置服务 |
所有队列清除标志重置为初始状态 |
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:80-103
服务接口
重置服务
~/reset 服务将调度器重置为初始状态:
def _reset_cb(self, request, response):
self.get_logger().info("Resetting dispatcher state")
self._queue.clear()
if self._smoother is not None:
self._smoother.reset()
self._inference_in_progress = False
self._plan_length_at_inference_start = 0
self._last_action = None
return response
用例:- 紧急停止并重启 - 在控制模式之间切换 - 清除损坏的队列状态
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:280-288
切换平滑服务
~/toggle_smoothing 服务支持运行时在平滑模式和直接模式之间切换:
def _toggle_smoothing_cb(self, request, response):
if self._smoother is None:
self.get_logger().warn("Cannot toggle smoothing: smoother not initialized")
return response
self._smoothing_enabled = not self._smoothing_enabled
self._smoother._config.enabled = self._smoothing_enabled
self._smoother._smoother.config.enabled = self._smoothing_enabled
self.get_logger().info(f"Temporal smoothing {'ENABLED' if self._smoothing_enabled else 'DISABLED'}")
return response
注意:只有在初始化了 TemporalSmootherManager 的情况下(即启动时 temporal_smoothing_enabled=true)才能切换平滑。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:290-301
参数参考
完整参数表
参数 |
类型 |
默认值 |
描述 |
|---|---|---|---|
|
int |
100 |
最大动作队列 长度 |
|
int |
20 |
当队列低于此数值时 触发推理 |
|
double |
100.0 |
控制循环 频率(Hz) |
|
string |
|
推理 Action Server 的全名 |
|
string |
|
robot_config YAML 路径 (TopicExecutor 需要) |
|
string |
|
关节状态反馈 话题(可选) |
|
bool |
false |
启用跨帧 时间平滑 |
|
double |
0.01 |
指数平滑系数 (参见 时间平滑) |
|
int |
100 |
推理预期的 动作分块大小 |
|
string |
|
平滑计算设备 (空=自动检测) |
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:54-66, src/action_dispatch/README.en.md:173-185
初始化序列
节点启动流程
关键依赖:TopicExecutor 需要契约中的 action_specs 来将动作维度映射到控制器话题。如果未提供 robot_config_path,执行器将使用默认值,可能无法正常工作。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:49-159
与其他组件的集成
契约驱动执行
节点依赖 robot_config 契约来正确映射动作:
# Load contract (lines 106-113)
from robot_config.loader import load_robot_config
self._contract = load_robot_config(robot_config_path).to_contract()
self._action_specs = [s for s in iter_specs(self._contract) if s.is_action]
# Pass to executor (line 120)
self._executor = TopicExecutor(self, {'action_specs': self._action_specs})
这确保当执行器接收到多维动作数组时,它知道如何将其拆分为手臂关节和夹爪指令。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:106-122
启动配置示例
从 robot.launch.py 中,调度器使用正确的参数启动:
# Parameter binding from control mode config
parameters = [{
'queue_size': 100,
'watermark_threshold': 20,
'control_frequency': 100.0,
'robot_config_path': robot_config_path,
'inference_action_server': f'/lerobot_policy_node/DispatchInfer',
'temporal_smoothing_enabled': executor_config.get('temporal_smoothing', {}).get('enabled', False),
'temporal_ensemble_coeff': executor_config.get('temporal_smoothing', {}).get('coeff', 0.01),
'chunk_size': model_config.get('chunk_size', 100),
}]
来源: src/robot_config/robot_config/launch_builders/execution.py:1-436
主入口点
节点通过标准 ROS 2 Python 可执行模式启动:
def main(args=None):
rclpy.init(args=args)
node = ActionDispatcherNode()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
MultiThreadedExecutor 是处理并发回调(定时器、动作回调、服务调用)而不阻塞所必需的。
来源: src/action_dispatch/action_dispatch/action_dispatcher_node.py:304-318