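# Pre-shuffling Plan for Preprocessed Data

## Goal

Spend CPU time up front to pre-shuffle the data so that training can read it sequentially with `shuffle=False`. This removes the cross-shard cache thrashing and the low CPU utilization that random access causes today.
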
## Change List

### 1. `src/model/subsample.py`: shuffle output by default

**Signature change:**
```python
def pass2_subsample(
    ...,
    shuffle: bool = True,  # new
    seed: int = 42,        # new
) -> Tuple[int, int]:
```

**Changes:**
- `rng = np.random.RandomState()` → `rng = np.random.RandomState(seed)`
- Add `shuffle_rng = np.random.RandomState(seed + 1)` for shuffling the output
- Before each of the two `np.savez_compressed` writes (around lines 251 and 273), insert:
```python
if shuffle and train_buf_count > 1:
    # One permutation applied to every field keeps rows aligned across fields.
    perm = shuffle_rng.permutation(train_buf_count)
    for f in FIELDS:
        merged[f] = merged[f][perm]
```
- Add new arguments in `main()`:
```python
parser.add_argument("--no-shuffle", action="store_false", dest="shuffle",
                    help="disable shuffling within each output shard")
parser.add_argument("--seed", type=int, default=42,
                    help="random seed (used for selection and shuffling)")
```
- Pass `shuffle=args.shuffle, seed=args.seed` when calling `pass2_subsample`

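The in-shard shuffle described above can be sketched in isolation. This is a minimal illustration rather than the project's code: the field names and the helper `shuffle_buffer` are hypothetical, and the reason for a second generator seeded with `seed + 1` (presumably so that toggling `--no-shuffle` does not change which samples get selected) is an assumption.

```python
import numpy as np

FIELDS = ["input_ids", "labels"]  # hypothetical field names for illustration


def shuffle_buffer(merged, count, rng):
    """Apply one shared permutation to every field so rows stay aligned."""
    if count <= 1:
        return merged
    perm = rng.permutation(count)
    return {f: merged[f][perm] for f in FIELDS}


seed = 42
select_rng = np.random.RandomState(seed)       # would drive sample selection
shuffle_rng = np.random.RandomState(seed + 1)  # drives output shuffling only

merged = {
    "input_ids": np.arange(10, dtype=np.int16).reshape(5, 2),
    "labels": np.arange(5, dtype=np.int16),
}
shuffled = shuffle_buffer(merged, 5, shuffle_rng)

# Row i of every field still refers to the same sample after shuffling.
assert np.array_equal(shuffled["input_ids"][:, 0] // 2, shuffled["labels"])
```
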
**New fields in `metadata.json`:**
```json
"pre_shuffled": true,
"seed": 42
```
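As a rough sketch of how these two fields might be written and later read back, the example below round-trips them through a temporary directory. The helpers `write_metadata` and `is_pre_shuffled` are hypothetical names, not functions from the repository; the point is that metadata written before this change (or missing entirely) should fall back to `False`, so older datasets keep being shuffled by the DataLoader.

```python
import json
import tempfile
from pathlib import Path


def write_metadata(out_dir, base, shuffle, seed):
    """Write metadata.json with the two new fields added (hypothetical helper)."""
    meta = dict(base, pre_shuffled=bool(shuffle), seed=int(seed))
    path = Path(out_dir) / "metadata.json"
    path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
    return path


def is_pre_shuffled(data_dir):
    """Return False when the file or the field is absent (legacy datasets)."""
    path = Path(data_dir) / "metadata.json"
    if not path.exists():
        return False
    meta = json.loads(path.read_text(encoding="utf-8"))
    return bool(meta.get("pre_shuffled", False))


with tempfile.TemporaryDirectory() as tmp:
    write_metadata(tmp, {"num_samples": 5}, shuffle=True, seed=42)
    new_style = is_pre_shuffled(tmp)

with tempfile.TemporaryDirectory() as tmp:
    legacy = is_pre_shuffled(tmp)  # directory without metadata.json

print(new_style, legacy)  # → True False
```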
---

### 2. New `src/model/shuffle_npz.py`: balance and shuffle existing data

Processing flow:

```
Phase 1: npz → shuffle each field → .npy (mmap-friendly)
  Input:  19 unbalanced .npz shards (100M samples, ~10GB compressed)
  Output: 114 temporary .npy files (6 fields × 19 shards, ~144GB uncompressed)
  Peak memory: ~10GB (largest single int16 field ~5GB + permuted copy ~5GB)
  Time: ~15-20 min (decompression + writes)

Phase 2: .npy → balanced distribution → .npz
  Input:  the .npy files from Phase 1 (mmap mode)
  Output: 100 balanced .npz shards (~1M samples each, ~100MB per shard after compression)
  Each output shard = proportional slice from each of the 19 sources → concatenate → shuffle → save
  Peak memory: ~3GB (one output buffer + mmap pages)
  Time: ~10-15 min (mmap reads + compressed writes)

Total: ~30-40 min, peak memory ~10GB
```

**Disk requirements:**
- Temporary .npy files (intermediate output between Phase 1 and Phase 2): ~144GB
- Final .npz output: ~10GB
- The temporary files are deleted automatically once Phase 2 finishes

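The size estimates above can be cross-checked with a little arithmetic. The sketch below uses only figures quoted in this plan and assumes decimal units (1 GB = 10^9 bytes); it is a sanity check, not a measurement.

```python
TOTAL_SAMPLES = 99_998_406   # num_samples from the output metadata.json
UNCOMPRESSED_BYTES = 144e9   # ~144GB of temporary .npy files
COMPRESSED_BYTES = 10e9      # ~10GB of final .npz output
NUM_OUTPUT_SHARDS = 100

bytes_per_sample = UNCOMPRESSED_BYTES / TOTAL_SAMPLES
shard_uncompressed_gb = bytes_per_sample * 1_000_000 / 1e9
shard_compressed_mb = COMPRESSED_BYTES / NUM_OUTPUT_SHARDS / 1e6
compression_ratio = UNCOMPRESSED_BYTES / COMPRESSED_BYTES

print(f"{bytes_per_sample:.0f} bytes per sample across all 6 fields")
print(f"{shard_uncompressed_gb:.2f} GB per 1M-sample shard in memory")
print(f"{shard_compressed_mb:.0f} MB per output shard on disk")
print(f"{compression_ratio:.1f}x compression ratio")
```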
**Usage:**
```bash
python -m src.model.shuffle_npz \
  --input-dir /home/songsenand/DataSet/SubPro \
  --output-dir /home/songsenand/DataSet/SubPro_Shuffled \
  --shard-size 1000000 \
  --seed 42
```

**Key implementation:**

Phase 1 (load and shuffle one field at a time):
```python
import gc

import numpy as np

for src_idx in range(num_shards):
    data = np.load(shard_path)  # lazy NpzFile for source shard src_idx
    n = shard_sizes[src_idx]
    perm = rng.permutation(n)   # shared by every field so rows stay aligned

    for field in FIELDS:
        arr = data[field].copy()  # ~5GB peak (input_ids)
        shuffled = arr[perm]      # ~5GB temp
        np.save(temp_dir / field / f"shard_{src_idx:06d}.npy", shuffled)
        del arr, shuffled
        gc.collect()

    data.close()
```

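To make the Phase 1 round trip concrete, here is a small self-contained sketch on synthetic data. The field names, shapes, and directory layout are assumptions chosen for illustration; unlike the real script, it reads the shard length from the array itself rather than from `shard_sizes`.

```python
import tempfile
from pathlib import Path

import numpy as np

FIELDS = ["input_ids", "labels"]  # hypothetical field names

rng = np.random.RandomState(42)

with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)

    # Build one small compressed source shard.
    src = tmp / "shard_000000.npz"
    np.savez_compressed(
        src,
        input_ids=np.arange(12, dtype=np.int16).reshape(6, 2),
        labels=np.arange(6, dtype=np.int16),
    )

    with np.load(src) as data:      # context manager closes the zip handle
        n = data[FIELDS[0]].shape[0]
        perm = rng.permutation(n)   # one permutation shared by all fields
        for field in FIELDS:
            out = tmp / field / "shard_000000.npy"
            out.parent.mkdir(parents=True, exist_ok=True)
            np.save(out, data[field][perm])  # one field in memory at a time

    # Phase 2 can open the result lazily instead of loading it.
    ids = np.load(tmp / "input_ids" / "shard_000000.npy", mmap_mode="r")
    labels = np.load(tmp / "labels" / "shard_000000.npy", mmap_mode="r")
    aligned = bool(np.array_equal(ids[:, 0] // 2, labels))
    is_memmap = isinstance(ids, np.memmap)
    del ids, labels  # release the mmaps before the directory is removed

print(aligned, is_memmap)
```
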
Phase 2 (balanced distribution):
```python
# Open every source as a read-only mmap (no data is loaded until it is sliced)
src_mmaps = [
    {f: np.load(temp_dir / f / f"shard_{i:06d}.npy", mmap_mode='r') for f in FIELDS}
    for i in range(num_shards)
]

for out_j in range(num_output_shards):
    buffers = {f: [] for f in FIELDS}

    for src_i in range(num_shards):
        s = shard_sizes[src_i]
        # Proportional slice of source src_i that belongs to output shard out_j
        start = (out_j * s) // num_output_shards
        end = ((out_j + 1) * s) // num_output_shards
        if start >= end:
            continue
        for f in FIELDS:
            chunk = src_mmaps[src_i][f][start:end].copy()
            buffers[f].append(chunk)

    output = {f: np.concatenate(buffers[f]) for f in FIELDS}
    # Extra shuffle so samples from different sources are interleaved
    perm = rng.permutation(len(output[FIELDS[0]]))
    for f in FIELDS:
        output[f] = output[f][perm]

    np.savez_compressed(output_dir / f"shard_{out_j:06d}.npz", **output)
    del output, buffers, perm
    gc.collect()
```

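The integer slicing used in Phase 2 has two properties worth checking: every source sample lands in exactly one output shard, and each source contributes either the floor or the ceiling of its proportional share. The sketch below demonstrates both on made-up shard sizes (the helper name `slice_bounds` and the sizes are hypothetical). One consequence is that output shard sizes come out near `total / num_output_shards` rather than exactly equal to a fixed shard size, so it is probably safest to record `shard_sizes` from the arrays actually written.

```python
def slice_bounds(size, out_j, num_output_shards):
    """Half-open range of a source of `size` rows assigned to output shard out_j."""
    start = (out_j * size) // num_output_shards
    end = ((out_j + 1) * size) // num_output_shards
    return start, end


shard_sizes = [7, 20, 3]   # hypothetical, deliberately unbalanced
num_output_shards = 4

out_sizes = []
for out_j in range(num_output_shards):
    total = 0
    for s in shard_sizes:
        start, end = slice_bounds(s, out_j, num_output_shards)
        total += end - start
    out_sizes.append(total)

print(out_sizes)  # → [6, 8, 8, 8]
assert sum(out_sizes) == sum(shard_sizes)  # nothing dropped, nothing duplicated
```
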
**Output `metadata.json`:**
```json
{
  "num_samples": 99998406,
  "max_seq_length": 128,
  "dtype": "int16",
  "fields": [...],
  "shard_size": 1000000,
  "num_shards": 100,
  "shard_sizes": [1000000, ..., 998406],
  "pre_shuffled": true,
  "seed": 42
}
```

**Handling the eval directory:** If `--input-dir/eval/` exists, it is copied as-is to `--output-dir/eval/` (the eval set is small and does not need shuffling).

---

### 3. `src/model/trainer.py`: disable shuffle for preprocessed data

**Changes (`train` function, around lines 1258-1272):**

```python
if is_train_preprocessed:
    train_dataset = PreProcessedDataset(train_data_path, max_cache_shards=1)
    # Pre-shuffled data does not need the DataLoader's RandomSampler
    shuffle_train = not train_dataset.metadata.get("pre_shuffled", False)
    total_steps = (len(train_dataset) // batch_size) * num_epochs
    # max_iter_length can cap the total number of steps
    if max_iter_length > 0:
        max_steps_per_epoch = max_iter_length // batch_size
        total_steps = min(total_steps, max_steps_per_epoch * num_epochs)
    train_num_workers = min(num_workers, 1)
    train_dataloader = create_dataloader(
        dataset=train_dataset,
        batch_size=batch_size,
        num_workers=train_num_workers,
        pin_memory=torch.cuda.is_available(),
        shuffle=shuffle_train,  # no shuffle for pre-shuffled data
    )
```

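The step computation in the snippet above can be pulled out and checked on its own. This sketch wraps the same arithmetic in a hypothetical helper, `compute_total_steps`; the batch size of 16 matches the `-b 16` used later in this plan, while `num_epochs=1` and `max_iter_length=1600` are arbitrary values picked for illustration.

```python
def compute_total_steps(num_samples, batch_size, num_epochs, max_iter_length=0):
    """Total optimizer steps, optionally capped by max_iter_length samples per epoch."""
    total_steps = (num_samples // batch_size) * num_epochs
    if max_iter_length > 0:
        max_steps_per_epoch = max_iter_length // batch_size
        total_steps = min(total_steps, max_steps_per_epoch * num_epochs)
    return total_steps


full = compute_total_steps(99_998_406, batch_size=16, num_epochs=1)
capped = compute_total_steps(99_998_406, batch_size=16, num_epochs=1,
                             max_iter_length=1600)
print(full, capped)  # → 6249900 100
```
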
**Eval DataLoader (around lines 1295-1303):**

```python
if is_eval_preprocessed:
    eval_dataset = PreProcessedDataset(eval_data_path, max_cache_shards=1)
    eval_dataloader = create_dataloader(
        dataset=eval_dataset,
        batch_size=batch_size,
        num_workers=0,  # eval data is small; a single process is enough
        pin_memory=torch.cuda.is_available(),
        shuffle=False,  # eval does not need shuffling
    )
```

**`create_dataloader` (around lines 1076-1114):** no changes needed; the `shuffle` argument is already passed through.

---

### 4. `src/model/preprocessed_dataset.py`

The existing code needs no changes. `PreProcessedDataset` already handles `shuffle=False` correctly (PyTorch's `SequentialSampler` reads indices in order from 0 to N-1).

The `metadata["pre_shuffled"]` field is added by `subsample.py` and `shuffle_npz.py` when they write `metadata.json`; the training code only needs to read it and act on it.

---

## Execution Order

```bash
# Step 1: shuffle and rebalance the existing 100M-sample dataset
python -m src.model.shuffle_npz \
  --input-dir /home/songsenand/DataSet/SubPro \
  --output-dir /home/songsenand/DataSet/SubPro_Shuffled \
  --shard-size 1000000 \
  --seed 42

# Step 2: train on the new data (already shuffled, so sequential reads suffice)
uv run train-model train \
  --train-data-path /home/songsenand/DataSet/SubPro_Shuffled/train \
  --eval-data-path /home/songsenand/DataSet/SubPro_Shuffled/eval \
  -b 16 \
  -o ~/tmp \
  --eval-frequency 20 \
  --save-frequency 40
```

## Expected Results

| Before | After |
|---|---|
| Each batch spans 13 shards | Sequential reads with 1 shard in cache |
| 2-3 minutes of data loading per batch | ~0.1-0.5 s per batch (pure mmap/memory) |
| CPU utilization around 10% | Normal (training compute is the bottleneck) |
| 40GB+ of memory | <20GB (one 1M-sample shard ≈ 1.4GB) |