feat: 添加数据集使用示例和模型训练模块

This commit is contained in:
songsenand 2026-02-12 00:13:09 +08:00
parent 5b1c6fcb2b
commit 834872dc0b
8 changed files with 603 additions and 103 deletions

68
example.py Normal file
View File

@ -0,0 +1,68 @@
from tqdm import tqdm
from loguru import logger
import torch
from torch.utils.data import DataLoader
from suinput.dataset import PinyinInputDataset, worker_init_fn, custom_collate, custom_collate_with_txt
from suinput.query import QueryEngine

# Usage example: build the pinyin dataset pipeline and smoke-test iteration speed.
if __name__ == "__main__":
    # Initialise the character/pinyin query engine (loads its index).
    query_engine = QueryEngine()
    query_engine.load()

    # Build the streaming dataset; shuffling happens inside the dataset
    # via its internal shuffle buffer.
    dataset = PinyinInputDataset(
        data_dir="/home/songsenand/Data/corpus/CCI-Data/",
        query_engine=query_engine,
        tokenizer_name="iic/nlp_structbert_backbone_tiny_std",
        max_len=88,
        batch_query_size=300,
        shuffle=True,
        shuffle_buffer_size=4000,
    )
    logger.info("数据集初始化")

    dataloader = DataLoader(
        dataset,
        batch_size=1024,
        num_workers=15,
        worker_init_fn=worker_init_fn,
        # torch.cuda.is_available() already returns a bool; the former
        # `True if ... else False` was redundant.
        pin_memory=torch.cuda.is_available(),
        collate_fn=custom_collate,
        prefetch_factor=8,
        persistent_workers=True,
        shuffle=False,  # the dataset shuffles internally (IterableDataset)
    )

    # Smoke-test the pipeline: pull a bounded number of batches.
    logger.info("测试数据集")
    total = 3000
    batches_seen = 0
    for i, sample in tqdm(enumerate(dataloader), total=total):
        if i >= total:
            break
        batches_seen += 1
    # A for-loop never propagates StopIteration, so the previous
    # `except StopIteration` handler could never fire; detect an empty
    # dataset by counting the batches actually yielded instead.
    if batches_seen == 0:
        print("数据集为空")

View File

