时间平滑
相关源文件
以下文件用于生成此 wiki 页面的上下文:
本文档深入介绍 TemporalSmoother 算法,该算法对 Action Chunking 策略(如 ACT、Diffusion Policy)产生的动作块执行跨帧指数加权混合。时间平滑确保连续推理输出之间的平滑过渡,防止动作计划在执行中途更新时出现突然的运动不连续。
有关整体动作分发系统和队列管理的信息,请参阅 动作分发器节点。有关平滑后的动作如何通过话题或动作服务器执行的详细信息,请参阅 话题与动作执行器。
来源:src/action_dispatch/README.en.md:1-447, src/action_dispatch/action_dispatch/temporal_smoother.py:1-322
问题陈述:动作块重叠
Action Chunking 策略每次推理输出 n 个动作序列(通常 n=100)。然而,推理并非瞬间完成——当模型计算下一个动作块时,机器人继续执行前一个块中的动作。这产生了时间重叠问题:
T1: First inference produces [a1, a2, a3, ..., a100]
Robot begins executing: a1, a2, a3...
T2: Inference starts (queue watermark triggered)
Robot continues: a4, a5, a6...
T3: Inference completes after robot executed 30 actions
New chunk: [b1, b2, b3, ..., b100]
Remaining old actions: [a31, a32, ..., a100]
Problem: How to transition from old plan to new plan?
如果不进行平滑,有两个糟糕的选择:1. 丢弃新块:继续执行过时的预测 → 模型预测变得无关 2. 立即替换:从 a30 跳到 b31 → 突然的运动不连续
时间平滑通过 混合重叠区域 使用指数加权来解决这个问题,既保持连续性又保持对新预测的响应性。
来源:src/action_dispatch/README.en.md:212-221, src/action_dispatch/README.md:211-221
算法概述
时间平滑算法分三个阶段运行:
阶段 1:时间对齐图
阶段 2:指数加权混合
来源:src/action_dispatch/README.en.md:224-257, src/action_dispatch/action_dispatch/temporal_smoother.py:44-77
核心组件
时间平滑实现由 src/action_dispatch/action_dispatch/temporal_smoother.py 中的三个主要类组成:
类层次图
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:19-42, src/action_dispatch/action_dispatch/temporal_smoother.py:44-257, src/action_dispatch/action_dispatch/temporal_smoother.py:259-322
TemporalSmootherConfig
定义在 src/action_dispatch/action_dispatch/temporal_smoother.py:19-42 的配置数据类。
字段 |
类型 |
默认值 |
描述 |
|---|---|---|---|
|
|
|
启用平滑。如果 |
|
|
|
每块最大动作数。 用于权重预计算。 |
|
|
|
指数衰减系数。参见 平滑系数效果。 |
|
|
|
张量操作设备 ( |
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:19-42
TemporalSmoother
核心平滑实现位于 src/action_dispatch/action_dispatch/temporal_smoother.py:44-257。 维护内部状态:
_smoothed_actions:当前平滑后的动作计划(形状:[plan_length, action_dim])_action_counts:每个动作被混合的次数(形状:[plan_length, 1])_weights:预计算的指数权重(形状:[chunk_size])_weights_cumsum:权重的累加和(形状:[chunk_size])
关键方法:
方法 |
签名 |
用途 |
|---|---|---|
|
|
核心平滑逻辑。 对齐并混合新块。 |
|
|
弹出并返回计划中的 下一个动作。 |
|
|
返回下一个动作 但不移除它。 |
|
|
清除内部状态以 开始新回合。 |
|
|
返回当前计划中的 动作数量。 |
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:44-257
TemporalSmootherManager
便捷包装器位于
src/action_dispatch/action_dispatch/temporal_smoother.py:259-322,
提供运行时切换和统一接口。将所有操作委托给内部 TemporalSmoother 实例。
额外方法:- set_enabled(enabled: bool):运行时开启/关闭平滑
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:259-322
平滑公式
指数加权混合公式在 src/action_dispatch/action_dispatch/temporal_smoother.py:208-246 中实现:
# For each action i in the overlap region:
blended[i] = (old[i] * cumsum[count[i]-1] + new[i] * weights[count[i]])
/ cumsum[count[i]]
其中:- old[i]:前一个平滑计划中的第 i 个动作 - new[i]:新推理结果中的第 i 个动作(对齐后)- count[i]:动作 i 被混合的次数(从 1 开始)- weights[k] = exp(-temporal_ensemble_coeff * k) - cumsum[k] = 索引 k 之前的权重累加和
权重计算
权重在初始化时预计算,位于 src/action_dispatch/action_dispatch/temporal_smoother.py:86-92:
coeff = self.config.temporal_ensemble_coeff
chunk_size = self.config.chunk_size
self._weights = torch.exp(-coeff * torch.arange(chunk_size, dtype=torch.float32))
self._weights_cumsum = torch.cumsum(self._weights, dim=0)
权重进展示例(对于 coeff=0.01,前 5 步):
k |
weight[k] |
cumsum[k] |
解释 |
|---|---|---|---|
0 |
1.000 |
1.000 |
第一次贡献(新动作) |
1 |
0.990 |
1.990 |
第二次混合(旧权重累积) |
2 |
0.980 |
2.970 |
第三次混合 |
3 |
0.970 |
3.940 |
第四次混合 |
4 |
0.961 |
4.901 |
第五次混合 |
随着 count[i] 增加,分母增长,给予累积旧值更多影响。这通过优先考虑已提交的动作来创造 稳定性。
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:86-92, src/action_dispatch/action_dispatch/temporal_smoother.py:208-246, src/action_dispatch/README.en.md:311-322
时间对齐过程
update() 方法位于
src/action_dispatch/action_dispatch/temporal_smoother.py:145-206
处理时间对齐:
对齐流程图
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:145-206
混合实现
_apply_smoothing() 方法位于
src/action_dispatch/action_dispatch/temporal_smoother.py:208-246
执行实际混合:
来源: src/action_dispatch/action_dispatch/temporal_smoother.py:208-246
平滑系数效果
temporal_ensemble_coeff 参数控制指数衰减率。其值决定了稳定性(遵循已提交动作)和响应性(适应新预测)之间的平衡。
系数值 |
权重行为 |
效果 |
用例 |
|---|---|---|---|
|
均匀(所有 k 的 |
新旧等权 |
最大响应性,可能导致抖动 |
|
慢衰减( |
略微偏好旧动作 |
平衡稳定性和响应性(ACT 论文默认值) |
|
快衰减( |
强烈偏好旧动作 |
保守、稳定、较少反应性 |
|
指数增长 |
偏好 新 动作 |
高度反应性,可能导致不稳定 |
权重衰减可视化
对于不同系数值,k=10 时的权重:
系数 |
|
累加和 |
解释 |
|---|---|---|---|
|
1.000 |
11.000 |
新动作获得 1/11 = 9.1% 权重 |
|
0.905 |
9.517 |
新动作获得 9.5% 权重 |
|
0.607 |
7.869 |
新动作获得 7.7% 权重 |
|
0.368 |
6.321 |
新动作获得 5.8% 权重 |
来源:src/action_dispatch/README.en.md:323-331, src/action_dispatch/action_dispatch/temporal_smoother.py:26-31
使用模式
使用 TemporalSmoother 的基本用法
来自 src/action_dispatch/test/test_temporal_smoother.py:51-63 的示例:
from action_dispatch import TemporalSmoother, TemporalSmootherConfig
# Configure
config = TemporalSmootherConfig(
enabled=True,
chunk_size=100,
temporal_ensemble_coeff=0.01
)
smoother = TemporalSmoother(config)
# First inference: 100 actions for 7-DOF robot
actions1 = np.random.randn(100, 7)
smoother.update(actions1, actions_executed=0)
assert smoother.plan_length == 100
# Execute actions
for _ in range(30):
action = smoother.get_next_action() # shape: (7,)
# Send action to robot...
# Second inference (30 actions executed during inference)
actions2 = np.random.randn(100, 7)
smoother.update(actions2, actions_executed=30)
# Plan now contains 70 blended + 30 new = 100 actions
assert smoother.plan_length == 100
来源: src/action_dispatch/test/test_temporal_smoother.py:51-90, src/action_dispatch/README.en.md:383-413
使用 TemporalSmootherManager
管理器提供运行时切换能力,位于 src/action_dispatch/action_dispatch/temporal_smoother.py:259-322:
from action_dispatch import TemporalSmootherManager
manager = TemporalSmootherManager(
enabled=True,
chunk_size=100,
temporal_ensemble_coeff=0.01
)
# Check status
print(f"Smoothing enabled: {manager.is_enabled}")
print(f"Plan length: {manager.plan_length}")
# Runtime toggle
manager.set_enabled(False) # Disable smoothing
manager.set_enabled(True) # Re-enable smoothing
# Use same interface as TemporalSmoother
manager.update(actions, actions_executed=30)
action = manager.get_next_action()
来源:src/action_dispatch/README.en.md:415-426, src/action_dispatch/test/test_temporal_smoother.py:176-216
禁用平滑(仅对齐)
当平滑被禁用时,位于 src/action_dispatch/test/test_temporal_smoother.py:66-90, 平滑器仍执行时间对齐但替换计划而非混合:
config = TemporalSmootherConfig(enabled=False, chunk_size=10)
smoother = TemporalSmoother(config)
# First chunk
actions1 = np.ones((10, 7))
smoother.update(actions1, 0)
# Execute 3 actions
for _ in range(3):
smoother.get_next_action()
# Second chunk arrives after 3 more actions executed
actions2 = np.zeros((10, 7))
smoother.update(actions2, actions_executed=3)
# Plan is replaced with aligned actions2[3:]
# Result: 7 actions, all zeros (no blending with ones)
for _ in range(7):
action = smoother.get_next_action()
np.testing.assert_array_almost_equal(action, np.zeros(7))
来源: src/action_dispatch/test/test_temporal_smoother.py:66-90
与动作分发器的集成
TemporalSmoother 集成到 ActionDispatcherNode
(参见 动作分发器节点)中,如下所示:
动作分发器:1. 在推理开始时记录队列长度 2. 计算 actions_executed = queue_len_start - current_queue_len 3. 将 (new_chunk, actions_executed) 传递给平滑器 4. 将平滑后的动作入队以执行
启动文件中的配置参数
时间平滑参数可以通过 ROS 2 参数配置:
参数 |
类型 |
默认值 |
描述 |
|---|---|---|---|
|
|
|
启用/禁用平滑 |
|
|
|
指数衰减系数 |
|
|
|
每块最大动作数 |
`` smoothing_device`` |
|
|
计算设备 (空 = 自动检测) |
来自 src/action_dispatch/README.en.md:186-209 的启动配置示例:
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='action_dispatch',
executable='action_dispatcher_node',
name='action_dispatcher',
parameters=[{
'temporal_smoothing_enabled': True,
'temporal_ensemble_coeff': 0.01,
'chunk_size': 100,
'smoothing_device': 'cuda:0', # or 'cpu', 'npu:0'
}]
)
])
运行时服务
动作分发器提供用于运行时控制的 ROS 2 服务 (在 动作分发器节点 中记录):
服务 |
类型 |
效果 |
|---|---|---|
|
|
切换平滑 启用/禁用状态 |
|
|
重置平滑器状态 (清除计划) |
# Toggle smoothing on/off
ros2 service call /action_dispatcher/toggle_smoothing std_srvs/srv/Empty
# Reset smoother state
ros2 service call /action_dispatcher/reset std_srvs/srv/Empty
来源:src/action_dispatch/README.en.md:335-341, src/action_dispatch/README.md:326-334
测试
全面的测试位于 src/action_dispatch/test/test_temporal_smoother.py:1-247。关键测试场景:
测试类 |
覆盖范围 |
|---|---|
|
配置验证、参数默认值 |
|
基本更新/获取、禁用平滑、 跨帧平滑、张量输入、重置 |
|
管理器接口、运行时切换、 peek 操作 |
|
权重计算、系数效果、累加和 |
验证跨帧平滑的示例测试位于 src/action_dispatch/test/test_temporal_smoother.py:91-118:
def test_cross_frame_smoothing(self):
config = TemporalSmootherConfig(enabled=True, chunk_size=10, temporal_ensemble_coeff=0.01)
smoother = TemporalSmoother(config)
# First inference: actions of value 1.0
actions1 = np.ones((10, 7)) * 1.0
smoother.update(actions1, 0)
for _ in range(3):
smoother.get_next_action()
# Second inference: actions of value 2.0
actions2 = np.ones((10, 7)) * 2.0
smoother.update(actions2, actions_executed=2)
# First action should be blended (not exactly 1.0 or 2.0)
first_action = smoother.peek_next_action()
assert not np.allclose(first_action.numpy(), np.ones(7) * 1.0)
assert not np.allclose(first_action.numpy(), np.ones(7) * 2.0)
运行测试:
cd src/action_dispatch
python -m pytest test/test_temporal_smoother.py -v
来源: src/action_dispatch/test/test_temporal_smoother.py:1-247