#!/usr/bin/env python3
"""
性能基准测试脚本
用于诊断输入法模型训练的性能瓶颈
"""
import json
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Dict, List

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

# Add the src directory to the path
sys.path.insert(0, str(Path(__file__).parent))

from src.model.model import InputMethodEngine


class MockDataset(Dataset):
    """Mock dataset for performance testing."""

    def __init__(self, num_samples=1000, vocab_size=10019, seq_len=128, num_slots=8):
        self.num_samples = num_samples
        self.vocab_size = vocab_size
        self.seq_len = seq_len
        self.num_slots = num_slots

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx):
        # Generate mock data with every field the trainer expects
        return {
            "input_ids": torch.randint(0, self.vocab_size, (self.seq_len,)),
            "token_type_ids": torch.randint(0, 2, (self.seq_len,)),
            "attention_mask": torch.ones(self.seq_len, dtype=torch.long),
            "history_slot_ids": torch.randint(0, self.vocab_size, (self.num_slots,)),
            "pinyin_ids": torch.randint(0, 30, (24,)),  # pinyin_ids length is fixed at 24
            "label": torch.randint(0, self.vocab_size, (1,)),
        }


def collate_fn(batch: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Same collate_fn as in trainer.py."""
    # Tensor fields
    input_ids = torch.stack([item["input_ids"] for item in batch])
    token_type_ids = torch.stack([item["token_type_ids"] for item in batch])
    attention_mask = torch.stack([item["attention_mask"] for item in batch])
    labels = torch.stack([item["label"] for item in batch])
    history_slot_ids = torch.stack([item["history_slot_ids"] for item in batch])
    pinyin_ids = torch.stack([item["pinyin_ids"] for item in batch])
    # String fields (mocked)
    prefixes = [f"prefix_{i}" for i in range(len(batch))]
    suffixes = [f"suffix_{i}" for i in range(len(batch))]
    pinyins = [f"pinyin_{i}" for i in range(len(batch))]
    return {
        "input_ids": input_ids,
        "token_type_ids": token_type_ids,
        "attention_mask": attention_mask,
        "labels": labels,
        "history_slot_ids": history_slot_ids,
        "pinyin_ids": pinyin_ids,
        "prefix": prefixes,
        "suffix": suffixes,
        "pinyin": pinyins,
    }
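

# Optional sanity check for MockDataset and collate_fn above. The expected
# shapes are assumptions derived from the defaults in this file (seq_len=128,
# 24 pinyin ids, one label per sample); the helper name is illustrative and
# the function is not called by the benchmarks.
def _sanity_check_mock_batch(batch_size: int = 4) -> None:
    dataset = MockDataset(num_samples=batch_size)
    batch = collate_fn([dataset[i] for i in range(batch_size)])
    assert batch["input_ids"].shape == (batch_size, 128)
    assert batch["pinyin_ids"].shape == (batch_size, 24)
    assert batch["labels"].shape == (batch_size, 1)
    assert len(batch["prefix"]) == batch_size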


def get_gpu_utilization() -> float:
    """Query GPU utilization via nvidia-smi."""
    try:
        result = subprocess.run(
            [
                "nvidia-smi",
                "--query-gpu=utilization.gpu",
                "--format=csv,noheader,nounits",
            ],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0:
            return float(result.stdout.strip())
    except Exception:  # nvidia-smi missing, timed out, or unparseable output
        pass
    return 0.0
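

# Alternative sketch: PyTorch can query the same number through NVML via
# torch.cuda.utilization(), which avoids spawning a subprocess but requires
# the optional pynvml package. A minimal sketch; not used by the benchmarks
# below.
def get_gpu_utilization_nvml() -> float:
    try:
        return float(torch.cuda.utilization())
    except Exception:  # pynvml not installed or no CUDA device present
        return 0.0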


def benchmark_model_only(batch_sizes=[32, 64, 128, 256, 512]):
    """Benchmark forward + backward passes only (no data loading)."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\n{'=' * 70}")
    print("Model compute benchmark (data loading excluded)")
    print(f"Device: {device}")
    print(f"PyTorch version: {torch.__version__}")
    print(f"CUDA available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")
        print(
            f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB"
        )
    # Key fix: use dimensions that match the pretrained ContextEncoder model.
    # Per components.py, ContextEncoder wraps a pretrained model whose
    # embedding dimension is fixed; the error message shows that dimension is
    # 512, so dim=512 is required here.
    vocab_size = 10019
    pinyin_vocab_size = 30
    dim = 512  # must match the pretrained model dimension!
    num_slots = 8
    n_layers = 4
    n_heads = 4
    num_experts = 20
    max_seq_len = 128
    results = []
    for batch_size in batch_sizes:
        print(f"\n{'=' * 60}")
        print(f"Testing batch size: {batch_size}")
        try:
            # Build the model (real training configuration)
            model = InputMethodEngine(
                vocab_size=vocab_size,
                pinyin_vocab_size=pinyin_vocab_size,
                dim=dim,
                num_slots=num_slots,
                n_layers=n_layers,
                n_heads=n_heads,
                num_experts=num_experts,
                max_seq_len=max_seq_len,
            ).to(device)
            # Optimizer, loss, and AMP scaler
            optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
            criterion = nn.CrossEntropyLoss()
            scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())
            # Build a mock batch directly on the GPU
            batch = {
                "input_ids": torch.randint(
                    0, vocab_size, (batch_size, max_seq_len), device=device
                ),
                "token_type_ids": torch.randint(
                    0, 2, (batch_size, max_seq_len), device=device
                ),
                "attention_mask": torch.ones(
                    batch_size, max_seq_len, dtype=torch.long, device=device
                ),
                "history_slot_ids": torch.randint(
                    0, vocab_size, (batch_size, num_slots), device=device
                ),
                "pinyin_ids": torch.randint(
                    0, pinyin_vocab_size, (batch_size, 24), device=device
                ),
                "labels": torch.randint(0, vocab_size, (batch_size, 1), device=device),
            }
            # Reset the peak-memory counter so each batch size reports its
            # own peak rather than the maximum across all previous runs
            if torch.cuda.is_available():
                torch.cuda.reset_peak_memory_stats()
            # Warm up (a few steps to avoid initialization overhead)
            print("  Warming up...")
            for _ in range(3):
                with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                    logits = model(
                        input_ids=batch["input_ids"],
                        token_type_ids=batch["token_type_ids"],
                        attention_mask=batch["attention_mask"],
                        pinyin_ids=batch["pinyin_ids"],
                        history_slot_ids=batch["history_slot_ids"],
                    )
                    loss = criterion(logits, batch["labels"].squeeze(-1))
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
            # Timing
            if torch.cuda.is_available():
                torch.cuda.synchronize()
            start_time = time.time()
            steps = 20  # run 20 steps for a stable average
            print(f"  Timing {steps} steps...")
            for step in range(steps):
                with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                    logits = model(
                        input_ids=batch["input_ids"],
                        token_type_ids=batch["token_type_ids"],
                        attention_mask=batch["attention_mask"],
                        pinyin_ids=batch["pinyin_ids"],
                        history_slot_ids=batch["history_slot_ids"],
                    )
                    loss = criterion(logits, batch["labels"].squeeze(-1))
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
            if torch.cuda.is_available():
                torch.cuda.synchronize()
            elapsed = time.time() - start_time
            # Metrics
            throughput = steps * batch_size / elapsed
            step_time = elapsed / steps * 1000  # milliseconds
            memory_used = (
                torch.cuda.max_memory_allocated() / 1024**2
                if torch.cuda.is_available()
                else 0
            )  # MB
            print("  Results:")
            print(f"    Throughput: {throughput:.2f} samples/sec")
            print(f"    Per step: {step_time:.2f}ms")
            print(f"    Peak GPU memory: {memory_used:.2f} MB")
            if torch.cuda.is_available():
                gpu_util = get_gpu_utilization()
                if gpu_util > 0:
                    print(f"    GPU utilization: {gpu_util:.1f}%")
            results.append(
                {
                    "batch_size": batch_size,
                    "throughput": throughput,
                    "step_time": step_time,
                    "memory": memory_used,
                    "success": True,
                }
            )
            # Clean up
            del model, optimizer, batch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            # Check for near-OOM memory usage
            if (
                torch.cuda.is_available()
                and memory_used
                > torch.cuda.get_device_properties(0).total_memory * 0.8 / 1024**2
            ):
                print(
                    f"  ⚠️ Warning: batch_size={batch_size} used more than 80% of GPU memory; a larger batch_size may cause OOM"
                )
                break
        except Exception as e:
            print(f"  Error: {e}")
            results.append(
                {
                    "batch_size": batch_size,
                    "error": str(e),
                    "success": False,
                }
            )
            if "CUDA out of memory" in str(e):
                print("  ⚠️ CUDA out of memory; stopping tests of larger batch sizes")
                break
    return results
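

# Optional sketch: a kernel-level view with torch.profiler, complementing the
# wall-clock numbers above. It profiles forward passes only; ``model`` and
# ``batch`` are assumed to follow the shapes built in benchmark_model_only(),
# and the helper name, sort key, and row limit are illustrative choices.
def profile_model_kernels(model, batch, steps: int = 5) -> None:
    from torch.profiler import ProfilerActivity, profile

    activities = [ProfilerActivity.CPU]
    if torch.cuda.is_available():
        activities.append(ProfilerActivity.CUDA)
    with profile(activities=activities) as prof:
        for _ in range(steps):
            model(
                input_ids=batch["input_ids"],
                token_type_ids=batch["token_type_ids"],
                attention_mask=batch["attention_mask"],
                pinyin_ids=batch["pinyin_ids"],
                history_slot_ids=batch["history_slot_ids"],
            )
    sort_key = "cuda_time_total" if torch.cuda.is_available() else "cpu_time_total"
    print(prof.key_averages().table(sort_by=sort_key, row_limit=10))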


def benchmark_data_loading(num_workers_list=[0, 2, 4, 8, 12], batch_size=128):
    """Benchmark data-loading performance."""
    print(f"\n{'=' * 70}")
    print("Data-loading benchmark")
    print(f"Batch size: {batch_size}")
    results = []
    for num_workers in num_workers_list:
        print(f"\nTesting num_workers: {num_workers}")
        try:
            # Build the dataset and data loader
            dataset = MockDataset(num_samples=2000)
            dataloader = DataLoader(
                dataset,
                batch_size=batch_size,
                num_workers=num_workers,
                pin_memory=torch.cuda.is_available(),
                collate_fn=collate_fn,
                prefetch_factor=2 if num_workers > 0 else None,
            )
            # Warm up using a single iterator: re-creating it with
            # next(iter(dataloader)) each time would respawn the workers and
            # only ever fetch the first batch
            print("  Warming up...")
            warmup_iter = iter(dataloader)
            for _ in range(2):
                _ = next(warmup_iter)
            # Timing (a fresh iterator, so worker startup is included)
            print("  Timing...")
            start_time = time.time()
            batches = 0
            total_samples = 0
            for batch in dataloader:
                batches += 1
                total_samples += batch["input_ids"].size(0)
                if batches >= 20:  # measure 20 batches
                    break
            elapsed = time.time() - start_time
            if batches == 0:
                print("  Warning: could not load any batch")
                continue
            throughput = total_samples / elapsed
            print("  Results:")
            print(f"    Data-loading throughput: {throughput:.2f} samples/sec")
            print(f"    Average load time per batch: {elapsed / batches * 1000:.2f}ms")
            results.append(
                {
                    "num_workers": num_workers,
                    "throughput": throughput,
                    "avg_batch_time": elapsed / batches * 1000,
                    "success": True,
                }
            )
        except Exception as e:
            print(f"  Error: {e}")
            results.append(
                {
                    "num_workers": num_workers,
                    "error": str(e),
                    "success": False,
                }
            )
    return results
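

# Optional helper that combines the two benchmarks above into a rough verdict.
# The heuristic is an assumption of this script, not a rule from the trainer:
# the pipeline is data-bound when the loader delivers fewer samples per second
# than the model can consume.
def summarize_bottleneck(data_throughput: float, model_throughput: float) -> str:
    if data_throughput <= 0 or model_throughput <= 0:
        return "insufficient measurements"
    if data_throughput < model_throughput:
        return (
            f"data-bound: loader {data_throughput:.0f} samples/s < "
            f"model {model_throughput:.0f} samples/s; raise num_workers"
        )
    return "compute-bound: the model, not the loader, limits throughput"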


def profile_training_step(batch_size=128, num_steps=15):
    """Break down where time goes within each part of a training step."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\n{'=' * 70}")
    print("Detailed training-step analysis")
    print(f"Batch size: {batch_size}, steps: {num_steps}")
    # Real training configuration
    vocab_size = 10019
    pinyin_vocab_size = 30
    dim = 512
    num_slots = 8
    n_layers = 4
    n_heads = 4
    num_experts = 20
    max_seq_len = 128
    timings = {
        "data_to_gpu": [],
        "forward": [],
        "loss_calc": [],
        "backward": [],
        "optimizer_step": [],
        "total_step": [],
    }
    try:
        # Build the model and optimizer
        model = InputMethodEngine(
            vocab_size=vocab_size,
            pinyin_vocab_size=pinyin_vocab_size,
            dim=dim,
            num_slots=num_slots,
            n_layers=n_layers,
            n_heads=n_heads,
            num_experts=num_experts,
            max_seq_len=max_seq_len,
        ).to(device)
        optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
        criterion = nn.CrossEntropyLoss()
        scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())
        # Build the data loader
        dataset = MockDataset(num_samples=1000)
        dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=4,
            pin_memory=torch.cuda.is_available(),
            collate_fn=collate_fn,
        )
        model.train()
        print("Profiling...")
        for i, batch in enumerate(dataloader):
            if i >= num_steps:
                break
            step_start = time.time()
            # 1. Move data to the GPU
            data_start = time.time()
            batch_gpu = {
                k: v.to(device, non_blocking=True)
                for k, v in batch.items()
                if isinstance(v, torch.Tensor)
            }
            data_time = time.time() - data_start
            timings["data_to_gpu"].append(data_time)
            # 2. Forward pass
            forward_start = time.time()
            with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                logits = model(
                    input_ids=batch_gpu["input_ids"],
                    token_type_ids=batch_gpu["token_type_ids"],
                    attention_mask=batch_gpu["attention_mask"],
                    pinyin_ids=batch_gpu["pinyin_ids"],
                    history_slot_ids=batch_gpu["history_slot_ids"],
                )
            forward_time = time.time() - forward_start
            timings["forward"].append(forward_time)
            # 3. Loss computation
            loss_start = time.time()
            with torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
                loss = criterion(logits, batch_gpu["labels"].squeeze(-1))
            loss_time = time.time() - loss_start
            timings["loss_calc"].append(loss_time)
            # 4. Backward pass
            backward_start = time.time()
            scaler.scale(loss).backward()
            backward_time = time.time() - backward_start
            timings["backward"].append(backward_time)
            # 5. Optimizer step
            optimizer_start = time.time()
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            optimizer_time = time.time() - optimizer_start
            timings["optimizer_step"].append(optimizer_time)
            # Total step time
            total_time = time.time() - step_start
            timings["total_step"].append(total_time)
            if i % 5 == 0:
                print(
                    f"  Step {i}: total={total_time * 1000:.1f}ms, "
                    f"forward={forward_time * 1000:.1f}ms, "
                    f"backward={backward_time * 1000:.1f}ms"
                )
        # Print statistics
        print(f"\n{'=' * 60}")
        print("Training-step timing breakdown (averages):")
        for key, values in timings.items():
            if values:
                avg_ms = sum(values) / len(values) * 1000
                min_ms = min(values) * 1000
                max_ms = max(values) * 1000
                if timings["total_step"]:
                    total_avg = (
                        sum(timings["total_step"]) / len(timings["total_step"]) * 1000
                    )
                    percentage = (avg_ms / total_avg) * 100
                else:
                    percentage = 0
                print(
                    f"  {key:15s}: {avg_ms:6.1f}ms ({percentage:5.1f}%) [min={min_ms:.1f}ms, max={max_ms:.1f}ms]"
                )
        # Identify the bottleneck
        if timings["total_step"]:
            data_percent = (
                sum(timings["data_to_gpu"]) / sum(timings["total_step"]) * 100
            )
            compute_percent = (
                (
                    sum(timings["forward"])
                    + sum(timings["backward"])
                    + sum(timings["loss_calc"])
                    + sum(timings["optimizer_step"])
                )
                / sum(timings["total_step"])
                * 100
            )
            print("\nBottleneck analysis:")
            print(f"  Data loading/transfer: {data_percent:.1f}%")
            print(f"  Compute (forward + backward): {compute_percent:.1f}%")
            if data_percent > 30:
                print("  ⚠️ Data loading is the bottleneck; consider increasing num_workers or using pin_memory")
            if compute_percent > 70:
                print("  ⚠️ Compute is the bottleneck; consider optimizing the model or using mixed precision")
    except Exception as e:
        print(f"Error during profiling: {e}")
        import traceback

        traceback.print_exc()
    return timings
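

# Caveat and sketch: CUDA kernels launch asynchronously, so the per-stage
# wall-clock numbers in profile_training_step() can under-report GPU work
# (a stage's kernels may still be running when the next timer starts). This
# minimal sketch, assuming the stage is wrapped in a zero-argument callable,
# times it with CUDA events instead, which measure actual device time.
def time_gpu_stage(fn) -> float:
    """Return the duration of fn() in seconds, using CUDA events on GPU."""
    if not torch.cuda.is_available():
        start = time.time()
        fn()
        return time.time() - start
    start_evt = torch.cuda.Event(enable_timing=True)
    end_evt = torch.cuda.Event(enable_timing=True)
    start_evt.record()
    fn()
    end_evt.record()
    torch.cuda.synchronize()  # events must be complete before elapsed_time()
    return start_evt.elapsed_time(end_evt) / 1000.0  # elapsed_time is in ms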


def check_gpu_status():
    """Check GPU status."""
    print(f"\n{'=' * 70}")
    print("GPU status check")
    if not torch.cuda.is_available():
        print("CUDA unavailable; running on CPU")
        print("Note: CPU training is far slower than GPU training")
        return
    print(f"Available GPUs: {torch.cuda.device_count()}")
    for i in range(torch.cuda.device_count()):
        props = torch.cuda.get_device_properties(i)
        print(f"\nGPU {i}: {props.name}")
        print(f"  Compute capability: {props.major}.{props.minor}")
        print(f"  Total memory: {props.total_memory / 1024**3:.2f} GB")
        print(f"  SM count: {props.multi_processor_count}")
    # Current memory usage
    print("\nCurrent memory usage:")
    print(f"  Allocated: {torch.cuda.memory_allocated() / 1024**2:.2f} MB")
    print(f"  Reserved: {torch.cuda.memory_reserved() / 1024**2:.2f} MB")
    # PyTorch configuration
    print("\nPyTorch configuration:")
    print(f"  cuDNN benchmark mode: {torch.backends.cudnn.benchmark}")
    print(f"  cuDNN deterministic: {torch.backends.cudnn.deterministic}")
    print(f"  TF32 enabled: {torch.backends.cuda.matmul.allow_tf32}")
    # Suggested optimizations
    print("\nSuggested optimizations:")
    if not torch.backends.cudnn.benchmark:
        print("  ⚙️ Consider setting: torch.backends.cudnn.benchmark = True")
    if not torch.backends.cuda.matmul.allow_tf32:
        print("  ⚙️ Consider setting: torch.backends.cuda.matmul.allow_tf32 = True")


def main():
    """Main entry point: run all benchmarks."""
    print("Input-method model performance benchmark")
    print("=" * 70)
    # Set optimization flags (before any test runs)
    if torch.cuda.is_available():
        torch.backends.cudnn.benchmark = True
        torch.backends.cuda.matmul.allow_tf32 = True
        print("Enabled cuDNN benchmark mode and TF32 acceleration")
    # 1. Check GPU status
    check_gpu_status()
    # 2. Benchmark data loading
    print(f"\n{'=' * 70}")
    print("Phase 1: data-loading benchmark")
    data_results = benchmark_data_loading(
        num_workers_list=[0, 2, 4, 8, 12], batch_size=128
    )
    # 3. Benchmark model compute
    print(f"\n{'=' * 70}")
    print("Phase 2: model compute benchmark")
    # Start from smaller batch sizes
    model_results = benchmark_model_only(batch_sizes=[32, 64, 128, 256])
    # 4. Profile training steps in detail
    print(f"\n{'=' * 70}")
    print("Phase 3: detailed training-step analysis")
    step_timings = profile_training_step(batch_size=128, num_steps=15)
    # 5. Summarize optimization advice
    print(f"\n{'=' * 70}")
    print("Optimization advice summary")
    print("=" * 70)
    # Based on the data-loading results
    if data_results:
        successful_results = [r for r in data_results if r.get("success", False)]
        if successful_results:
            best_workers = max(successful_results, key=lambda x: x["throughput"])
            print("1. Data loading:")
            print(f"   Recommended num_workers: {best_workers['num_workers']}")
            print(f"   Best throughput: {best_workers['throughput']:.2f} samples/sec")
    # Based on the model results
    if model_results:
        successful_results = [r for r in model_results if r.get("success", False)]
        if successful_results:
            best_batch = max(successful_results, key=lambda x: x["throughput"])
            print("\n2. Batch size:")
            print(f"   Recommended batch_size: {best_batch['batch_size']}")
            print(f"   Best throughput: {best_batch['throughput']:.2f} samples/sec")
            print(f"   Memory usage: {best_batch['memory']:.2f} MB")
            # Memory safety advice
            if best_batch["memory"] > 0:
                if torch.cuda.is_available():
                    total_mem = (
                        torch.cuda.get_device_properties(0).total_memory / 1024**2
                    )
                    usage_percent = (best_batch["memory"] / total_mem) * 100
                    if usage_percent < 70:
                        print(f"   ✅ Memory usage is safe ({usage_percent:.1f}% of total)")
                    else:
                        print(
                            f"   ⚠️ Memory usage is high ({usage_percent:.1f}% of total); consider reducing batch_size"
                        )
    # General advice
    print("\n3. General optimizations:")
    if torch.cuda.is_available():
        print("   ✅ Use pin_memory=True (enabled)")
        print("   ✅ Use mixed-precision training (enabled)")
        print("   ✅ Set torch.backends.cudnn.benchmark = True (enabled)")
        print("   ✅ Use non-blocking data transfers (non_blocking=True)")
    else:
        print("   ⚠️ No GPU detected; a GPU is recommended for better performance")
    # Configuration-specific advice
    print("\n4. Advice specific to your configuration:")
    print("   - Default configuration: batch_size=128, num_workers=2, dim=512")
    print("   - If GPU memory allows, try increasing batch_size")
    print("   - With more CPU cores, increase num_workers")
    print(f"\n{'=' * 70}")
    print("Benchmark complete!")
    print("Run a full training job to validate the optimizations:")
    print("  python -m src.model.trainer train --help")
    # Save results to a file
    try:
        results = {
            "data_loading": data_results,
            "model_compute": model_results,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        }
        with open("benchmark_results.json", "w") as f:
            json.dump(results, f, indent=2, default=str)
        print("\nResults saved to: benchmark_results.json")
    except Exception:  # saving results is best-effort
        pass
if __name__ == "__main__":
main()