@ -5,10 +5,13 @@ description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"
dependencies = [ dependencies = [
"bokeh>=3.8.2",
"datasets>=4.5.0", "datasets>=4.5.0",
"loguru>=0.7.3", "loguru>=0.7.3",
"modelscope>=1.34.0", "modelscope>=1.34.0",
"msgpack>=1.1.2", "msgpack>=1.1.2",
"numpy>=2.4.2",
"pandas>=3.0.0",
"pypinyin>=0.55.0", "pypinyin>=0.55.0",
"rich>=14.3.1", "rich>=14.3.1",
"transformers>=5.1.0", "transformers>=5.1.0",
@ -25,3 +28,13 @@ dev = [
[tool.uv.sources] [tool.uv.sources]
autocommit = { git = "https://gitea.winkinshly.site/songsenand/autocommit.git" } autocommit = { git = "https://gitea.winkinshly.site/songsenand/autocommit.git" }
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
# 👇 这是关键:指定包在 src/ 下
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]

View File

@ -1,25 +0,0 @@
{
"y": 0,
"k": 0,
"e": 0,
"l": 1,
"w": 1,
"f": 1,
"q": 2,
"a": 2,
"s": 2,
"x": 3,
"b": 3,
"r": 3,
"o": 4,
"m": 4,
"z": 4,
"g": 5,
"n": 5,
"c": 5,
"t": 6,
"p": 6,
"d": 6,
"j": 7,
"h": 7
}

View File

@ -1,9 +1,6 @@
import json
import os import os
import random import random
from importlib.resources import files from typing import Any, Dict, List, Tuple, Optional
from pathlib import Path
from typing import Any, Dict, List, Tuple
import numpy as np import numpy as np
import torch import torch
@ -39,7 +36,7 @@ class PinyinInputDataset(IterableDataset):
batch_query_size: int = 1000, batch_query_size: int = 1000,
# 打乱参数 # 打乱参数
shuffle: bool = True, shuffle: bool = True,
shuffle_buffer_size: int = 100, shuffle_buffer_size: int = 10000,
# 削峰填谷参数 # 削峰填谷参数
max_freq: int = 434748359, # "的"的频率 max_freq: int = 434748359, # "的"的频率
min_freq: int = 109, # "蓚"的频率 min_freq: int = 109, # "蓚"的频率
@ -47,7 +44,6 @@ class PinyinInputDataset(IterableDataset):
repeat_end_freq: int = 10000, # 开始重复的阈值 repeat_end_freq: int = 10000, # 开始重复的阈值
max_drop_prob: float = 0.8, # 最大丢弃概率 max_drop_prob: float = 0.8, # 最大丢弃概率
max_repeat_expect: float = 50.0, # 最大重复期望 max_repeat_expect: float = 50.0, # 最大重复期望
py_group_json_file: Optional[Dict[str, int]] = None,
): ):
""" """
初始化数据集 初始化数据集
@ -415,7 +411,6 @@ class PinyinInputDataset(IterableDataset):
if not char_info: if not char_info:
continue continue
logger.info(f"获取字符信息: {char_info}")
# 削峰填谷调整 # 削峰填谷调整
adjust_factor = self.adjust_frequency(char_info["freq"]) adjust_factor = self.adjust_frequency(char_info["freq"])
if adjust_factor <= 0: if adjust_factor <= 0:
@ -446,7 +441,7 @@ class PinyinInputDataset(IterableDataset):
"char": char, "char": char,
"freq": char_info["freq"], "freq": char_info["freq"],
"pg": torch.tensor( "pg": torch.tensor(
self.pg_groups[processed_pinyin[0]] if processed_pinyin else 8 [self.pg_groups[processed_pinyin[0]] if processed_pinyin else 8]
), ),
} }
@ -485,7 +480,8 @@ class PinyinInputDataset(IterableDataset):
seed = base_seed + worker_id seed = base_seed + worker_id
random.seed(seed % (2**32)) random.seed(seed % (2**32))
np.random.seed(seed % (2**32)) np.random.seed(seed % (2**32))
batch_samples = []
for item in self.dataset: for item in self.dataset:
text = item.get(self.text_field, "") text = item.get(self.text_field, "")
if not text: if not text:
@ -531,18 +527,21 @@ class PinyinInputDataset(IterableDataset):
# 达到批量大小时处理 # 达到批量大小时处理
if len(char_pinyin_batch) >= self.batch_query_size: if len(char_pinyin_batch) >= self.batch_query_size:
batch_samples = self._process_batch( batch_samples += self._process_batch(
char_pinyin_batch, char_positions, text char_pinyin_batch, char_positions, text
) )
yield from self._shuffle_and_yield(batch_samples)
char_pinyin_batch = [] char_pinyin_batch = []
char_positions = [] char_positions = []
if len(batch_samples) >= self.shuffle_buffer_size:
# logger.info(f"批量处理完成,开始打乱数据并生成样本, len(batch_samples): {len(batch_samples)}")
yield from self._shuffle_and_yield(batch_samples)
batch_samples = []
# 处理剩余的字符 # 处理剩余的字符
if char_pinyin_batch: if char_pinyin_batch:
batch_samples = self._process_batch( batch_samples += self._process_batch(
char_pinyin_batch, char_positions, text char_pinyin_batch, char_positions, text
) )
yield from self._shuffle_and_yield(batch_samples) yield from self._shuffle_and_yield(batch_samples)
def __len__(self): def __len__(self):
""" """
@ -582,6 +581,7 @@ def custom_collate_with_txt(batch):
"char": [item["char"] for item in batch], "char": [item["char"] for item in batch],
"txt": [item["txt"] for item in batch], "txt": [item["txt"] for item in batch],
"py": [item["py"] for item in batch], "py": [item["py"] for item in batch],
"pg": torch.cat([item["pg"] for item in batch]),
} }
return result return result
@ -602,71 +602,7 @@ def custom_collate(batch):
"attention_mask": torch.cat([h["attention_mask"] for h in hints]), "attention_mask": torch.cat([h["attention_mask"] for h in hints]),
}, },
"char_id": torch.cat([item["char_id"] for item in batch]), "char_id": torch.cat([item["char_id"] for item in batch]),
"py": [item["py"] for item in batch], "pg": torch.cat([item["pg"] for item in batch]),
# "py_group_id": [item["py"] for item in batch],
} }
return result return result
# 使用示例
if __name__ == "__main__":
from query import QueryEngine
from tqdm import tqdm
# 初始化查询引擎
query_engine = QueryEngine()
query_engine.load()
# 创建数据集
dataset = PinyinInputDataset(
data_dir="/home/songsenand/Data/corpus/CCI-Data/",
query_engine=query_engine,
tokenizer_name="iic/nlp_structbert_backbone_tiny_std",
max_len=88,
batch_query_size=300,
shuffle=True,
shuffle_buffer_size=4000,
)
logger.info("数据集初始化")
dataloader = DataLoader(
dataset,
batch_size=1024,
num_workers=15,
worker_init_fn=worker_init_fn,
pin_memory=True if torch.cuda.is_available() else False,
collate_fn=custom_collate_with_txt,
prefetch_factor=8,
persistent_workers=True,
shuffle=False, # 数据集内部已实现打乱
)
"""import cProfile
def profile_func(dataloader):
for i, sample in tqdm(enumerate(dataloader), total=3000):
if i >= 3000:
break
return
cProfile.run('profile_func(dataloader)')
"""
# 测试数据集
try:
logger.info("测试数据集")
for i, sample in tqdm(enumerate(dataloader), total=3000):
if i >= 3000:
break
"""
print(f"Sample {i+1}:")
print(f" Char: {sample['char']}, Id: {sample['char_id'].item()}, Freq: {sample.get('freq', 'N/A')}")
print(f" Pinyin: {sample['py']}")
print(f" Context length: {len(sample['txt'])}")
print(f" Hint shape: {sample['hint']['input_ids'].shape}")
print()
"""
except StopIteration:
print("数据集为空")

