Action Dispatcher Node

Purpose and Scope

The ActionDispatcherNode is the core execution coordinator in the IB-Robot action dispatch pipeline. It implements a pull-based architecture: it maintains an action queue, triggers inference requests when the queue runs low, and publishes actions to ros2_control at a fixed rate (100Hz by default). This page covers the node's control loop, queue management, and watermark-based inference triggering.

Related pages:
- For the temporal smoothing algorithm and cross-chunk blending, see Temporal Smoothing
- For action execution via TopicExecutor and ActionExecutor, see Topic and Action Executors
- For the inference service that supplies actions, see Policy Node

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:1-319, src/action_dispatch/README.en.md:1-447


Overview

The ActionDispatcherNode acts as the robot's "cerebellum", decoupling inference latency from the control frequency. It solves the fundamental problem that AI policy inference (typically 50-200ms) is far slower than the required control rate (100Hz = 10ms per cycle).

Key Responsibilities

| Responsibility | Implementation |
| --- | --- |
| Queue management | Maintains a `collections.deque` or `TemporalSmoother` for action buffering |
| Asynchronous inference | Triggers inference via the `DispatchInfer` action when the queue falls below the watermark |
| High-frequency control | Publishes actions at 100Hz through `TopicExecutor` |
| Temporal alignment | Tracks how many actions were executed during inference to align chunks correctly |
| Safety fallback | Holds the last action when the queue is empty |

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:38-48, src/action_dispatch/README.en.md:133-148


System Architecture

Position in the Execution Pipeline

```mermaid
graph TB
    subgraph "Inference Layer"
        PolicyNode["lerobot_policy_node<br/>(DispatchInfer Action Server)"]
    end
    subgraph "Action Dispatcher Node"
        ControlLoop["_control_loop()<br/>100Hz Timer"]
        QueueCheck{"Queue < Watermark?"}
        ActionQueue["Action Queue<br/>_queue / _smoother"]
        RequestInf["_request_inference()<br/>Send DispatchInfer Goal"]
        ResultCb["_result_cb()<br/>Handle Inference Result"]
        ControlLoop --> QueueCheck
        QueueCheck -->|Yes| RequestInf
        RequestInf --> PolicyNode
        PolicyNode -->|Result| ResultCb
        ResultCb --> ActionQueue
        QueueCheck -->|No| ActionQueue
    end
    subgraph "Execution Layer"
        Executor["TopicExecutor<br/>execute()"]
        ROS2Control["ros2_control<br/>Controllers"]
    end
    ActionQueue -->|Pop Action| Executor
    Executor --> ROS2Control
    style ActionQueue fill:#f9f,stroke:#333,stroke-width:2px
    style ControlLoop fill:#bbf,stroke:#333,stroke-width:2px
```

Sources: src/action_dispatch/README.en.md:13-42, src/action_dispatch/action_dispatch/action_dispatcher_node.py:165-201

Communication Interfaces

```mermaid
graph LR
    subgraph "Inputs"
        JointState["/joint_states<br/>(sensor_msgs/JointState)"]
        InfResult["DispatchInfer Result<br/>(ibrobot_msgs/VariantsList)"]
    end
    subgraph "ActionDispatcherNode"
        InfClient["_infer_client<br/>(ActionClient)"]
        JointSub["_joint_sub<br/>(Subscription)"]
        Executor["_executor<br/>(TopicExecutor)"]
    end
    subgraph "Outputs"
        QueueSizePub["~/queue_size<br/>(std_msgs/Int32)"]
        SmoothingPub["~/smoothing_enabled<br/>(std_msgs/Bool)"]
        ControlTopics["Controller Topics<br/>(Float64MultiArray)"]
    end
    subgraph "Services"
        ResetSrv["~/reset<br/>(std_srvs/Empty)"]
        ToggleSrv["~/toggle_smoothing<br/>(std_srvs/Empty)"]
    end
    JointState --> JointSub
    InfClient -->|Goal| InfResult
    InfClient -->|Result| InfClient
    Executor --> ControlTopics
    ActionDispatcherNode --> QueueSizePub
    ActionDispatcherNode --> SmoothingPub
    ResetSrv -.-> ActionDispatcherNode
    ToggleSrv -.-> ActionDispatcherNode
```

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:125-154, src/action_dispatch/README.en.md:344-378


Control Loop Implementation

Timer-Based Execution

At the heart of the ActionDispatcherNode is the `_control_loop()` method, which runs at a fixed rate (100Hz by default):

```mermaid
graph TD
    Start["_control_loop() Triggered<br/>(100Hz Timer)"]
    GetQueueSize["q_size = _get_plan_length()"]
    PubStatus["Publish queue_size and<br/>smoothing_enabled"]
    CheckWatermark{"q_size < _watermark<br/>AND NOT<br/>_inference_in_progress?"}
    TriggerInf["_request_inference()"]
    CheckQueue{"q_size > 0?"}
    PopAction["Pop action from queue/smoother"]
    UseLastAction["Use _last_action (hold)"]
    Execute["_executor.execute(action)"]
    End["End Timer Cycle"]
    Start --> GetQueueSize
    GetQueueSize --> PubStatus
    PubStatus --> CheckWatermark
    CheckWatermark -->|Yes| TriggerInf
    CheckWatermark -->|No| CheckQueue
    TriggerInf --> CheckQueue
    CheckQueue -->|Yes| PopAction
    CheckQueue -->|No| UseLastAction
    PopAction --> Execute
    UseLastAction --> Execute
    Execute --> End
```

Key Implementation Details

| Aspect | Implementation |
| --- | --- |
| Timer creation | `self.create_timer(1.0 / self._control_hz, self._control_loop, ...)` |
| Queue length calculation | `_get_plan_length()` works in both deque and smoother modes |
| Action retrieval | Smoother: `_smoother.get_next_action()`; Simple: `_queue.popleft()` |
| Hold behavior | Uses `_last_action` when the queue is empty, for stability |

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:142-146, src/action_dispatch/action_dispatch/action_dispatcher_node.py:165-201
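The pull-based behavior described above can be sketched in plain Python, independent of ROS. This is a simplified illustration only (the `MiniDispatcher` class and its method names are invented for the sketch); it omits status publishing and the actual `TopicExecutor` call:

```python
from collections import deque

class MiniDispatcher:
    """Simplified, hypothetical sketch of the 100Hz control-loop logic:
    pop the next action when one is queued, otherwise hold the last
    action, and flag an inference request when below the watermark."""

    def __init__(self, queue_size=100, watermark=20):
        self._queue = deque(maxlen=queue_size)
        self._watermark = watermark
        self._last_action = None
        self._inference_in_progress = False

    def control_step(self):
        """One timer tick. Returns (action, inference_requested)."""
        requested = False
        if len(self._queue) < self._watermark and not self._inference_in_progress:
            # The real node sends a DispatchInfer goal here.
            self._inference_in_progress = True
            requested = True
        if self._queue:
            action = self._queue.popleft()
            self._last_action = action
        else:
            action = self._last_action  # hold behavior: repeat last action
        return action, requested

d = MiniDispatcher(queue_size=100, watermark=2)
d._queue.extend([[0.1], [0.2]])
print(d.control_step())  # pops [0.1]; queue now below watermark
print(d.control_step())  # pops [0.2]; inference already flagged
print(d.control_step())  # queue empty: holds [0.2]
```

Note how `_inference_in_progress` prevents a second request from firing while the first is still outstanding, exactly as the watermark check in the diagram requires.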


Queue Management

Dual-Mode Architecture

The node supports two queue management modes:

1. Simple queue mode (`temporal_smoothing_enabled=false`)
   - Uses a `collections.deque` with `maxlen=queue_size`
   - Replaces actions directly when inference completes
   - Suitable for single-step policies or debugging

2. Temporal smoothing mode (`temporal_smoothing_enabled=true`)
   - Uses the `TemporalSmootherManager` wrapper
   - Exponentially weighted blending across chunks
   - Required for action-chunking models (ACT, Diffusion Policy)

```mermaid
graph TB
    subgraph "Mode Selection (Initialization)"
        ParamCheck{"temporal_smoothing_enabled?"}
        CreateDeque["self._queue = deque(maxlen=queue_size)<br/>self._smoother = None"]
        CreateSmoother["self._smoother = TemporalSmootherManager(...)<br/>Uses _queue internally"]
    end
    subgraph "Queue Operations"
        GetLength["_get_plan_length()"]
        PopAction["Pop Next Action"]
        UpdateQueue["Update with New Actions"]
    end
    ParamCheck -->|False| CreateDeque
    ParamCheck -->|True| CreateSmoother
    CreateDeque --> GetLength
    CreateSmoother --> GetLength
    GetLength -->|Simple| SimpleLen["return len(self._queue)"]
    GetLength -->|Smoother| SmootherLen["return self._smoother.plan_length"]
    PopAction -->|Simple| SimplePop["self._queue.popleft()"]
    PopAction -->|Smoother| SmootherPop["self._smoother.get_next_action()"]
    UpdateQueue -->|Simple| SimpleClear["self._queue.clear()<br/>self._queue.extend(actions)"]
    UpdateQueue -->|Smoother| SmootherUpdate["self._smoother.update(actions,<br/>actions_executed)"]
```

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:89-103, src/action_dispatch/action_dispatch/action_dispatcher_node.py:165-178
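The length and pop dispatch between the two modes can be illustrated with a small standalone sketch. `FakeSmoother` here is a stand-in invented for illustration, mimicking only the `plan_length` / `get_next_action()` surface shown in the diagram above:

```python
from collections import deque

class FakeSmoother:
    """Illustration-only stand-in for TemporalSmootherManager."""
    def __init__(self):
        self.plan = deque()

    @property
    def plan_length(self):
        return len(self.plan)

    def get_next_action(self):
        return self.plan.popleft()

def get_plan_length(queue, smoother):
    # Mirrors the _get_plan_length() dispatch: prefer the smoother when present.
    if smoother is not None:
        return smoother.plan_length
    return len(queue)

# Simple mode: plain bounded deque. maxlen silently drops the oldest entry.
q = deque(maxlen=3)
q.extend([1, 2, 3, 4])
print(get_plan_length(q, None))  # → 3

# Smoother mode: the length comes from the smoother's internal plan.
s = FakeSmoother()
s.plan.extend([1, 2])
print(get_plan_length(q, s))     # → 2
```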

Action Execution Tracking

To align new inference results with the existing queue correctly, the node tracks how many actions were executed while inference was running:

# At inference request time (line 209)
self._plan_length_at_inference_start = self._get_plan_length()

# At inference result time (line 259-260)
current_plan_length = self._get_plan_length()
actions_executed = max(0, self._plan_length_at_inference_start - current_plan_length)

This `actions_executed` value is critical for temporal smoothing alignment.

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:208-216, src/action_dispatch/action_dispatch/action_dispatcher_node.py:232-278


Watermark-Based Inference Triggering

Trigger Logic

The node triggers inference asynchronously using a watermark threshold:

```mermaid
graph TD
    ControlLoop["Control Loop (100Hz)"]
    CheckConditions{"queue_size < watermark<br/>AND<br/>NOT inference_in_progress?"}
    SendGoal["Create DispatchInfer.Goal<br/>goal.obs_timestamp = now()"]
    SendAsync["_infer_client.send_goal_async(goal)"]
    SetFlag["_inference_in_progress = True"]
    RecordQueueLen["_plan_length_at_inference_start = queue_size"]
    WaitCallback["Wait for Goal Response Callback"]
    ControlLoop --> CheckConditions
    CheckConditions -->|Yes| SendGoal
    CheckConditions -->|No| ControlLoop
    SendGoal --> RecordQueueLen
    RecordQueueLen --> SetFlag
    SetFlag --> SendAsync
    SendAsync --> WaitCallback
    WaitCallback -.->|Async| GoalResponseCb["_goal_response_cb()"]
```

Parameter Configuration

| Parameter | Default | Purpose |
| --- | --- | --- |
| `watermark_threshold` | 20 | Triggers inference when the queue drops below this value |
| `queue_size` | 100 | Maximum queue capacity |
| `control_frequency` | 100.0 | Control loop rate (Hz) |

Trigger policy:
- Early triggering: with `watermark=20`, inference starts once 80% of the queue has been consumed
- Overlap: new inference completes while old actions are still executing
- Continuous operation: ensures a gap-free action stream

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:54-68, src/action_dispatch/action_dispatch/action_dispatcher_node.py:203-220
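These defaults imply a latency budget: at 100Hz each queued action covers 10ms, so a watermark of 20 leaves roughly 200ms of queued motion for inference to finish in, comfortably above the stated 50-200ms inference time. A quick back-of-envelope check (plain arithmetic using the default values above, not code from the node):

```python
control_hz = 100.0   # control_frequency
watermark = 20       # watermark_threshold
tick_ms = 1000.0 / control_hz  # time one queued action covers

# Time left once the trigger fires: the remaining queued actions
# keep the robot moving while inference runs.
budget_ms = watermark * tick_ms
print(budget_ms)  # → 200.0

# Conversely, the watermark must exceed worst-case inference
# latency divided by the tick time, or the queue runs dry.
worst_case_latency_ms = 200.0
min_watermark = worst_case_latency_ms / tick_ms
print(min_watermark)  # → 20.0
```

This is why the default watermark sits exactly at the upper end of the expected inference latency: any slower policy would need a larger `watermark_threshold` or a lower `control_frequency`.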

Inference Request Flow

```mermaid
sequenceDiagram
    participant CL as _control_loop
    participant RIF as _request_inference
    participant AC as _infer_client
    participant PN as PolicyNode
    participant GRC as _goal_response_cb
    participant RC as _result_cb
    participant Q as Queue/Smoother
    CL->>CL: Check watermark
    CL->>RIF: Trigger inference
    RIF->>RIF: Set _inference_in_progress = True
    RIF->>RIF: Record _plan_length_at_inference_start
    RIF->>AC: send_goal_async(goal)
    AC->>PN: DispatchInfer Goal
    Note over PN: Policy inference<br/>(50-200ms)
    PN->>AC: Goal Accepted
    AC->>GRC: goal_handle
    GRC->>GRC: get_result_async()
    PN->>AC: Result (action_chunk)
    AC->>RC: Result future
    RC->>RC: Decode VariantsList
    RC->>RC: Calculate actions_executed
    RC->>Q: Update queue/smoother
    RC->>RC: Set _inference_in_progress = False
```

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:203-230, src/action_dispatch/README.en.md:84-130


Result Handling and Queue Updates

Inference Result Callback

When inference completes, `_result_cb()` processes the result and updates the queue:

```mermaid
graph TD
    ResultCb["_result_cb(future)"]
    ClearFlag["_inference_in_progress = False"]
    CheckSuccess{"result.success?"}
    DecodeBatch["batch = TensorMsgConverter.from_variant(<br/>result.action_chunk)"]
    GetAction["action_chunk = batch['action']"]
    ConvertTensor["Convert to Tensor if needed"]
    ReshapeCheck{"action_chunk.ndim == 1?"}
    Reshape2D["Reshape to (1, action_dim)"]
    CalcExecuted["current_len = _get_plan_length()<br/>actions_executed = start_len - current_len"]
    SmootherCheck{"_smoother exists?"}
    SmootherUpdate["_smoother.update(<br/>action_chunk_tensor,<br/>actions_executed)"]
    SimpleUpdate["Skip actions_executed actions<br/>_queue.clear()<br/>_queue.extend(remaining)"]
    LogError["Log error message"]
    End["End"]
    ResultCb --> ClearFlag
    ClearFlag --> CheckSuccess
    CheckSuccess -->|No| LogError
    CheckSuccess -->|Yes| DecodeBatch
    DecodeBatch --> GetAction
    GetAction --> ConvertTensor
    ConvertTensor --> ReshapeCheck
    ReshapeCheck -->|Yes| Reshape2D
    ReshapeCheck -->|No| CalcExecuted
    Reshape2D --> CalcExecuted
    CalcExecuted --> SmootherCheck
    SmootherCheck -->|Yes| SmootherUpdate
    SmootherCheck -->|No| SimpleUpdate
    SmootherUpdate --> End
    SimpleUpdate --> End
    LogError --> End
```

Key Code Segments

# Decode inference result (lines 241-256)
batch = TensorMsgConverter.from_variant(result.action_chunk)
action_chunk = batch['action']

# Handle both Tensor and NumPy (lines 246-256)
if hasattr(action_chunk, 'detach'):
    action_chunk_tensor = action_chunk
    action_chunk_np = action_chunk.detach().cpu().numpy()
else:
    action_chunk_tensor = torch.from_numpy(action_chunk)
    action_chunk_np = action_chunk

# Calculate temporal alignment (lines 259-260)
current_plan_length = self._get_plan_length()
actions_executed = max(0, self._plan_length_at_inference_start - current_plan_length)

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:232-278

Simple Queue Mode Update

In simple queue mode, the update logic skips the actions that were already executed:

# Skip actions that were executed during inference (line 272)
relevant_actions = action_chunk_np[actions_executed:]

# Replace entire queue with relevant actions (lines 273-274)
self._queue.clear()
self._queue.extend(relevant_actions)

This ensures the queue contains only future actions, never stale ones.

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:270-278
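A standalone worked example of this alignment arithmetic, using made-up numbers (a 100-action plan at request time, 15 actions executed while inference ran, and a 20-step new chunk):

```python
# Suppose 100 actions were queued when inference was requested, and
# 15 were consumed while inference ran (100Hz loop, ~150ms latency).
plan_length_at_inference_start = 100
current_plan_length = 85
actions_executed = max(0, plan_length_at_inference_start - current_plan_length)
print(actions_executed)  # → 15

# The new chunk was computed from the observation taken at request
# time, so its first `actions_executed` steps are already in the past
# and must be dropped before the queue is refilled.
action_chunk = [[i * 0.01] for i in range(20)]  # illustrative 20-step chunk
relevant_actions = action_chunk[actions_executed:]
print(len(relevant_actions))  # → 5
```

Without this skip, the robot would replay motions it has already performed, producing a visible stutter at every chunk boundary.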


State Management

State Variables

The node maintains several state variables to coordinate inference and execution:

```mermaid
graph TB
    subgraph "Queue State"
        Queue["_queue: deque<br/>or<br/>_smoother: TemporalSmootherManager"]
        LastAction["_last_action: Optional[np.ndarray]<br/>(for hold behavior)"]
    end
    subgraph "Inference State"
        InfInProgress["_inference_in_progress: bool<br/>(prevents concurrent requests)"]
        PlanLenStart["_plan_length_at_inference_start: int<br/>(for temporal alignment)"]
    end
    subgraph "Runtime State"
        IsRunning["_is_running: bool<br/>(emergency stop flag)"]
        SmoothingEnabled["_smoothing_enabled: bool<br/>(toggleable at runtime)"]
    end
    subgraph "Configuration State"
        ActionSpecs["_action_specs: List[SpecView]<br/>(from contract)"]
        Executor["_executor: TopicExecutor<br/>(action publisher)"]
    end
```

State Transitions

| Event | State Changes |
| --- | --- |
| Inference request | `_inference_in_progress = True`; `_plan_length_at_inference_start = queue_size` |
| Inference complete | `_inference_in_progress = False`; queue updated with new actions |
| Action executed | Queue size decreases by 1; `_last_action` updated |
| Toggle smoothing | `_smoothing_enabled` flipped; smoother config updated |
| Reset service | All queues cleared; flags reset to initial state |

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:80-103


Service Interfaces

Reset Service

The `~/reset` service resets the dispatcher to its initial state:

def _reset_cb(self, request, response):
    self.get_logger().info("Resetting dispatcher state")
    self._queue.clear()
    if self._smoother is not None:
        self._smoother.reset()
    self._inference_in_progress = False
    self._plan_length_at_inference_start = 0
    self._last_action = None
    return response

Use cases:
- Emergency stop and restart
- Switching between control modes
- Clearing corrupted queue state

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:280-288

Toggle Smoothing Service

The `~/toggle_smoothing` service switches between smoothed and direct modes at runtime:

def _toggle_smoothing_cb(self, request, response):
    if self._smoother is None:
        self.get_logger().warn("Cannot toggle smoothing: smoother not initialized")
        return response

    self._smoothing_enabled = not self._smoothing_enabled
    self._smoother._config.enabled = self._smoothing_enabled
    self._smoother._smoother.config.enabled = self._smoothing_enabled

    self.get_logger().info(f"Temporal smoothing {'ENABLED' if self._smoothing_enabled else 'DISABLED'}")
    return response

Note: smoothing can only be toggled if a TemporalSmootherManager was initialized, i.e. `temporal_smoothing_enabled=true` at startup.

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:290-301


Parameter Reference

Complete Parameter Table

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `queue_size` | int | 100 | Maximum action queue length |
| `watermark_threshold` | int | 20 | Triggers inference when the queue drops below this value |
| `control_frequency` | double | 100.0 | Control loop frequency (Hz) |
| `inference_action_server` | string | `/act_inference_node/DispatchInfer` | Full name of the inference action server |
| `robot_config_path` | string | `''` | Path to the robot_config YAML (required by TopicExecutor) |
| `joint_state_topic` | string | `/joint_states` | Joint state feedback topic (optional) |
| `temporal_smoothing_enabled` | bool | false | Enables cross-chunk temporal smoothing |
| `temporal_ensemble_coeff` | double | 0.01 | Exponential smoothing coefficient (see Temporal Smoothing) |
| `chunk_size` | int | 100 | Action chunk size expected from inference |
| `smoothing_device` | string | `''` | Device for smoothing computation (empty = auto-detect) |

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:54-66, src/action_dispatch/README.en.md:173-185


Initialization Sequence

Node Startup Flow

```mermaid
graph TD
    Init["__init__()"]
    DeclareParams["Declare all parameters"]
    ReadParams["Read parameter values"]
    CheckSmoothing{"temporal_smoothing_enabled?"}
    InitSimpleQueue["_queue = deque(maxlen=queue_size)<br/>_smoother = None"]
    InitSmoother["_smoother = TemporalSmootherManager(...)<br/>with config"]
    LoadContract["Load robot_config_path<br/>Extract action_specs"]
    CreateExecutor["_executor = TopicExecutor(self, {'action_specs': _action_specs})"]
    InitExecutor["_executor.initialize()"]
    CreateClient["_infer_client = ActionClient(DispatchInfer, server_name)"]
    CreateSubs["Create joint_state subscription"]
    CreatePubs["Create ~/queue_size and ~/smoothing_enabled publishers"]
    CreateServices["Create ~/reset and ~/toggle_smoothing services"]
    CreateTimer["Create control_loop timer at control_frequency"]
    LogReady["Log 'Dispatcher ready' message"]
    Init --> DeclareParams
    DeclareParams --> ReadParams
    ReadParams --> CheckSmoothing
    CheckSmoothing -->|False| InitSimpleQueue
    CheckSmoothing -->|True| InitSmoother
    InitSimpleQueue --> LoadContract
    InitSmoother --> LoadContract
    LoadContract --> CreateExecutor
    CreateExecutor --> InitExecutor
    InitExecutor --> CreateClient
    CreateClient --> CreateSubs
    CreateSubs --> CreatePubs
    CreatePubs --> CreateServices
    CreateServices --> CreateTimer
    CreateTimer --> LogReady
```

Key dependency: TopicExecutor needs the `action_specs` from the contract to map action dimensions to controller topics. If `robot_config_path` is not provided, the executor falls back to defaults and may not work correctly.

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:49-159


Integration with Other Components

Contract-Driven Execution

The node relies on the robot_config contract to map actions correctly:

# Load contract (lines 106-113)
from robot_config.loader import load_robot_config
self._contract = load_robot_config(robot_config_path).to_contract()
self._action_specs = [s for s in iter_specs(self._contract) if s.is_action]

# Pass to executor (line 120)
self._executor = TopicExecutor(self, {'action_specs': self._action_specs})

This ensures that when the executor receives a multi-dimensional action array, it knows how to split it into arm joint and gripper commands.

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:106-122
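The splitting idea can be sketched as follows. The real `SpecView` fields are not documented on this page, so the `FakeSpec` structure and its `dim` field below are invented purely for illustration of how specs can carve a flat action vector into per-controller segments:

```python
from dataclasses import dataclass

@dataclass
class FakeSpec:
    """Hypothetical stand-in for a contract action spec."""
    name: str
    dim: int  # number of action dimensions this spec consumes

def split_action(action, specs):
    """Slice a flat action vector into per-controller segments, in spec order."""
    out, offset = {}, 0
    for spec in specs:
        out[spec.name] = action[offset:offset + spec.dim]
        offset += spec.dim
    return out

# A 7-DoF action split into a 6-joint arm command and a gripper command.
specs = [FakeSpec('arm_joints', 6), FakeSpec('gripper', 1)]
action = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.9]
print(split_action(action, specs))
# → {'arm_joints': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6], 'gripper': [0.9]}
```

The actual TopicExecutor presumably performs an analogous mapping, except the segments are published to the controller topics named in the contract rather than returned as a dict.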

Launch Configuration Example

robot.launch.py 中,调度器使用正确的参数启动:

# Parameter binding from control mode config
parameters = [{
    'queue_size': 100,
    'watermark_threshold': 20,
    'control_frequency': 100.0,
    'robot_config_path': robot_config_path,
    'inference_action_server': '/lerobot_policy_node/DispatchInfer',
    'temporal_smoothing_enabled': executor_config.get('temporal_smoothing', {}).get('enabled', False),
    'temporal_ensemble_coeff': executor_config.get('temporal_smoothing', {}).get('coeff', 0.01),
    'chunk_size': model_config.get('chunk_size', 100),
}]

Sources: src/robot_config/robot_config/launch_builders/execution.py:1-436


Main Entry Point

The node is launched via the standard ROS 2 Python executable pattern:

def main(args=None):
    rclpy.init(args=args)
    node = ActionDispatcherNode()
    executor = rclpy.executors.MultiThreadedExecutor()
    executor.add_node(node)
    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

A MultiThreadedExecutor is required to handle concurrent callbacks (timers, action callbacks, service calls) without blocking.

Sources: src/action_dispatch/action_dispatch/action_dispatcher_node.py:304-318