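# Preprocessed Data Pre-Shuffling Plan
## Goal
Spend CPU time during preprocessing to pre-shuffle the data, so that training can read it sequentially with `shuffle=False`. This eliminates cache thrashing across shards and the resulting low CPU utilization.
## Change List
### 1. `src/model/subsample.py`: shuffle output by default
**Function signature change:**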
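```python
def pass2_subsample(
    # ... existing parameters unchanged ...
    shuffle: bool = True,  # new: shuffle each output shard before writing
    seed: int = 42,        # new: seed for sample selection and shuffling
) -> Tuple[int, int]:
    ...  # body unchanged apart from the edits below
```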
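**Changes:**
- Replace `rng = np.random.RandomState()` with `rng = np.random.RandomState(seed)` so that sample selection is reproducible
- Add `shuffle_rng = np.random.RandomState(seed + 1)` for output shuffling (a separate generator, so shuffling does not consume draws from the selection RNG)
- Before each of the two `np.savez_compressed` writes (line ~251 and line ~273), insert: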
```python
if shuffle and train_buf_count > 1:
    # One permutation shared by all fields keeps each sample's fields aligned
    perm = shuffle_rng.permutation(train_buf_count)
    for f in FIELDS:
        merged[f] = merged[f][perm]
```
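- Add new arguments in `main()`:
```python
# store_false: args.shuffle defaults to True; --no-shuffle sets it to False
parser.add_argument("--no-shuffle", action="store_false", dest="shuffle",
                    help="Disable shuffling within output shards")
parser.add_argument("--seed", type=int, default=42,
                    help="Random seed (used for selection and shuffling)")
```
- Pass `shuffle=args.shuffle, seed=args.seed` when calling `pass2_subsample`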
**New fields in metadata.json:**
```json
"pre_shuffled": true,
"seed": 42
```
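The buffer shuffle in `pass2_subsample` relies on two properties: a single permutation shared by every field, so each sample's fields stay aligned, and a seeded `RandomState`, so reruns reproduce the same order. A minimal sketch that checks both on toy data; the helper `shuffle_merged`, the toy field names, and the buffer shape are hypothetical and not taken from `subsample.py`:

```python
import numpy as np


def shuffle_merged(merged, count, shuffle_rng):
    """Shuffle every field of `merged` in place with one shared permutation.

    Hypothetical helper: `merged` maps field name -> array whose first
    dimension is `count`.
    """
    if count <= 1:
        return merged  # nothing to reorder
    perm = shuffle_rng.permutation(count)
    for f in merged:
        # Reusing one permutation for all fields keeps row i of every field aligned
        merged[f] = merged[f][perm]
    return merged


def make_toy_buffer(n=10, seq_len=4):
    # Toy buffer: "labels" is derived from "input_ids" row by row, so alignment is checkable
    ids = np.arange(n * seq_len, dtype=np.int16).reshape(n, seq_len)
    return {"input_ids": ids, "labels": ids[:, 0].copy()}


seed = 42
a = shuffle_merged(make_toy_buffer(), 10, np.random.RandomState(seed + 1))
b = shuffle_merged(make_toy_buffer(), 10, np.random.RandomState(seed + 1))

# Rows stay aligned across fields after the shuffle
print(np.array_equal(a["labels"], a["input_ids"][:, 0]))  # True
# The same seed reproduces the same order
print(np.array_equal(a["input_ids"], b["input_ids"]))     # True
```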
---
### 2. New `src/model/shuffle_npz.py`: balance and shuffle existing data
Processing pipeline:
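```
Phase 1: .npz → shuffle field by field → .npy (mmap-friendly)
  Input:       19 unbalanced .npz shards (100M samples, ~10GB compressed)
  Output:      114 temporary .npy files (6 fields × 19 shards, ~144GB uncompressed)
  Peak memory: ~10GB (largest single int16 field ~5GB + permuted copy ~5GB)
  Time:        ~15-20 minutes (decompression + writes)
Phase 2: .npy → balanced distribution → .npz
  Input:       Phase 1 .npy files (mmap mode)
  Output:      100 balanced .npz shards (1M samples each, ~100MB per shard compressed)
  Each output shard = a proportional share from each of the 19 sources → concatenate → shuffle → save
  Peak memory: ~3GB (1 output buffer + mmap pages)
  Time:        ~10-15 minutes (mmap reads + compressed writes)
Total: ~30-40 minutes, peak memory ~10GB
```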
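The "proportional share" rule uses integer bounds `start = (j * s) // K` and `end = ((j + 1) * s) // K` for a source shard of size `s` and `K` output shards. Consecutive bounds meet exactly, so each source is partitioned with no gaps or overlaps. A small sketch of that bookkeeping; the helper name `output_shard_slices` and the toy sizes are hypothetical:

```python
def output_shard_slices(shard_sizes, num_output_shards):
    """For each output shard j, list the (src_index, start, end) slices it takes.

    Mirrors the Phase 2 rule: start = (j * s) // K, end = ((j + 1) * s) // K.
    """
    K = num_output_shards
    plan = []
    for j in range(K):
        slices = []
        for i, s in enumerate(shard_sizes):
            start, end = (j * s) // K, ((j + 1) * s) // K
            if start < end:  # a small source may contribute nothing to some outputs
                slices.append((i, start, end))
        plan.append(slices)
    return plan


# Toy example: 3 unbalanced sources split into 4 output shards
sizes = [10, 3, 7]
plan = output_shard_slices(sizes, 4)
out_sizes = [sum(end - start for _, start, end in slices) for slices in plan]
print(out_sizes)  # [3, 6, 5, 6]
```

Each source's share differs from `s / K` by less than one sample, so with 19 sources an output shard differs from `N / K` by fewer than 19 samples.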
**Disk requirements:**
- Temporary .npy files (Phase 1 → Phase 2 intermediates): ~144GB
- Final .npz output: ~10GB
- Temporary files are deleted automatically once Phase 2 finishes
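A quick consistency check on the size figures, using only numbers stated in this plan (decimal units, 1 GB = 1e9 bytes, are an assumption): ~144GB of uncompressed .npy over ~100M samples is about 1.44KB per sample, so one 1M-sample output shard occupies about 1.44GB in memory, in line with the ~1.4GB per-shard figure given later in this plan.

```python
# Figures taken from the plan; decimal units (1 GB = 1e9 bytes) are an assumption
num_samples = 99_998_406
npy_total_bytes = 144e9    # Phase 1 temporary .npy files
npz_total_bytes = 10e9     # final compressed output
num_output_shards = 100
shard_size = 1_000_000

bytes_per_sample = npy_total_bytes / num_samples
shard_bytes_uncompressed = bytes_per_sample * shard_size
shard_bytes_compressed = npz_total_bytes / num_output_shards

print(f"{bytes_per_sample:.0f} B/sample")                # 1440 B/sample
print(f"{shard_bytes_uncompressed / 1e9:.2f} GB/shard")  # 1.44 GB/shard
print(f"{shard_bytes_compressed / 1e6:.0f} MB/shard")    # 100 MB/shard
```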
**Usage:**
```bash
python -m src.model.shuffle_npz \
--input-dir /home/songsenand/DataSet/SubPro \
--output-dir /home/songsenand/DataSet/SubPro_Shuffled \
--shard-size 1000000 \
--seed 42
```
**Key implementation:**
Phase 1: load and shuffle one field at a time:
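```python
import gc

import numpy as np

# Assumed context: FIELDS, rng, temp_dir (a pathlib.Path), num_shards,
# shard_sizes, and shard_paths (list of input .npz paths; name assumed)
for src_idx in range(num_shards):
    data = np.load(shard_paths[src_idx])  # lazy NpzFile: each field decompresses on access
    n = shard_sizes[src_idx]
    perm = rng.permutation(n)  # one permutation per source, shared by all fields
    for field in FIELDS:
        arr = data[field]      # already a fresh array (~5GB for input_ids); .copy() would add another ~5GB
        shuffled = arr[perm]   # fancy indexing makes a second ~5GB copy
        field_dir = temp_dir / field
        field_dir.mkdir(parents=True, exist_ok=True)
        np.save(field_dir / f"shard_{src_idx:06d}.npy", shuffled)
        del arr, shuffled
        gc.collect()
    data.close()
```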
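Splitting the work field by field is safe because each source shard draws its permutation once, before the field loop. The toy run below writes one small .npz shard, applies the same per-field procedure, and reads the results back through `mmap_mode="r"` to confirm that rows are still aligned. The helper `phase1`, the field names, and the sizes are hypothetical stand-ins:

```python
import tempfile
from pathlib import Path

import numpy as np

FIELDS = ["input_ids", "labels"]  # hypothetical toy field names


def phase1(shard_paths, shard_sizes, temp_dir, rng):
    """Toy Phase 1: one permutation per source shard, applied field by field."""
    for src_idx, shard_path in enumerate(shard_paths):
        perm = rng.permutation(shard_sizes[src_idx])
        with np.load(shard_path) as data:  # lazy NpzFile, closed on exit
            for field in FIELDS:
                out_dir = temp_dir / field
                out_dir.mkdir(parents=True, exist_ok=True)
                # data[field] decompresses one field; [perm] reorders its rows
                np.save(out_dir / f"shard_{src_idx:06d}.npy", data[field][perm])


with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)
    ids = np.arange(12, dtype=np.int16).reshape(6, 2)
    src = tmp / "src_000000.npz"
    np.savez_compressed(src, input_ids=ids, labels=ids[:, 0].copy())
    phase1([src], [6], tmp / "npy", np.random.RandomState(42))
    # Read back through mmap, then copy so the files can be deleted safely
    out = {
        f: np.array(np.load(tmp / "npy" / f / "shard_000000.npy", mmap_mode="r"))
        for f in FIELDS
    }

print(np.array_equal(out["labels"], out["input_ids"][:, 0]))  # True: rows still aligned
```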
Phase 2: balanced distribution:
```python
import gc

import numpy as np

# Open every source as a read-only mmap (pages are loaded on demand)
src_mmaps = [
    {f: np.load(temp_dir / f / f"shard_{i:06d}.npy", mmap_mode="r") for f in FIELDS}
    for i in range(num_shards)
]
output_dir.mkdir(parents=True, exist_ok=True)
for out_j in range(num_output_shards):
    buffers = {f: [] for f in FIELDS}
    for src_i in range(num_shards):
        # Output shard out_j takes the same proportional slice of every source
        s = shard_sizes[src_i]
        start = (out_j * s) // num_output_shards
        end = ((out_j + 1) * s) // num_output_shards
        if start >= end:
            continue
        for f in FIELDS:
            # mmap slice (no copy yet); np.concatenate below reads it into memory
            buffers[f].append(src_mmaps[src_i][f][start:end])
    output = {f: np.concatenate(buffers[f]) for f in FIELDS}
    # Extra shuffle to mix samples from different sources
    perm = rng.permutation(len(output[FIELDS[0]]))
    for f in FIELDS:
        output[f] = output[f][perm]
    np.savez_compressed(output_dir / f"shard_{out_j:06d}.npz", **output)
    del output, buffers, perm
    gc.collect()
```
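The sketch below runs the same balanced-distribution loop on tiny in-memory arrays (standing in for the read-only mmaps, which slice the same way) and checks the two properties the plan relies on: every sample lands in exactly one output shard, and rows stay aligned across fields after the cross-source shuffle. `phase2`, `make_source`, and the field names are hypothetical:

```python
import numpy as np

FIELDS = ["input_ids", "labels"]  # hypothetical toy field names


def phase2(sources, num_output_shards, rng):
    """Toy Phase 2 over in-memory arrays standing in for the .npy mmaps."""
    outputs = []
    for out_j in range(num_output_shards):
        buffers = {f: [] for f in FIELDS}
        for src in sources:
            s = len(src[FIELDS[0]])
            start = (out_j * s) // num_output_shards
            end = ((out_j + 1) * s) // num_output_shards
            if start >= end:
                continue
            for f in FIELDS:
                buffers[f].append(src[f][start:end])
        if not buffers[FIELDS[0]]:
            continue  # no source was large enough to contribute to this shard
        output = {f: np.concatenate(buffers[f]) for f in FIELDS}
        perm = rng.permutation(len(output[FIELDS[0]]))  # mix rows from different sources
        outputs.append({f: output[f][perm] for f in FIELDS})
    return outputs


def make_source(first, n):
    # "labels" doubles as a unique sample id; input_ids[:, 0] repeats it for alignment checks
    ids = np.arange(first, first + n, dtype=np.int16)
    return {"input_ids": np.stack([ids, ids * 2], axis=1), "labels": ids}


sources = [make_source(0, 10), make_source(10, 3), make_source(13, 7)]
outputs = phase2(sources, 4, np.random.RandomState(42))

all_labels = np.concatenate([o["labels"] for o in outputs])
print(sorted(all_labels.tolist()) == list(range(20)))  # True: no sample lost or duplicated
print(all(np.array_equal(o["input_ids"][:, 0], o["labels"]) for o in outputs))  # True
```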
**Output metadata.json:**
```json
{
"num_samples": 99998406,
"max_seq_length": 128,
"dtype": "int16",
"fields": [...],
"shard_size": 1000000,
"num_shards": 100,
"shard_sizes": [1000000, ..., 998406],
"pre_shuffled": true,
"seed": 42
}
```
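`num_samples` and `num_shards` can be derived from the per-shard counts instead of being tracked separately. A minimal sketch, assuming the remaining keys (`max_seq_length`, `dtype`, `fields`) are carried over from the input dataset's metadata.json; `write_metadata` and the toy values are hypothetical. The explicit `int(...)` casts matter because `json.dumps` rejects NumPy integer types such as `np.int64`.

```python
import json
import tempfile
from pathlib import Path


def write_metadata(output_dir, base_meta, shard_sizes, shard_size, seed):
    """Write metadata.json for the balanced, shuffled output.

    Assumes `base_meta` is the input dataset's metadata; only the shard
    layout and the shuffle information are updated.
    """
    meta = dict(base_meta)
    meta.update(
        num_samples=int(sum(shard_sizes)),
        shard_size=shard_size,
        num_shards=len(shard_sizes),
        shard_sizes=[int(s) for s in shard_sizes],  # plain ints are JSON-serializable
        pre_shuffled=True,
        seed=seed,
    )
    path = Path(output_dir) / "metadata.json"
    path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
    return meta


with tempfile.TemporaryDirectory() as tmp:
    base = {"max_seq_length": 128, "dtype": "int16", "fields": ["input_ids"]}
    write_metadata(tmp, base, [1000, 1000, 998], 1000, 42)
    loaded = json.loads((Path(tmp) / "metadata.json").read_text(encoding="utf-8"))

print(loaded["num_samples"], loaded["num_shards"], loaded["pre_shuffled"])  # 2998 3 True
```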
**Eval directory handling:** if `--input-dir/eval/` exists, copy it unchanged to `--output-dir/eval/` (the eval set is small and does not need shuffling).
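Copying the eval split unchanged takes one standard-library call. A sketch, assuming the layout described above (`<input-dir>/eval/` copied to `<output-dir>/eval/`); `copy_eval_dir` is a hypothetical helper name:

```python
import shutil
import tempfile
from pathlib import Path


def copy_eval_dir(input_dir, output_dir):
    """Copy input_dir/eval to output_dir/eval unchanged, if it exists.

    Returns True if a copy was made.
    """
    src = Path(input_dir) / "eval"
    if not src.is_dir():
        return False
    # dirs_exist_ok (Python 3.8+) lets a re-run overwrite an earlier copy
    shutil.copytree(src, Path(output_dir) / "eval", dirs_exist_ok=True)
    return True


with tempfile.TemporaryDirectory() as tmp:
    inp, out = Path(tmp) / "in", Path(tmp) / "out"
    (inp / "eval").mkdir(parents=True)
    (inp / "eval" / "metadata.json").write_text("{}")
    copied = copy_eval_dir(inp, out)
    exists = (out / "eval" / "metadata.json").exists()
    missing = copy_eval_dir(out / "nope", out)  # no eval/ here, so nothing is copied

print(copied, exists, missing)  # True True False
```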
---
### 3. `src/model/trainer.py`: disable shuffle for preprocessed data
**Changes (in the `train` function, line ~1258-1272):**
```python
if is_train_preprocessed:
    train_dataset = PreProcessedDataset(train_data_path, max_cache_shards=1)
    # pre_shuffled data does not need the DataLoader's RandomSampler
    shuffle_train = not train_dataset.metadata.get("pre_shuffled", False)
    total_steps = (len(train_dataset) // batch_size) * num_epochs
    # Honor max_iter_length as a cap on the total number of steps
    if max_iter_length > 0:
        max_steps_per_epoch = max_iter_length // batch_size
        total_steps = min(total_steps, max_steps_per_epoch * num_epochs)
    train_num_workers = min(num_workers, 1)
    train_dataloader = create_dataloader(
        dataset=train_dataset,
        batch_size=batch_size,
        num_workers=train_num_workers,
        pin_memory=torch.cuda.is_available(),
        shuffle=shuffle_train,  # no DataLoader shuffle for pre-shuffled data
    )
```
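The flag check and step arithmetic above can be sanity-checked without PyTorch. A sketch with a hypothetical helper `plan_train_loader`: `num_samples` stands in for `len(train_dataset)`, 99998406 is the sample count from the metadata example, and reading `max_iter_length` as a per-epoch sample budget is an interpretation of the `max_iter_length // batch_size` line rather than something the plan states.

```python
def plan_train_loader(metadata, num_samples, batch_size, num_epochs, max_iter_length=0):
    """Return (shuffle_train, total_steps), mirroring the trainer logic above."""
    shuffle_train = not metadata.get("pre_shuffled", False)
    total_steps = (num_samples // batch_size) * num_epochs
    if max_iter_length > 0:
        # max_iter_length is treated as a per-epoch sample budget (interpretation)
        max_steps_per_epoch = max_iter_length // batch_size
        total_steps = min(total_steps, max_steps_per_epoch * num_epochs)
    return shuffle_train, total_steps


print(plan_train_loader({"pre_shuffled": True}, 99998406, 16, 1))      # (False, 6249900)
print(plan_train_loader({}, 99998406, 16, 2, max_iter_length=160000))  # (True, 20000)
```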
**Eval DataLoader (line ~1295-1303):**
```python
if is_eval_preprocessed:
    eval_dataset = PreProcessedDataset(eval_data_path, max_cache_shards=1)
    eval_dataloader = create_dataloader(
        dataset=eval_dataset,
        batch_size=batch_size,
        num_workers=0,  # eval data is small; a single process is enough
        pin_memory=torch.cuda.is_available(),
        shuffle=False,  # eval does not need shuffling
    )
```
**`create_dataloader` (line ~1076-1114)** needs no change; it already passes the `shuffle` argument through.
---
### 4. `src/model/preprocessed_dataset.py`
The existing code needs no changes. `PreProcessedDataset` already handles `shuffle=False` correctly (PyTorch's `SequentialSampler` reads indices in order 0..N-1).
`metadata["pre_shuffled"]` is written to metadata.json by subsample.py and shuffle_npz.py; the training code only needs to read it.
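Why sequential order helps with `max_cache_shards=1` can be seen in a small simulation: a one-shard cache reads indices either in `SequentialSampler` order or in a random order, and we count how often a shard has to be loaded. This assumes `PreProcessedDataset` maps a global index to a shard through cumulative shard sizes; that is an assumption about its internals, and `shard_of` and `count_shard_loads` are hypothetical helpers.

```python
import bisect
from itertools import accumulate

import numpy as np


def shard_of(index, cum_ends):
    """Map a global sample index to its shard, given cumulative shard end offsets."""
    return bisect.bisect_right(cum_ends, index)


def count_shard_loads(order, shard_sizes):
    """Count shard loads for a one-shard cache reading indices in `order`."""
    cum_ends = list(accumulate(shard_sizes))  # e.g. [5, 10, 15, 20]
    loads, cached = 0, None
    for idx in order:
        shard = shard_of(idx, cum_ends)
        if shard != cached:  # cache miss: the shard must be (re)loaded
            loads += 1
            cached = shard
    return loads


sizes = [5, 5, 5, 5]
n = sum(sizes)
seq_loads = count_shard_loads(range(n), sizes)  # SequentialSampler order 0..N-1
rand_loads = count_shard_loads(np.random.RandomState(0).permutation(n), sizes)  # random order
print(seq_loads)   # 4: each shard is loaded exactly once
print(rand_loads)  # much larger; about 16 on average for a random order of 20 samples
```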
---
## Execution Order
```bash
# Step 1: shuffle and rebalance the existing 100M-sample dataset
python -m src.model.shuffle_npz \
--input-dir /home/songsenand/DataSet/SubPro \
--output-dir /home/songsenand/DataSet/SubPro_Shuffled \
--shard-size 1000000 \
--seed 42
# Step 2: train on the new data (already shuffled, so sequential reads suffice)
uv run train-model train \
--train-data-path /home/songsenand/DataSet/SubPro_Shuffled/train \
--eval-data-path /home/songsenand/DataSet/SubPro_Shuffled/eval \
-b 16 \
-o ~/tmp \
--eval-frequency 20 \
--save-frequency 40
```
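## Expected Results
| Before | After |
|---|---|
| Each batch spans 13 shards | Sequential reads (1 shard in cache) |
| 2-3 minutes of data loading per batch | ~0.1-0.5 s (pure mmap/memory) |
| CPU utilization 10% | Normal (training compute is the bottleneck) |
| Memory 40GB+ | <20GB (one 1M-sample shard is ~1.4GB) |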