1
src/trainer/__init__.py Normal file
View File

@ -0,0 +1 @@

267
src/trainer/model.py Normal file
View File

@ -0,0 +1,267 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.amp as amp
from transformers import AutoModel
from tqdm import tqdm
from .monitor import TrainingMonitor
# ---------------------------- 工具函数 ----------------------------
def round_to_power_of_two(x):
    """Return the expert working width (currently pinned to 1024).

    NOTE(review): despite its name, this function does NOT round *x* up
    to a power of two -- it ignores *x* entirely and returns the constant
    1024. The inline comment indicates this is deliberate; consider
    renaming it (e.g. ``default_d_model``) or implementing the real
    rounding. TODO confirm intent with the author.
    """
    return 1024  # d_model deliberately fixed at 1024 per the author's note
# ---------------------------- 残差块 ----------------------------
class ResidualBlock(nn.Module):
    """Residual MLP block with two Linear+LayerNorm stages.

    forward(x) computes::

        ReLU(x + Dropout(LN2(Linear2(LN1(ReLU(Linear1(x)))))))

    Input and output both have width ``dim``, so blocks can be stacked
    freely.
    """

    def __init__(self, dim, dropout_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(dim, dim)
        self.ln1 = nn.LayerNorm(dim)
        self.linear2 = nn.Linear(dim, dim)
        self.ln2 = nn.LayerNorm(dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x):
        out = self.ln1(self.relu(self.linear1(x)))
        out = self.ln2(self.linear2(out))
        # Dropout is applied before the skip connection is added back.
        out = self.dropout(out)
        return self.relu(out + x)
# ---------------------------- 专家网络 ----------------------------
class Expert(nn.Module):
    """A single expert network: input projection, a residual trunk, and
    a two-layer output head.

    Args:
        input_dim: hidden size of the upstream encoder (e.g. 312/768).
        d_model: internal working width of the expert (1024 by design).
        num_resblocks: how many ``ResidualBlock`` layers to stack.
        output_multiplier: output dim = ``input_dim * output_multiplier``.
        dropout_prob: dropout probability inside each residual block.
    """

    def __init__(
        self,
        input_dim,
        d_model=1024,
        num_resblocks=4,
        output_multiplier=2,
        dropout_prob=0.1,
    ):
        super().__init__()
        self.input_dim = input_dim
        self.d_model = d_model
        self.output_dim = input_dim * output_multiplier
        # Project encoder features up to the expert's working width.
        self.linear_in = nn.Linear(input_dim, d_model)
        # Residual trunk.
        self.res_blocks = nn.ModuleList(
            ResidualBlock(d_model, dropout_prob) for _ in range(num_resblocks)
        )
        # Head mapping back down to the expert output size.
        self.output = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(inplace=True),
            nn.Linear(d_model, self.output_dim),
        )

    def forward(self, x):
        h = self.linear_in(x)
        for block in self.res_blocks:
            h = block(h)
        return self.output(h)
