SUimeModelTraner/.opencode/plans/precompute_shuffle.md

7.0 KiB
Raw Permalink Blame History

预处理数据预打乱方案

目标

用 CPU 机时预打乱数据,让训练时直接用 shuffle=False 顺序读取,消除跨分片缓存抖动和 CPU 利用率低的问题。

改动清单

1. src/model/subsample.py — 默认开启输出打乱

函数签名变更:

def pass2_subsample(
    ...,
    shuffle: bool = True,      # 新增
    seed: int = 42,             # 新增
) -> Tuple[int, int]:

改动点:

  • rng = np.random.RandomState()rng = np.random.RandomState(seed)
  • 新增 shuffle_rng = np.random.RandomState(seed + 1) 用于输出打乱
  • 在两次 np.savez_compressed 写入前line ~251 和 line ~273插入
if shuffle and train_buf_count > 1:
    perm = shuffle_rng.permutation(train_buf_count)
    for f in FIELDS:
        merged[f] = merged[f][perm]
  • main() 中新增参数:
parser.add_argument("--no-shuffle", action="store_false", dest="shuffle",
                    help="禁用输出分片内部打乱")
parser.add_argument("--seed", type=int, default=42,
                    help="随机种子(用于选择+打乱)")
  • 调用 pass2_subsample 时传入 shuffle=args.shuffle, seed=args.seed

metadata.json 新增字段:

"pre_shuffled": true,
"seed": 42

2. 新建 src/model/shuffle_npz.py — 平衡+打乱已有数据

处理流程:

Phase 1: npz → 逐字段打乱 → .npymmap 友好)
  输入: 19个不平衡 .npz 分片100M样本~10GB 压缩)
  输出: 114个临时 .npy 文件6字段 × 19分片~144GB 未压缩)
  内存峰值: ~10GB单字段 int16 最大 ~5GB + permuted copy ~5GB
  耗时: ~15-20分钟解压+写入)

Phase 2: .npy → 平衡分配 → .npz
  输入: Phase 1 的 .npy 文件mmap 模式)
  输出: 100个平衡 .npz 分片每片100万样本~100MB/片压缩后)
  每个输出分片 = 从19个源各取比例份额 → concatenate → shuffle → save
  内存峰值: ~3GB1个输出缓冲 + mmap pages
  耗时: ~10-15分钟mmap读取+压缩写入)

总计: ~30-40分钟峰值内存 ~10GB

磁盘需求:

  • 临时 .npy 文件Phase 1→2 中间产物):~144GB
  • 最终输出 .npz~10GB
  • 临时文件在 Phase 2 完成后自动删除

用法:

python -m src.model.shuffle_npz \
    --input-dir /home/songsenand/DataSet/SubPro \
    --output-dir /home/songsenand/DataSet/SubPro_Shuffled \
    --shard-size 1000000 \
    --seed 42

关键实现:

Phase 1 — 逐字段加载+打乱:

for src_idx in range(num_shards):
    data = np.load(shard_path)  # lazy NpzFile
    n = shard_sizes[src_idx]
    perm = rng.permutation(n)

    for field in FIELDS:
        arr = data[field].copy()           # ~5GB peak (input_ids)
        shuffled = arr[perm]               # ~5GB temp
        np.save(temp_dir / field / f"shard_{src_idx:06d}.npy", shuffled)
        del arr, shuffled
        gc.collect()

    data.close()

Phase 2 — 平衡分配:

# 打开所有源 mmap读模式零内存
src_mmaps = [
    {f: np.load(temp_dir / f / f"shard_{i:06d}.npy", mmap_mode='r') for f in FIELDS}
    for i in range(num_shards)
]

