news 2026/6/14 2:08:03

RLinf复现RECAP(一):从轨迹回报到优势标签

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
RLinf复现RECAP(一):从轨迹回报到优势标签

一、RLinf使用Sidecar文件连接三个处理阶段

与RECAP数据处理相关的代码主要位于,

examples/recap/ ├── process/ │ ├── compute_returns.py │ ├── compute_advantages.py │ ├── run_compute_returns.sh │ ├── run_compute_advantages.sh │ └── config/ └── value/ ├── train_value.py ├── run_value_sft.sh └── config/libero_sft_value.yaml

完整数据流如下,

LeRobot轨迹 -> compute_returns.py -> returns_{tag}.parquet -> Value Model SFT -> Value Model checkpoint -> compute_advantages.py -> advantages_{tag}.parquet

RLinf不会把Return和Advantage直接写回原始轨迹文件,而是保存成独立的Sidecar Parquet。例如,

dataset/ ├── data/ │ └── chunk-000/ ├── meta/ │ ├── info.json │ ├── stats.json │ ├── returns_fail300.parquet │ └── advantages_fail300_N10_q30.parquet └── mixture_config.yaml

Return和Advantage依靠episode_index、frame_index字段与原始轨迹对齐。

注意,三个阶段必须使用相同的数据版本。重新过滤轨迹、删除帧或者重新编号后,旧的Sidecar文件通常不能继续使用。

SFT和Rollout数据采用不同的处理方式

数据配置示例如下,

data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft weight: 1.0 - dataset_path: /path/to/rollout_dataset type: rollout weight: 1.0

其中,

  • sft一般表示人工示范;
  • rollout表示策略自主运行产生的数据。

二者差别如下,

  • SFT:
    • 所有轨迹默认成功,最终所有Advantage标签强制设为True。
  • Rollout:
    • 根据is_success区分成功和失败,根据连续Advantage和阈值生成正负标签。

二、compute_returns.py将轨迹结果转换为逐帧回报

Return的计算入口是,

examples/recap/process/compute_returns.py

启动命令为,

bash examples/recap/process/run_compute_returns.sh compute_returns

核心配置如下,

data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft - dataset_path: /path/to/rollout_dataset type: rollout gamma: 1.0 failure_reward: -300.0 tag: fail300 num_workers: 128

compute_returns_for_episode负责计算单条轨迹

单条轨迹的Reward和Return由下面的函数计算,

def compute_returns_for_episode( episode_length, is_success, gamma, failure_reward, ): rewards = np.full( episode_length, -1.0, dtype=np.float32, ) rewards[-1] = ( 0.0 if is_success else failure_reward ) returns = np.zeros( episode_length, dtype=np.float32, ) returns[-1] = rewards[-1] for t in range( episode_length - 2, -1, -1, ): returns[t] = ( rewards[t] + gamma * returns[t + 1] ) return returns, rewards

奖励规则为,

任务仍在执行:Reward = -1 成功终止帧:Reward = 0 失败终止帧:Reward = failure_reward

其中,-1表示每执行一步产生一次时间成本;较大的失败惩罚则用于保证失败轨迹的回报明显低于成功轨迹。

Return从轨迹末尾向前累计

当gamma=1时,当前帧Return = 当前帧Reward + 下一帧Return。

假设一条成功轨迹包含三个普通步骤和一个成功终止帧,

帧 Reward Return 当前帧 -1 -3 下一帧 -1 -2 再下一帧 -1 -1 成功终止帧 0 0

计算过程为,

成功终止帧:Return = 0 再下一帧:Return = -1 + 0 = -1 下一帧:Return = -1 + -1 = -2 当前帧:Return = -1 + -2 = -3

因此,成功轨迹中的Return会逐渐接近0,

-3 → -2 → -1 → 0

_process_single_parquet负责处理数据文件

RLinf不会加载图像数据,只读取计算Return所需的元数据列,

_READ_COLUMNS = [ "episode_index", "frame_index", "is_success", "task_index", "task", ]

实际读取时,还会检查当前Parquet中是否存在这些字段,

pf = pq.ParquetFile(pq_file) available = set(pf.schema_arrow.names) cols_to_read = [ c for c in _READ_COLUMNS if c in available ] table = pq.read_table( pq_file, columns=cols_to_read, )