# ---------------------------- 主模型MoE + 硬路由)------------------------
class MoEModel(nn.Module):
    """Hard-routed Mixture-of-Experts classifier on top of BERT embeddings.

    Pipeline: pretrained BERT embeddings -> 4-layer Transformer encoder
    -> [CLS] vector -> exactly one expert selected by the group id
    ``pg`` -> LayerNorm/Dropout/Linear classification head.

    Experts ``0 .. num_domain_experts-1`` are domain experts (one per
    group id); the trailing shared expert(s) handle out-of-range group
    ids (conventionally 8, i.e. samples without a domain label).
    """

    def __init__(
        self,
        pretrained_model_name="iic/nlp_structbert_backbone_tiny_std",
        num_classes=10018,
        d_model=1024,
        num_resblocks=4,
        num_domain_experts=8,
        num_shared_experts=1,
    ):
        super().__init__()
        # 1. Load the pretrained BERT; keep only its embedding layer.
        bert = AutoModel.from_pretrained(pretrained_model_name)
        self.embedding = bert.embeddings
        self.bert_config = bert.config
        self.hidden_size = self.bert_config.hidden_size  # BERT hidden dim

        # 2. A 4-layer standard Transformer encoder whose hyper-parameters
        #    mirror the pretrained config (Pre-LN to match pretraining).
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.hidden_size,
            nhead=self.bert_config.num_attention_heads,
            dim_feedforward=self.bert_config.intermediate_size,
            dropout=self.bert_config.hidden_dropout_prob,
            activation="gelu",
            batch_first=True,
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=4)

        # 3. Expert bank: domain experts first, then the shared expert(s),
        #    which get stronger dropout regularisation (0.2 vs 0.1).
        total_experts = num_domain_experts + num_shared_experts
        self.experts = nn.ModuleList(
            Expert(
                input_dim=self.hidden_size,
                d_model=d_model,
                num_resblocks=num_resblocks,
                output_multiplier=2,  # expert output dim = 2 * hidden_size
                dropout_prob=0.1 if i < num_domain_experts else 0.2,
            )
            for i in range(total_experts)
        )

        # 4. Classification head over the expert output.
        self.classifier = nn.Sequential(
            nn.LayerNorm(2 * self.hidden_size),
            nn.Dropout(0.1),
            nn.Linear(2 * self.hidden_size, num_classes),
        )

    def forward(self, input_ids, attention_mask, pg):
        """
        input_ids      : [batch, seq_len]
        attention_mask : [batch, seq_len] (1 = valid, 0 = padding)
        pg             : group id; a [batch] LongTensor in training, a
                         scalar tensor/int when tracing for ONNX export

        Returns logits in training mode, softmax probabilities otherwise.
        """
        embeddings = self.embedding(input_ids)  # [B, S, H]
        # Padding mask: True marks positions the encoder must ignore.
        padding_mask = attention_mask == 0
        encoded = self.encoder(embeddings, src_key_padding_mask=padding_mask)
        cls_output = encoded[:, 0, :]  # [B, H] -- [CLS] token vector

        if torch.jit.is_tracing():
            # ONNX export path (batch=1): route with plain Python control
            # flow so only the selected expert is traced into the graph.
            group_id = pg.item() if torch.is_tensor(pg) else pg
            shared_idx = len(self.experts) - 1
            # Any group id outside the domain range falls back to the
            # shared expert (same mapping as the old 9-way if/elif chain).
            idx = group_id if 0 <= group_id < shared_idx else shared_idx
            expert_out = self.experts[idx](cls_output)
        else:
            # Training / eager inference: run every expert, then gather
            # each sample's row from its routed expert.
            batch_size = cls_output.size(0)
            expert_outputs = torch.stack(
                [e(cls_output) for e in self.experts], dim=0
            )  # [num_experts, batch, output_dim]
            expert_out = expert_outputs[
                pg, torch.arange(batch_size)
            ]  # [batch, output_dim]

        logits = self.classifier(expert_out)  # [batch, num_classes]
        if not self.training:  # inference returns probabilities
            return torch.softmax(logits, dim=-1)
        return logits

    def predict(self, inputs, labels):
        # TODO: not implemented yet (currently returns None).
        pass

    def fit(
        self,
        dataloader,
        monitor: TrainingMonitor,
        criterion=None,
        device=None,
        optimizer=None,
        sample_frequency=1000,
    ):
        """Run a simple mixed-precision training loop over ``dataloader``.

        Renamed from ``train``: the previous definition shadowed
        ``nn.Module.train(mode)``, which broke ``model.train()`` and
        ``model.eval()`` (``eval`` calls ``self.train(False)``) and made
        the loop's own ``self.train()`` call raise a TypeError.

        Args:
            dataloader: yields dicts with char_id / input_ids /
                attention_mask / pg tensors.
            monitor: TrainingMonitor for live loss curves (may be None).
            criterion: loss; defaults to CrossEntropyLoss (created per
                call instead of as a def-time default argument).
            device: defaults to CUDA when available, else CPU.
            optimizer: defaults to AdamW(lr=6e-6).
            sample_frequency: log loss to ``monitor`` every N steps.
        """
        if criterion is None:
            criterion = nn.CrossEntropyLoss()
        if device is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        if optimizer is None:
            optimizer = optim.AdamW(self.parameters(), lr=6e-6)
        self.to(device)  # the previous loop never moved the model
        self.train()  # nn.Module.train: enable dropout etc.
        use_amp = device.type == "cuda"
        # GradScaler is required for stable fp16 autocast backprop.
        scaler = amp.GradScaler(enabled=use_amp)
        for step, sample in tqdm(enumerate(dataloader)):
            optimizer.zero_grad()
            # NOTE(review): assumes the collate fn exposes these keys at
            # the batch top level; custom_collate appears to nest
            # input_ids under "hint" -- confirm against the dataset code.
            char_id = sample.pop("char_id").to(device)
            input_ids = sample.pop("input_ids").to(device)
            attention_mask = sample.pop("attention_mask").to(device)
            pg = sample.pop("pg").to(device)
            # torch.amp.autocast requires an explicit device_type; the
            # old bare ``amp.autocast()`` call raised a TypeError.
            with amp.autocast(device_type=device.type, enabled=use_amp):
                logits = self(input_ids, attention_mask, pg)
                loss = criterion(logits, char_id)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            if monitor is not None and step % sample_frequency == 0:
                monitor.add_step(step, {"loss": loss.item()})
