# 预处理数据预打乱方案 ## 目标 用 CPU 机时预打乱数据,让训练时直接用 `shuffle=False` 顺序读取,消除跨分片缓存抖动和 CPU 利用率低的问题。 ## 改动清单 ### 1. `src/model/subsample.py` — 默认开启输出打乱 **函数签名变更:** ```python def pass2_subsample( ..., shuffle: bool = True, # 新增 seed: int = 42, # 新增 ) -> Tuple[int, int]: ``` **改动点:** - `rng = np.random.RandomState()` → `rng = np.random.RandomState(seed)` - 新增 `shuffle_rng = np.random.RandomState(seed + 1)` 用于输出打乱 - 在两次 `np.savez_compressed` 写入前(line ~251 和 line ~273),插入: ```python if shuffle and train_buf_count > 1: perm = shuffle_rng.permutation(train_buf_count) for f in FIELDS: merged[f] = merged[f][perm] ``` - main() 中新增参数: ```python parser.add_argument("--no-shuffle", action="store_false", dest="shuffle", help="禁用输出分片内部打乱") parser.add_argument("--seed", type=int, default=42, help="随机种子(用于选择+打乱)") ``` - 调用 `pass2_subsample` 时传入 `shuffle=args.shuffle, seed=args.seed` **metadata.json 新增字段:** ```json "pre_shuffled": true, "seed": 42 ``` --- ### 2. 新建 `src/model/shuffle_npz.py` — 平衡+打乱已有数据 处理流程: ``` Phase 1: npz → 逐字段打乱 → .npy(mmap 友好) 输入: 19个不平衡 .npz 分片(100M样本,~10GB 压缩) 输出: 114个临时 .npy 文件(6字段 × 19分片,~144GB 未压缩) 内存峰值: ~10GB(单字段 int16 最大 ~5GB + permuted copy ~5GB) 耗时: ~15-20分钟(解压+写入) Phase 2: .npy → 平衡分配 → .npz 输入: Phase 1 的 .npy 文件(mmap 模式) 输出: 100个平衡 .npz 分片(每片100万样本,~100MB/片压缩后) 每个输出分片 = 从19个源各取比例份额 → concatenate → shuffle → save 内存峰值: ~3GB(1个输出缓冲 + mmap pages) 耗时: ~10-15分钟(mmap读取+压缩写入) 总计: ~30-40分钟,峰值内存 ~10GB ``` **磁盘需求:** - 临时 .npy 文件(Phase 1→2 中间产物):~144GB - 最终输出 .npz:~10GB - 临时文件在 Phase 2 完成后自动删除 **用法:** ```bash python -m src.model.shuffle_npz \ --input-dir /home/songsenand/DataSet/SubPro \ --output-dir /home/songsenand/DataSet/SubPro_Shuffled \ --shard-size 1000000 \ --seed 42 ``` **关键实现:** Phase 1 — 逐字段加载+打乱: ```python for src_idx in range(num_shards): data = np.load(shard_path) # lazy NpzFile n = shard_sizes[src_idx] perm = rng.permutation(n) for field in FIELDS: arr = data[field].copy() # ~5GB peak (input_ids) shuffled = arr[perm] # ~5GB temp np.save(temp_dir / field / f"shard_{src_idx:06d}.npy", shuffled) del arr, shuffled gc.collect() data.close() ``` Phase 2 — 平衡分配: ```python # 打开所有源 mmap(读模式,零内存) src_mmaps = [ {f: np.load(temp_dir / f / f"shard_{i:06d}.npy", mmap_mode='r') for f in FIELDS} for i in range(num_shards) ] for out_j in range(num_output_shards): buffers = {f: [] for f in FIELDS} for src_i in range(num_shards): s = shard_sizes[src_i] start = (out_j * s) // num_output_shards end = ((out_j + 1) * s) // num_output_shards if start >= end: continue for f in FIELDS: chunk = src_mmaps[src_i][f][start:end].copy() buffers[f].append(chunk) output = {f: np.concatenate(buffers[f]) for f in FIELDS} # 额外打乱(跨源混合) perm = rng.permutation(len(output[FIELDS[0]])) for f in FIELDS: output[f] = output[f][perm] np.savez_compressed(output_dir / f"shard_{out_j:06d}.npz", **output) del output, buffers, perm gc.collect() ``` **输出 metadata.json:** ```json { "num_samples": 99998406, "max_seq_length": 128, "dtype": "int16", "fields": [...], "shard_size": 1000000, "num_shards": 100, "shard_sizes": [1000000, ..., 998406], "pre_shuffled": true, "seed": 42 } ``` **eval 目录处理:** 如果 `--input-dir/eval/` 存在,直接复制到 `--output-dir/eval/`(eval 数据量小,不需要打乱) --- ### 3. `src/model/trainer.py` — 预处理数据禁用 shuffle **改动点(train 函数,line ~1258-1272):** ```python if is_train_preprocessed: train_dataset = PreProcessedDataset(train_data_path, max_cache_shards=1) # pre_shuffled 数据不需要 DataLoader 的 RandomSampler shuffle_train = not train_dataset.metadata.get("pre_shuffled", False) total_steps = (len(train_dataset) // batch_size) * num_epochs # 支持 max_iter_length 限制总步数 if max_iter_length > 0: max_steps_per_epoch = max_iter_length // batch_size total_steps = min(total_steps, max_steps_per_epoch * num_epochs) train_num_workers = min(num_workers, 1) train_dataloader = create_dataloader( dataset=train_dataset, batch_size=batch_size, num_workers=train_num_workers, pin_memory=torch.cuda.is_available(), shuffle=shuffle_train, # 预打乱数据不 shuffle ) ``` **eval DataLoader(line ~1295-1303):** ```python if is_eval_preprocessed: eval_dataset = PreProcessedDataset(eval_data_path, max_cache_shards=1) eval_dataloader = create_dataloader( dataset=eval_dataset, batch_size=batch_size, num_workers=0, # eval 数据小,单进程足够 pin_memory=torch.cuda.is_available(), shuffle=False, # eval 不需要打乱 ) ``` **`create_dataloader` 函数(line ~1076-1114):** 无需改动,`shuffle` 参数已透传。 --- ### 4. `src/model/preprocessed_dataset.py` 现有代码无需修改。`PreProcessedDataset` 已经可以正确处理 `shuffle=False` 的情况(PyTorch 的 `SequentialSampler` 会按 0..N-1 顺序读取)。 `metadata["pre_shuffled"]` 字段由 subsample.py 和 shuffle_npz.py 在写入 metadata.json 时添加,训练代码读取判断即可。 --- ## 执行顺序 ```bash # Step 1: 打乱并平衡已有的 100M 数据集 python -m src.model.shuffle_npz \ --input-dir /home/songsenand/DataSet/SubPro \ --output-dir /home/songsenand/DataSet/SubPro_Shuffled \ --shard-size 1000000 \ --seed 42 # Step 2: 用新数据训练(数据已打乱,顺序读取即可) uv run train-model train \ --train-data-path /home/songsenand/DataSet/SubPro_Shuffled/train \ --eval-data-path /home/songsenand/DataSet/SubPro_Shuffled/eval \ -b 16 \ -o ~/tmp \ --eval-frequency 20 \ --save-frequency 40 ``` ## 预期效果 | 改动前 | 改动后 | |---|---| | 每 batch 跨 13 个 shard | 顺序读,1 个 shard 在缓存中 | | 每 batch 数据加载 2-3 分钟 | ~0.1-0.5 秒(纯 mmap/memory) | | CPU 利用率 10% | 正常(训练计算是瓶颈) | | 内存 40GB+ | <20GB(单 shard 1M 样本 ≈ 1.4GB) |