由于没有读取图像,Return计算主要是Parquet I/O和数组运算,内存占用相对较低。

代码根据episode_index的变化找到轨迹边界,

change_mask = ( np.diff(episode_indices) != 0 ) change_positions = ( np.where(change_mask)[0] + 1 )

每条轨迹分别调用,

compute_returns_for_episode(...)

处理SFT数据时,代码直接将轨迹设为成功,

if dataset_type == "sft": is_success = True

处理Rollout数据时,则读取轨迹最后一帧的is_success,

is_success = bool( is_success_col[ep_end - 1] )

多个Parquet文件通过线程并行处理

process_dataset会递归查找数据目录中的所有Parquet,

parquet_files = sorted( str(p) for p in data_dir.rglob("*.parquet") )

随后使用线程池并行处理,

with ThreadPoolExecutor( max_workers=effective_workers ) as pool: fut = pool.submit( _process_single_parquet, pq_file, dataset_type, gamma, failure_reward, tasks, )

PyArrow文件读取时能够释放GIL,因此这里使用线程也可以获得较好的并行效果。

所有结果最终合并为一个Arrow Table,

combined = pa.concat_tables( result_tables )

当配置为,

tag: fail300

输出文件为,

meta/returns_fail300.parquet

主要字段包括,

episode_index frame_index return reward prompt

脚本还会更新,

meta/stats.json meta/info.json

stats.json中会记录,

{ "return": { "mean": "...", "std": "...", "min": "...", "max": "..." }, "reward": { "mean": "...", "std": "...", "min": "...", "max": "..." } }

后面的Value Model和Advantage计算都需要使用这里的Return范围。

三、Value Model将逐帧Return转换为状态价值

价值模型训练入口是,

examples/recap/value/train_value.py

运行命令为,

bash examples/recap/value/run_value_sft.sh libero_sft_value

train_value.py本身没有实现完整训练循环,而是创建RLinf的SFT Worker和Runner,

actor_group = ( FSDPValueSftWorker .create_group(cfg) .launch( cluster, name=cfg.actor.group_name, placement_strategy=actor_placement, ) ) runner = SFTRunner( cfg=cfg, actor=actor_group, ) runner.init_workers() runner.run()

ValueDataset根据帧索引读取Return标签

Value Model的数据集实现位于,

rlinf/data/datasets/recap/value_model.py

每次读取样本时,代码先从原始LeRobot数据中获得episode_index、frame_index、图像、任务文本。然后使用episode_index和frame_index从Return Sidecar中读取标签,

raw = float( self._sidecar[ep]["return"][fr] )

如果找不到对应Episode,代码会提示Sidecar或Tag不匹配,

if ep not in self._sidecar: raise KeyError( "The sidecar/tag may not match " "the dataset" )

最终返回给模型的数据结构为,

result = { "images": images, "prompt": prompt, "target_values": target_value, "actions": None, }

Value Model不需要动作标签,因此,

actions = None

ReturnNormalizer将原始Return归一化到-1到0

配置默认开启,

data: normalize_to_minus_one_zero: true

对应实现为,

def normalize_value(self, value): denom = ( abs(self.return_min) if self.return_min != 0 else 1.0 ) return value / denom

假设全局最小Return为-300(return_max = 0),

原始 Return = -300 归一化 Value = -1.0 原始 Return = -150 归一化 Value = -0.5 原始 Return = 0 归一化 Value = 0

如果不同任务的最大轨迹长度差异很大,全局最小Return可能主要由最长任务或失败惩罚决定。这样会把短任务的价值压缩到靠近0的较小区间内,因此多任务训练时需要分别观察各任务的价值分布。

Value Head使用201个离散区间预测价值

默认配置为,

num_bins: 201 v_min: -1.0 v_max: 0.0

模型输出201个Logits,

logit_0, logit_1, ..., logit_200

每一个Logit对应-1到0之间的一个价值区间。推理时先计算概率,

probs = F.softmax( logits, dim=-1, )

再计算期望价值,

values = ( probs * self.value_head.atoms ).sum(dim=-1)

可以理解为,

预测价值 = 每个价值区间 × 该区间的预测概率

连续标签被分配到相邻的两个区间

训练时,连续目标值通常不会刚好落在某一个离散区间上。代码先计算目标值在离散区间中的位置,