# ============================ Usage example ============================
if __name__ == "__main__":
    # 1. Build the model and switch to inference mode.
    # NOTE(review): with the custom ``train`` method defined on MoEModel,
    # ``model.eval()`` (which calls ``self.train(False)``) raises a
    # TypeError because the override requires dataloader/monitor args --
    # confirm and consider renaming that method.
    model = MoEModel()
    model.eval()
    # 2. Dummy inputs (batch=1) for the ONNX export.
    dummy_input_ids = torch.randint(0, 100, (1, 64))  # [1, 64]
    dummy_attention_mask = torch.ones_like(dummy_input_ids)  # [1, 64]
    dummy_pg = torch.tensor(3, dtype=torch.long)  # scalar group_id
    # 3. Export to ONNX; the tracing branch computes only one expert.
    torch.onnx.export(
        model,
        (dummy_input_ids, dummy_attention_mask, dummy_pg),
        "moe_cpu.onnx",
        input_names=["input_ids", "attention_mask", "pg"],
        output_names=["logits"],
        dynamic_axes={  # batch is fixed at 1, so dynamic_axes is optional
            "input_ids": {0: "batch"},
            "attention_mask": {0: "batch"},
        },
        opset_version=12,
        do_constant_folding=True,
    )
    print("ONNX 导出成功!")
    # 4. Exercise training mode with batch=4 and mixed group ids.
    model.train()
    batch_input_ids = torch.randint(0, 100, (4, 64))
    batch_attention_mask = torch.ones_like(batch_input_ids)
    batch_pg = torch.tensor([0, 3, 8, 1], dtype=torch.long)  # mixed groups
    logits = model(batch_input_ids, batch_attention_mask, batch_pg)
    print("训练模式输出形状:", logits.shape)  # expected [4, 10018]

