7.0 KiB
7.0 KiB
预处理数据预打乱方案
目标
用 CPU 机时预打乱数据,让训练时直接用 shuffle=False 顺序读取,消除跨分片缓存抖动和 CPU 利用率低的问题。
改动清单
1. src/model/subsample.py — 默认开启输出打乱
函数签名变更:
def pass2_subsample(
...,
shuffle: bool = True, # 新增
seed: int = 42, # 新增
) -> Tuple[int, int]:
改动点:
rng = np.random.RandomState()→rng = np.random.RandomState(seed)- 新增
shuffle_rng = np.random.RandomState(seed + 1)用于输出打乱 - 在两次
np.savez_compressed写入前(line ~251 和 line ~273),插入:
if shuffle and train_buf_count > 1:
perm = shuffle_rng.permutation(train_buf_count)
for f in FIELDS:
merged[f] = merged[f][perm]
- main() 中新增参数:
parser.add_argument("--no-shuffle", action="store_false", dest="shuffle",
help="禁用输出分片内部打乱")
parser.add_argument("--seed", type=int, default=42,
help="随机种子(用于选择+打乱)")
- 调用
pass2_subsample时传入shuffle=args.shuffle, seed=args.seed
metadata.json 新增字段:
"pre_shuffled": true,
"seed": 42
2. 新建 src/model/shuffle_npz.py — 平衡+打乱已有数据
处理流程:
Phase 1: npz → 逐字段打乱 → .npy(mmap 友好)
输入: 19个不平衡 .npz 分片(100M样本,~10GB 压缩)
输出: 114个临时 .npy 文件(6字段 × 19分片,~144GB 未压缩)
内存峰值: ~10GB(单字段 int16 最大 ~5GB + permuted copy ~5GB)
耗时: ~15-20分钟(解压+写入)
Phase 2: .npy → 平衡分配 → .npz
输入: Phase 1 的 .npy 文件(mmap 模式)
输出: 100个平衡 .npz 分片(每片100万样本,~100MB/片压缩后)
每个输出分片 = 从19个源各取比例份额 → concatenate → shuffle → save
内存峰值: ~3GB(1个输出缓冲 + mmap pages)
耗时: ~10-15分钟(mmap读取+压缩写入)
总计: ~30-40分钟,峰值内存 ~10GB
磁盘需求:
- 临时 .npy 文件(Phase 1→2 中间产物):~144GB
- 最终输出 .npz:~10GB
- 临时文件在 Phase 2 完成后自动删除
用法:
python -m src.model.shuffle_npz \
--input-dir /home/songsenand/DataSet/SubPro \
--output-dir /home/songsenand/DataSet/SubPro_Shuffled \
--shard-size 1000000 \
--seed 42
关键实现:
Phase 1 — 逐字段加载+打乱:
for src_idx in range(num_shards):
data = np.load(shard_path) # lazy NpzFile
n = shard_sizes[src_idx]
perm = rng.permutation(n)
for field in FIELDS:
arr = data[field].copy() # ~5GB peak (input_ids)
shuffled = arr[perm] # ~5GB temp
np.save(temp_dir / field / f"shard_{src_idx:06d}.npy", shuffled)
del arr, shuffled
gc.collect()
data.close()
Phase 2 — 平衡分配:
# 打开所有源 mmap(读模式,零内存)
src_mmaps = [
{f: np.load(temp_dir / f / f"shard_{i:06d}.npy", mmap_mode='r') for f in FIELDS}
for i in range(num_shards)
]
for out_j in range(num_output_shards):
buffers = {f: [] for f in FIELDS}
for src_i in range(num_shards):
s = shard_sizes[src_i]
start = (out_j * s) // num_output_shards
end = ((out_j + 1) * s) // num_output_shards
if start >= end:
continue
for f in FIELDS:
chunk = src_mmaps[src_i][f][start:end].copy()
buffers[f].append(chunk)
output = {f: np.concatenate(buffers[f]) for f in FIELDS}
# 额外打乱(跨源混合)
perm = rng.permutation(len(output[FIELDS[0]]))
for f in FIELDS:
output[f] = output[f][perm]
np.savez_compressed(output_dir / f"shard_{out_j:06d}.npz", **output)
del output, buffers, perm
gc.collect()
输出 metadata.json:
{
"num_samples": 99998406,
"max_seq_length": 128,
"dtype": "int16",
"fields": [...],
"shard_size": 1000000,
"num_shards": 100,
"shard_sizes": [1000000, ..., 998406],
"pre_shuffled": true,
"seed": 42
}
eval 目录处理: 如果 --input-dir/eval/ 存在,直接复制到 --output-dir/eval/(eval 数据量小,不需要打乱)
3. src/model/trainer.py — 预处理数据禁用 shuffle
改动点(train 函数,line ~1258-1272):
if is_train_preprocessed:
train_dataset = PreProcessedDataset(train_data_path, max_cache_shards=1)
# pre_shuffled 数据不需要 DataLoader 的 RandomSampler
shuffle_train = not train_dataset.metadata.get("pre_shuffled", False)
total_steps = (len(train_dataset) // batch_size) * num_epochs
# 支持 max_iter_length 限制总步数
if max_iter_length > 0:
max_steps_per_epoch = max_iter_length // batch_size
total_steps = min(total_steps, max_steps_per_epoch * num_epochs)
train_num_workers = min(num_workers, 1)
train_dataloader = create_dataloader(
dataset=train_dataset,
batch_size=batch_size,
num_workers=train_num_workers,
pin_memory=torch.cuda.is_available(),
shuffle=shuffle_train, # 预打乱数据不 shuffle
)
eval DataLoader(line ~1295-1303):
if is_eval_preprocessed:
eval_dataset = PreProcessedDataset(eval_data_path, max_cache_shards=1)
eval_dataloader = create_dataloader(
dataset=eval_dataset,
batch_size=batch_size,
num_workers=0, # eval 数据小,单进程足够
pin_memory=torch.cuda.is_available(),
shuffle=False, # eval 不需要打乱
)
create_dataloader 函数(line ~1076-1114): 无需改动,shuffle 参数已透传。
4. src/model/preprocessed_dataset.py
现有代码无需修改。PreProcessedDataset 已经可以正确处理 shuffle=False 的情况(PyTorch 的 SequentialSampler 会按 0..N-1 顺序读取)。
metadata["pre_shuffled"] 字段由 subsample.py 和 shuffle_npz.py 在写入 metadata.json 时添加,训练代码读取判断即可。
执行顺序
# Step 1: 打乱并平衡已有的 100M 数据集
python -m src.model.shuffle_npz \
--input-dir /home/songsenand/DataSet/SubPro \
--output-dir /home/songsenand/DataSet/SubPro_Shuffled \
--shard-size 1000000 \
--seed 42
# Step 2: 用新数据训练(数据已打乱,顺序读取即可)
uv run train-model train \
--train-data-path /home/songsenand/DataSet/SubPro_Shuffled/train \
--eval-data-path /home/songsenand/DataSet/SubPro_Shuffled/eval \
-b 16 \
-o ~/tmp \
--eval-frequency 20 \
--save-frequency 40
预期效果
| 改动前 | 改动后 |
|---|---|
| 每 batch 跨 13 个 shard | 顺序读,1 个 shard 在缓存中 |
| 每 batch 数据加载 2-3 分钟 | ~0.1-0.5 秒(纯 mmap/memory) |
| CPU 利用率 10% | 正常(训练计算是瓶颈) |
| 内存 40GB+ | <20GB(单 shard 1M 样本 ≈ 1.4GB) |