b = ( target_values - self.v_min ) / self.delta_z

然后找到左右两个区间,

l = b.floor().long() u = b.ceil().long()

监督概率按照距离分配,

target_probs[batch_idx, l] += d_to_u target_probs[batch_idx, u] += d_to_l

假设目标值位于两个区间之间,并且更靠近左区间,

左区间监督概率:0.8 右区间监督概率:0.2

这种方式比直接选择最近区间更平滑,也保留了连续Return中的相对距离信息。最终损失为,

loss = -( target_probs * F.log_softmax(logits, dim=-1) ).sum(dim=-1)

训练日志中除了Loss,还会记录,

cat_acc_best cat_acc_neighbor mae value_spearman

其中,value_spearman用于衡量预测价值和真实Return的排序一致性。

四、compute_advantages.py将价值变化转换为正负标签

Advantage的计算入口为,

examples/recap/process/compute_advantages.py

运行命令为,

bash examples/recap/process/run_compute_advantages.sh compute_advantages

主要配置如下,

advantage: value_checkpoint: /path/to/value_checkpoint batch_size: 1024 flush_interval: 256 num_dataloader_workers_per_gpu: 12 prefetch_factor: 2 discount_next_value: true positive_quantile: 0.3 returns_tag: fail300 tag: fail300_N10_q30 data: advantage_lookahead_step: 10 gamma: 1.0

ValueInferenceDataset统一构造模型输入

不同机器人数据集使用的字段名称可能不同。例如LIBERO可能使用:

observation.image observation.wrist_image observation.state

Franka数据可能使用,

observation.images.front_cam observation.images.wrist_cam observation.state.tcp_pose

RLinf通过KEY_MAPPINGS将它们转换成Value Model使用的统一格式,

KEY_MAPPINGS = { "libero": { "observation.image": "observation/image", "observation.wrist_image": "observation/wrist_image", "observation.state": "observation/state", "task": "prompt", } }

ValueInferenceDataset每次返回,

{ "obs": obs, "global_idx": idx, "episode_index": ep_idx, "frame_index": frame_idx, "true_return": true_return, "reward": reward, }

代码分两个阶段完成价值推理和优势计算

第一阶段批量推理所有状态的价值,

batch_results = value_model.infer_batch( obs_list, batch_size=batch_size, )

预测结果保存到数组,

v_values[local_idx] = float( result["value"] )

第二阶段不再调用模型,而是通过数组下标获取当前状态价值V(o_t)和N步之后的状态价值 V(o_t+N),每一帧只需要执行一次Value Model推理。如果直接针对每一个样本分别推理当前状态和未来状态,大部分中间帧会被重复计算两次。RLinf先统一推理再按下标复用,可以显著减少计算量。

分片推理会额外读取N个未来样本

多GPU模式下,每个进程只负责数据集的一部分,

shard_start, shard_end = ( get_shard_indices( total_samples, rank, world_size, ) )

但计算当前分片末尾样本时,仍然需要访问未来N步的Value。因此,代码会将推理范围向后扩展,

extended_end = min( shard_end + action_horizon, len(dataset), )

例如当前GPU负责,

样本 0 到样本 999

并设置,

action_horizon = 10

那么该GPU最多会推理到,

样本 1009

多出来的10个样本只用于查询分片末尾位置的未来价值,不会重复写入最终结果。

N步Advantage同时考虑状态变化和动作成本

核心代码为,

reward_sum = normalize( reward_sum_raw ) gamma_k = ( gamma**num_valid if discount_next_value else 1.0 ) advantage = ( reward_sum + gamma_k * v_next - v_curr )

直接写成容易理解的形式,

Advantage = 归一化后的未来N步累计奖励 + 折扣后的未来状态价值 - 当前状态价值

其中,v_curr为当前状态价值,v_next为N步之后的状态价值,reward_sum_raw为中间N步的原始 Reward总和,num_valid为当前轨迹中实际可用的未来步数

假设,

当前价值v_curr = -0.40 10步后价值v_next = -0.30 10步Reward总和 = -10 全局Return范围 = [-300, 0]

归一化后的累计Reward为,

reward_sum = -10 / 300 ≈ -0.033

因此,

Advantage = -0.033 + (-0.30) - (-0.40) = 0.067