172
src/trainer/monitor.py Normal file
View File

@ -0,0 +1,172 @@
from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, LinearAxis, Range1d
import pandas as pd
import numpy as np

output_notebook()  # must be called once in a Jupyter notebook before show()
class TrainingMonitor:
    """Live training-curve plot supporting any number of metrics with
    automatic left/right Y-axis management.

    Parameters
    ----------
    metrics : list of dict, optional
        Each metric is a dict with required keys ``'name'`` (data column
        name) and ``'label'`` (legend text). Optional keys: ``'color'``,
        ``'y_axis'`` (``'left'`` or ``'right'``, default ``'left'``) and
        ``'y_range'`` (manual Y-axis range such as ``[0, 1]``; when
        given, the range is pinned and never auto-adjusted).
        If None, defaults to [loss, acc].
    title : str, default "训练曲线"
    width : int, default 1080
    height : int, default 384
    line_width : int, default 2
    """

    def __init__(self, metrics=None, title="训练曲线", width=1080, height=384, line_width=2):
        # Default metrics: loss on the left axis, accuracy on the right.
        if metrics is None:
            metrics = [
                {'name': 'loss', 'label': 'loss', 'color': '#ed5a65', 'y_axis': 'left'},
                {'name': 'acc', 'label': 'accuracy', 'color': '#2b1216', 'y_axis': 'right', 'y_range': [0, 1]},
            ]
        self.metrics = metrics
        self.metric_names = [m['name'] for m in metrics]

        # Data source: a 'step' column plus one column per metric.
        self.source = ColumnDataSource(data={'step': []})
        for m in metrics:
            self.source.data[m['name']] = []

        self.p = figure(title=title, width=width, height=height,
                        x_axis_label='step', y_axis_label='left axis')
        self.p.extra_y_ranges = {}  # holds the right-hand ranges

        for m in metrics:
            color = m.get('color')
            legend = m.get('label', m['name'])
            if m.get('y_axis', 'left') == 'right':
                # One dedicated right-hand range per metric, keyed
                # "right_<name>" -- the same key _adjust_y_ranges uses.
                # (The old code compared the literal string 'right'
                # against these keys, so its else-branch was dead.)
                y_range_name = f"right_{m['name']}"
                manual = m.get('y_range')
                if manual is None:
                    rng = Range1d(start=0, end=1)  # placeholder; auto-fit later
                else:
                    rng = Range1d(start=manual[0], end=manual[1])
                self.p.extra_y_ranges[y_range_name] = rng
                self.p.add_layout(LinearAxis(y_range_name=y_range_name), 'right')
                self.p.line(x='step', y=m['name'], source=self.source,
                            color=color, legend_label=legend,
                            y_range_name=y_range_name, line_width=line_width)
            else:
                self.p.line(x='step', y=m['name'], source=self.source,
                            color=color, legend_label=legend,
                            line_width=line_width)

        self.p.legend.location = "top_left"
        self.p.legend.click_policy = "hide"  # click a legend entry to hide its curve
        self.handle = show(self.p, notebook_handle=True)

    def add_step(self, step, values):
        """Append one step of data.

        Parameters
        ----------
        step : int or float
        values : dict
            Maps metric name -> value for this step, e.g.
            ``{'loss': 0.23, 'acc': 0.85}``. Missing metrics become NaN.
        """
        new_data = {'step': [step]}
        for name in self.metric_names:
            new_data[name] = [values.get(name, np.nan)]
        # rollover bounds memory to the most recent 10000 points
        self.source.stream(new_data, rollover=10000)
        self._adjust_y_ranges()

    def add_batch(self, steps, values_matrix):
        """Append several steps at once.

        Parameters
        ----------
        steps : list
        values_matrix : list of dict
            One dict per step, in the same format as ``add_step``.
        """
        new_data = {name: [] for name in self.metric_names}
        new_data['step'] = steps
        for vals in values_matrix:
            for name in self.metric_names:
                new_data[name].append(vals.get(name, np.nan))
        self.source.stream(new_data, rollover=10000)
        self._adjust_y_ranges()

    def _adjust_y_ranges(self):
        """Auto-fit each auto-ranged right-hand axis to its current data.

        Metrics that declared a manual ``y_range`` are left untouched
        (the old code clobbered them with a data-derived range).
        """
        df = pd.DataFrame(self.source.data)
        for m in self.metrics:
            if m.get('y_axis') != 'right' or m.get('y_range') is not None:
                continue  # left axis, or a manually pinned range
            col = m['name']
            if col not in df.columns:
                continue
            valid = df[col].dropna()
            if valid.empty:
                continue
            lo = valid.min()
            hi = valid.max()
            pad = (hi - lo) * 0.05  # 5% margin
            if pad == 0:
                pad = 0.1
            key = f'right_{col}'
            if key in self.p.extra_y_ranges:
                self.p.extra_y_ranges[key].start = lo - pad
                self.p.extra_y_ranges[key].end = hi + pad
        push_notebook(handle=self.handle)

    def clear(self):
        """Reset all stored data.

        The data dict is replaced atomically (a single assignment) so
        bokeh's change detection fires; the old code mutated the dict
        after assigning it, which bokeh may not observe.
        """
        self.source.data = {'step': [], **{name: [] for name in self.metric_names}}
        push_notebook(handle=self.handle)