for out_j in range(num_output_shards):
    buffers = {f: [] for f in FIELDS}

    for src_i in range(num_shards):
        s = shard_sizes[src_i]
        start = (out_j * s) // num_output_shards
        end = ((out_j + 1) * s) // num_output_shards
        if start >= end:
            continue
        for f in FIELDS:
            chunk = src_mmaps[src_i][f][start:end].copy()
            buffers[f].append(chunk)

    output = {f: np.concatenate(buffers[f]) for f in FIELDS}
    # 额外打乱(跨源混合)
    perm = rng.permutation(len(output[FIELDS[0]]))
    for f in FIELDS:
        output[f] = output[f][perm]

    np.savez_compressed(output_dir / f"shard_{out_j:06d}.npz", **output)
    del output, buffers, perm
    gc.collect()

输出 metadata.json

{
  "num_samples": 99998406,
  "max_seq_length": 128,
  "dtype": "int16",
  "fields": [...],
  "shard_size": 1000000,
  "num_shards": 100,
  "shard_sizes": [1000000, ..., 998406],
  "pre_shuffled": true,
  "seed": 42
}

eval 目录处理: 如果 --input-dir/eval/ 存在,直接复制到 --output-dir/eval/eval 数据量小,不需要打乱)


3. src/model/trainer.py — 预处理数据禁用 shuffle

改动点train 函数line ~1258-1272

if is_train_preprocessed:
    train_dataset = PreProcessedDataset(train_data_path, max_cache_shards=1)
    # pre_shuffled 数据不需要 DataLoader 的 RandomSampler
    shuffle_train = not train_dataset.metadata.get("pre_shuffled", False)
    total_steps = (len(train_dataset) // batch_size) * num_epochs
    # 支持 max_iter_length 限制总步数
    if max_iter_length > 0:
        max_steps_per_epoch = max_iter_length // batch_size
        total_steps = min(total_steps, max_steps_per_epoch * num_epochs)
    train_num_workers = min(num_workers, 1)
    train_dataloader = create_dataloader(
        dataset=train_dataset,
        batch_size=batch_size,
        num_workers=train_num_workers,
        pin_memory=torch.cuda.is_available(),
        shuffle=shuffle_train,  # 预打乱数据不 shuffle
    )

eval DataLoaderline ~1295-1303

if is_eval_preprocessed:
    eval_dataset = PreProcessedDataset(eval_data_path, max_cache_shards=1)
    eval_dataloader = create_dataloader(
        dataset=eval_dataset,
        batch_size=batch_size,
        num_workers=0,      # eval 数据小,单进程足够
        pin_memory=torch.cuda.is_available(),
        shuffle=False,       # eval 不需要打乱
    )

create_dataloader 函数line ~1076-1114 无需改动,shuffle 参数已透传。


4. src/model/preprocessed_dataset.py

现有代码无需修改。PreProcessedDataset 已经可以正确处理 shuffle=False 的情况PyTorch 的 SequentialSampler 会按 0..N-1 顺序读取)。

metadata["pre_shuffled"] 字段由 subsample.py 和 shuffle_npz.py 在写入 metadata.json 时添加,训练代码读取判断即可。


执行顺序

# Step 1: 打乱并平衡已有的 100M 数据集
python -m src.model.shuffle_npz \
    --input-dir /home/songsenand/DataSet/SubPro \
    --output-dir /home/songsenand/DataSet/SubPro_Shuffled \
    --shard-size 1000000 \
    --seed 42

# Step 2: 用新数据训练(数据已打乱,顺序读取即可)
uv run train-model train \
    --train-data-path /home/songsenand/DataSet/SubPro_Shuffled/train \
    --eval-data-path /home/songsenand/DataSet/SubPro_Shuffled/eval \
    -b 16 \
    -o ~/tmp \
    --eval-frequency 20 \
    --save-frequency 40

预期效果

改动前 改动后
每 batch 跨 13 个 shard 顺序读1 个 shard 在缓存中
每 batch 数据加载 2-3 分钟 ~0.1-0.5 秒(纯 mmap/memory
CPU 利用率 10% 正常(训练计算是瓶颈)
内存 40GB+ <20GB单 shard 1M 样本 ≈ 1.4GB