结果为正,说明状态价值的提升超过了中间10步产生的时间成本。如果10步后状态反而变差,

v_curr = -0.30 v_next = -0.35

则,

Advantage = -0.033 + (-0.35) - (-0.30) = -0.083

结果为负,说明动作既消耗了时间,又没有推动任务进展。

gamma等于1时直接使用Return差值计算累计奖励

当gamma=1.0,代码不需要重新读取并累计每一步Reward。如果N步后的状态仍在当前轨迹中,

reward_sum_raw = ( true_return - next_return )

原因是,

当前Return = 未来N步Reward总和 + N步后的Return

移项后是,

未来N步Reward总和 = 当前Return - N步后的Return

如果N步后已经超出轨迹末尾,

reward_sum_raw = true_return v_next = 0.0

此时直接使用当前帧到轨迹结束的完整Return。

当gamma不等于1时,代码才会显式读取Reward序列:

reward_sum_raw = np.sum( gamma_powers[:num_valid] * reward_slice )

轨迹末尾不会跨Episode读取未来状态

未来位置通过下面的方式计算,

next_gidx = ( gidx + action_horizon ) is_next_pad = ( next_gidx >= ep_end )

实际使用的Reward数量为,

num_valid = min( action_horizon, ep_end - gidx, )

连续Advantage通过全局阈值转换为标签

每个样本先得到连续值,

advantage_continuous

所有数据集计算完成后,代码会合并 Advantage,

combined_advantages = np.concatenate( all_advantages )

默认,

positive_quantile: 0.3

表示将Advantage最高的30%作为正样本。因此,实际阈值位于70%分位点:

unified_threshold = np.percentile( combined_advantages, 70, )

保存Rollout数据时,

save_df["advantage"] = ( save_df["advantage_continuous"] >= threshold )

保存SFT数据时,

if dataset_type == "sft": save_df["advantage"] = True

结果通过临时Parquet分块写入

优势计算可能包含数百万帧,如果一直把所有结果保存在内存中,容易产生OOM。

代码根据flush_interval和batch_size,定期将结果写入临时Parquet,

temp_df.to_parquet( temp_file, index=False, )

写入后清空内存,

for k in results: results[k] = [] gc.collect()

所有分块处理完成后再统一合并,

merged_df = pd.concat( [ pd.read_parquet(f) for f in temp_files ], ignore_index=True, )

最终输出文件为,

meta/advantages_fail300_N10_q30.parquet

主要字段包括,

episode_index frame_index return value_current value_next reward_sum reward_sum_raw num_valid_rewards advantage_continuous advantage dataset_name

代码还会更新mixture_config.yaml,记录全局Return范围、统一阈值、正样本比例配置和数据集信息,主要用于实验记录与结果追踪。下一阶段的CFG训练直接读取advantages_{tag}.parquet中的布尔优势标签。记录信息如下,

global_return_min global_return_max unified_threshold positive_quantile 数据集名称与权重

这些信息会在下一阶段的CFG策略训练中继续使用。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/14 1:58:58

英雄联盟玩家必备:本地化智能助手League Akari终极指南

英雄联盟玩家必备:本地化智能助手League Akari终极指南 【免费下载链接】League-Toolkit An all-in-one toolkit for LeagueClient. Gathering power 🚀. 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 你是否厌倦了网络延迟导致的…

作者头像 李华
网站建设 2026/6/14 1:56:55

AI Agent的Replay与Debug系统2026:从黑盒执行到可观测的智能体工程

引言:为什么Agent的可观测性比LLM更难 LLM应用的可观测性在过去一年里有了长足进步——Langfuse、LangSmith、Helicone等工具让开发者能看到Prompt、Response、Token、Latency、Cost等核心指标。但当应用从"单次LLM调用"演化为"多步Agent执行"时…

作者头像 李华
网站建设 2026/6/14 1:55:53

有哪些AI论文工具是真的贴合学术规范,而不是空洞拼凑?

在AI写作技术不断渗透学术领域的当下,越来越多的论文工具涌现出来,宣称能帮助学生和研究者高效完成论文撰写。然而,许多工具只是打着“智能”的幌子,实则内容空洞、逻辑松散、格式混乱,沦为“文字生成器”,…

作者头像 李华