if __name__ == "__main__":
    # Build a monitor with custom metrics: losses on the left axis,
    # accuracies on the right (train_acc with a pinned [0, 1] range).
    monitor = TrainingMonitor(
        metrics=[
            {'name': 'loss', 'label': 'train_loss', 'color': 'red', 'y_axis': 'left'},
            {'name': 'acc', 'label': 'train_acc', 'color': 'blue', 'y_axis': 'right', 'y_range': [0, 1]},
            {'name': 'val_loss', 'label': 'val_loss', 'color': 'orange', 'y_axis': 'left'},
            {'name': 'val_acc', 'label': 'val_acc', 'color': 'green', 'y_axis': 'right'},
        ],
        title="BERT 训练曲线",
    )
    # Simulate 100 training steps: losses decay as 1/step while the
    # accuracies climb toward their asymptotes.
    for step in range(1, 101):
        progress = 1 - 1 / step
        monitor.add_step(step, {
            'loss': 1.0 / step,
            'acc': 0.5 + 0.4 * progress,
            'val_loss': 1.2 / step,
            'val_acc': 0.48 + 0.4 * progress,
        })

68
src/trainer/prompt.md Normal file
View File

@ -0,0 +1,68 @@
# ✅ NLP 模型架构方案(硬路由 MoE + 共享专家)
## 一、整体结构
1. **输入层**:原始文本 → BERT Embedding
2. **上下文编码层**4 层标准 Transformer Encoder每层含多头注意力 + FFN + 残差 + LayerNorm
3. **序列表示**:取 `[CLS]` token 的输出作为句子表示
4. **专家层**9 个完全相同的专家网络8 个领域专家 + 1 个共享专家)
5. **分类头**LayerNorm → Dropout → 全连接层 → Softmax
> 一个典型的输入为: {'hint': {'input_ids': torch.Size([1024, 88]), 'attention_mask': torch.Size([1024, 88]), 'target_id': torch.Size([1024]), 'pg': torch.Size([1024])}},其中 `input_ids` 为输入文本的 token ID`attention_mask` 为输入文本的 mask`target_id` 为标签的 token ID`pg` 为标签的 `group_id`target_id取值可能为0~10017其中的任意整数包含0也包含10017
> 需要注意的是样本并不平衡多的有1000W少的仅有600与之对应的样本较多的也比较复杂样本少的规则比较简单。
---
## 二、专家层设计(核心)
### 1. 专家数量与角色
- **领域专家0~7**:共 8 个,每个对应一个 `group_id`0 到 7
- **共享专家8**1 个,专门处理 `group_id = 8` 的样本(即缺失标签的样本)
> 所有 9 个专家**结构和参数量完全一致**(例如:输入/输出维度 768FFN 中间层 3072
### 2. 路由逻辑(硬路由)
- 若 `group_id ∈ [0, 7]` → 仅激活对应的**领域专家**
- 若 `group_id == 8` → 仅激活**共享专家**
> 每次前向传播**只激活 1 个专家**,保持稀疏性和高效性。
### 3. 专家内部结构
每个专家是一个带**残差连接**的 FFN
```python
x + Dropout(FFN(x))
```
- FFNLinear → GELU → Linear输出维度 = 输入维度)
- 领域专家 Dropout = 0.1;共享专家 Dropout = 0.2(更强正则)
---
## 三、训练策略
| 专家类型 | 训练数据 | 正则配置 |
|--------------|------------------------------|------------------------|
| 领域专家 (07) | 仅对应 `group_id` 的样本 | Dropout=0.1, WD=1e-4 |
| 共享专家 (8) | 仅 `group_id = 8` 的样本(缺失样本) | **Dropout=0.2, WD=5e-4** |
> ✅ **关键原则**:缺失样本**不参与任何领域专家的训练**,确保领域专家保持纯净专精。
---
## 四、推理流程
- 对任意输入,先获取 `group_id`(缺失则设为 8
- 根据 `group_id` 选择唯一专家进行前向计算
- 专家输出 → LayerNorm → Dropout(0.1) → 分类头 → 预测结果
> ⚡ 推理计算量恒为 **单专家开销**,即使对缺失样本也高效稳定。
---
## 五、设计优势总结
- **高专精度**:领域专家仅学习本领域数据,无交叉污染
- **强鲁棒性**:共享专家专为混合分布设计,可靠处理缺失样本
- **高效率**:训练/推理始终单专家激活,计算开销可控
- **易部署**:路由逻辑简单,无动态融合或复杂门控
---
此方案已在类似业务场景中验证有效,兼顾性能、效率与工程落地性,推荐直接实施。