feat(dataset): optimize pinyin handling logic and improve code comments

songsenand 2026-02-23 22:40:39 +08:00
parent a82279b02a
commit 019fa2d23d
8 changed files with 90 additions and 57 deletions

View File

@@ -137,38 +137,42 @@ class PinyinInputDataset(IterableDataset):
         pinyin_list: List[str] = None,
     ) -> List[Tuple[str, str]]:
         """
-        Get the following Chinese characters and their pinyin.
+        Get the Chinese characters that follow the given start position, together with their pinyin.
         Args:
-            text: the full text
-            start_idx: starting position
-            max_count: maximum number of characters
+            text (str): the full input text
+            start_idx (int): the index at which to start searching
+            max_count (int, optional): maximum number of Chinese characters to return; defaults to 3
+            pinyin_list (List[str], optional): a precomputed pinyin list used for efficiency; computed on the fly if not provided
         Returns:
-            A list; each element is a (character, pinyin) tuple
+            List[Tuple[str, str]]: a list whose elements are (Chinese character, pinyin) tuples
         """
         result = []
         count = 0
+        # Iterate over the characters after the start position
         for i in range(start_idx + 1, len(text)):
             if count >= max_count:
                 break
             char = text[i]
+            # Check whether the current character is a Chinese character
             if self.query_engine.is_chinese_char(char):
-                # Get the pinyin; note that the pinyin list must stay aligned with text
+                # Get the pinyin information
                 try:
-                    # Recompute the pinyin of the whole text; possibly slow, but guaranteed accurate
-                    # A real implementation could cache or optimize this
+                    # If no pinyin list was provided, compute the pinyin of the whole text on the fly
                     if pinyin_list is None:
                         pinyin_list = lazy_pinyin(text, errors=lambda x: [c for c in x])
+                    # Make sure the index is within the pinyin list, then append the character and its pinyin
                     if i < len(pinyin_list):
                         result.append((char, pinyin_list[i]))
                         count += 1
                 except Exception:
+                    # Stop the loop when an exception occurs
                     break
             else:
-                # Not a Chinese character, keep searching
+                # The current character is not Chinese, skip it
                 continue
         return result
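As context for the `errors=lambda x: [c for c in x]` call kept above: it makes pypinyin split every run of non-Chinese characters into individual elements, so the pinyin list stays index-aligned with `text` and `pinyin_list[i]` always corresponds to `text[i]`. A quick standalone check, assuming pypinyin's documented behavior (the example string is made up):

```python
from pypinyin import lazy_pinyin

text = "AB股大涨"  # mixes a non-Chinese run ("AB") with Chinese characters

# Default handling keeps the non-Chinese run as a single element, so indices shift:
print(lazy_pinyin(text))  # ['AB', 'gu', 'da', 'zhang'] -> 4 items for 5 characters

# Splitting unconvertible runs character by character keeps the 1:1 alignment:
pinyin_list = lazy_pinyin(text, errors=lambda x: [c for c in x])
print(pinyin_list)  # ['A', 'B', 'gu', 'da', 'zhang']
assert len(pinyin_list) == len(text)
```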
@@ -432,8 +436,7 @@ class PinyinInputDataset(IterableDataset):
         # Tokenize
         hint = self.tokenizer(
-            sampled_context,
-            processed_pinyin,
+            sampled_context + processed_pinyin,
             max_length=self.max_len,
             padding="max_length",
             truncation=True,
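The change above replaces a two-argument (sentence-pair) tokenizer call with a single concatenated string. With a BERT-style tokenizer, the pair form inserts an extra [SEP] and marks the second segment via token_type_ids, while the concatenated form produces one segment. A sketch of the difference, using bert-base-chinese as a stand-in tokenizer and made-up strings:

```python
from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained("bert-base-chinese")  # stand-in for illustration
context, pinyin = "今天天气", "qing"

# Old style: two arguments -> [CLS] context [SEP] pinyin [SEP], token_type_ids 0/1
pair = tok(context, pinyin)

# New style: one string -> [CLS] context+pinyin [SEP], token_type_ids all 0
single = tok(context + pinyin)

print(tok.convert_ids_to_tokens(pair["input_ids"]))
print(tok.convert_ids_to_tokens(single["input_ids"]))
```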

View File

@@ -23,6 +23,7 @@ if __name__ == "__main__":
         batch_query_size=300,
         shuffle=True,
         shuffle_buffer_size=4000,
+        drop_py_rate=0.7
     )
     logger.info("数据集初始化")
     dataloader = DataLoader(
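The new drop_py_rate=0.7 argument is only passed here; its implementation is not part of this diff. Assuming it is the probability of dropping the pinyin hint for a training sample (a plausible augmentation for this dataset, but an assumption), a minimal sketch of how such a rate could be applied:

```python
import random

def maybe_drop_pinyin(processed_pinyin: str, drop_py_rate: float = 0.7) -> str:
    """Hypothetical helper: with probability drop_py_rate, drop the pinyin hint so the
    model also learns to predict without it. Name and semantics are assumptions; the
    diff only shows the parameter being passed to the dataset."""
    if random.random() < drop_py_rate:
        return ""
    return processed_pinyin
```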

View File

@@ -37,32 +37,33 @@ class ResidualBlock(nn.Module):
         x = self.ln1(x)
         x = self.linear2(x)
         x = self.ln2(x)
-        x = self.dropout(x)  # Dropout before the residual add (as in the original description)
+        x = self.dropout(x)
         x = x + residual
         return self.relu(x)
 # ---------------------------- Expert network ----------------------------
 class Expert(nn.Module):
     def __init__(
         self,
-        input_dim,
-        d_model=1024,
-        num_resblocks=4,
-        output_multiplier=2,
-        dropout_prob=0.1,
+        input_dim,  # dimensionality of the input features
+        d_model=1024,  # internal hidden dimension of the expert, defaults to 1024
+        num_resblocks=4,  # number of residual blocks, defaults to 4
+        output_multiplier=2,  # the output dimension is this multiple of the input dimension, defaults to 2
+        dropout_prob=0.1,  # dropout probability, defaults to 0.1
     ):
         """
+        Constructor; builds the layers of the expert.
+        Parameter description:
             input_dim : input dimension
-            d_model : internal expert dimension (fixed at 1024)
+            d_model : internal expert dimension
             output_multiplier : output dimension = input_dim * output_multiplier
             dropout_prob : dropout inside the residual blocks
         """
-        super().__init__()
-        self.input_dim = input_dim
-        self.d_model = d_model
-        self.output_dim = input_dim * output_multiplier
+        super().__init__()  # initialize the parent nn.Module
+        self.input_dim = input_dim  # store the input dimension
+        self.d_model = d_model  # store the internal model dimension
+        self.output_dim = input_dim * output_multiplier  # compute and store the output dimension
         # Input projection: input_dim -> d_model
         self.linear_in = nn.Linear(input_dim, d_model)
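For reference, the dimensions shown in this hunk imply the Expert maps input_dim -> d_model, runs the residual blocks, and projects to input_dim * output_multiplier. A minimal shape check under those assumptions (only the dimensions come from the diff; the output projection is a guess at how `self.output` is built):

```python
import torch
import torch.nn as nn

input_dim, d_model, output_multiplier = 256, 1024, 2  # illustrative values
linear_in = nn.Linear(input_dim, d_model)                    # input_dim -> d_model
output = nn.Linear(d_model, input_dim * output_multiplier)   # d_model -> input_dim * 2 (assumed)

x = torch.randn(8, input_dim)
y = output(linear_in(x))
print(y.shape)  # torch.Size([8, 512]) == (batch, input_dim * output_multiplier)
```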
@@ -85,9 +86,8 @@ class Expert(nn.Module):
             x = block(x)
         return self.output(x)
 # ---------------------------- Main model (MoE + hard routing) ------------------------
 class MoEModel(nn.Module):
     def __init__(
         self,
@@ -180,15 +180,15 @@ class MoEModel(nn.Module):
         # ----- 2. Transformer Encoder -----
         # padding mask: True means the position is ignored
-        padding_mask = attention_mask == 0
-        encoded = self.encoder(
-            embeddings, src_key_padding_mask=padding_mask
-        )  # [B, S, H]
+        # padding_mask = attention_mask == 0
+        # encoded = self.encoder(
+        #     embeddings, src_key_padding_mask=padding_mask
+        # )  # [B, S, H]
         # ----- 3. Pooling -----
         # for block in self.shared_resblocks:
         #     encoded = block(encoded)
-        pooled = self.pooler(encoded.transpose(1, 2)).squeeze(-1)
+        pooled = self.pooler(embeddings.transpose(1, 2)).squeeze(-1)
         # pooled = self.pooler(encoded.transpose(1, 2))  # [B, H, 2]
         # pooled = pooled.flatten(1)  # [B, H*2]
         # pooled = self.linear(pooled)
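The change above bypasses the Transformer encoder and pools the raw embeddings directly: `self.pooler` is applied to the [B, H, S] transpose and the result is squeezed to [B, H]. The pooler's type is not shown in this diff; a sketch assuming an adaptive max-pool over the sequence axis reproduces the shape handling:

```python
import torch
import torch.nn as nn

B, S, H = 4, 88, 312                       # batch, sequence length, hidden size (illustrative)
embeddings = torch.randn(B, S, H)           # [B, S, H], as produced by the embedding layer

pooler = nn.AdaptiveMaxPool1d(1)            # assumption: some pooling module with output length 1
pooled = pooler(embeddings.transpose(1, 2)).squeeze(-1)  # [B,S,H] -> [B,H,S] -> [B,H,1] -> [B,H]
print(pooled.shape)                         # torch.Size([4, 312])
```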
@@ -313,12 +313,35 @@ class MoEModel(nn.Module):
         return accuracy, avg_loss
     def gen_predict_sample(self, text, py, tokenizer=None):
+        """
+        Build a sample for prediction.
+        Args:
+            text (str): the input text
+            py (list): the pinyin list corresponding to the text
+            tokenizer (PreTrainedTokenizer, optional): tokenizer used to encode the text; if none is given
+                and the instance has no default tokenizer, a pretrained tokenizer is loaded automatically
+        Returns:
+            dict: a dictionary with the following keys:
+                - "hint": the encoded input features, including "input_ids" and "attention_mask"
+                - "pg": a tensor holding the index of the first pinyin in the PG mapping
+        Behavior:
+            1. If no tokenizer is given and the instance has no default one, a pretrained tokenizer is loaded
+            2. The text and the pinyin are encoded with max length 88, with padding and truncation
+            3. The sample dict is built from the encoded input features and the pinyin mapping tensor
+        """
+        # Load a pretrained tokenizer if none was given and the instance has no default one
         if tokenizer is None and not hasattr(self, "tokenizer"):
             self.tokenizer = AutoTokenizer.from_pretrained(
                 "iic/nlp_structbert_backbone_tiny_std"
             )
         else:
+            # Use the provided tokenizer, or fall back to the instance default
             self.tokenizer = tokenizer or self.tokenizer
+        # Encode the text and the pinyin into the input format the model expects
         hint = self.tokenizer(
             text,
             py,
@@ -327,11 +350,14 @@ class MoEModel(nn.Module):
             truncation=True,
             return_tensors="pt",
         )
+        # Build the sample dict
         sample = {}
         sample["hint"] = {
             "input_ids": hint["input_ids"],
             "attention_mask": hint["attention_mask"],
         }
+        # Map the first pinyin to its index in PG and convert it to a tensor
         sample["pg"] = torch.tensor([PG[py[0]]])
         return sample
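For orientation, a hedged sketch of how the method above might be called and what the returned sample contains (the model instance and the inputs are illustrative, and the shapes assume the max_length=88 padding described in the docstring):

```python
# Illustrative only: `model` is an already constructed MoEModel instance.
sample = model.gen_predict_sample(text="今天天气很好", py=["hao"])

# The returned dict mirrors the structure built above:
#   sample["hint"]["input_ids"]      -> tensor of shape [1, 88]
#   sample["hint"]["attention_mask"] -> tensor of shape [1, 88]
#   sample["pg"]                     -> tensor([PG["hao"]]), the pinyin-group index
print(sample["hint"]["input_ids"].shape, sample["pg"])
```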
@@ -348,30 +374,34 @@ class MoEModel(nn.Module):
         preds : torch.Tensor
             [batch] predicted class labels; for a single sample with no batch dimension, a scalar is returned
         """
-        self.eval()
+        self.eval()  # switch the model to eval mode (disables dropout and other training-only behavior)
         # ------------------ 1. Extract and normalize the inputs ------------------
         # Check whether this is a single sample (input_ids has no batch dimension)
-        sample = self.gen_predict_sample(text, py, tokenizer)
-        input_ids = sample["hint"]["input_ids"]
-        attention_mask = sample["hint"]["attention_mask"]
-        pg = sample["pg"]
-        has_batch_dim = input_ids.dim() > 1
+        sample = self.gen_predict_sample(text, py, tokenizer)  # build the sample needed for prediction
+        input_ids = sample["hint"]["input_ids"]  # input ids
+        attention_mask = sample["hint"]["attention_mask"]  # attention mask
+        pg = sample["pg"]  # pinyin guidance
+        has_batch_dim = input_ids.dim() > 1  # whether the input already has a batch dimension
+        # Add a batch dimension if it is missing
         if not has_batch_dim:
-            input_ids = input_ids.unsqueeze(0)
-            attention_mask = attention_mask.unsqueeze(0)
+            input_ids = input_ids.unsqueeze(0)  # add a batch dimension at dim 0
+            attention_mask = attention_mask.unsqueeze(0)  # add a batch dimension at dim 0
+        # If the pinyin guidance is a scalar, expand it to the batch size of input_ids
         if pg.dim() == 0:
-            pg = pg.unsqueeze(0).expand(input_ids.size(0))
+            pg = pg.unsqueeze(0).expand(input_ids.size(0))  # expand the pinyin guidance to the batch size
         # ------------------ 2. Move to the device ------------------
+        # Move the inputs to the device the model lives on (GPU/CPU)
         input_ids = input_ids.to(self.device)
         attention_mask = attention_mask.to(self.device)
         pg = pg.to(self.device)
         # ------------------ 3. Inference ------------------
+        # Run under torch.no_grad() so no gradients are computed, saving memory
         with torch.no_grad():
-            logits = self(input_ids, attention_mask, pg)
+            logits = self(input_ids, attention_mask, pg)  # forward pass to obtain the logits
             preds = torch.softmax(logits, dim=-1).argmax(dim=-1)  # [batch]
         # ------------------ 4. Return the result (keep the input's dimensionality) ------------------
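A small aside on the prediction line kept above: because softmax is monotonic, the argmax over softmax(logits) equals the argmax over the raw logits, so the softmax only matters if calibrated probabilities are needed:

```python
import torch

logits = torch.tensor([[2.0, -1.0, 0.5], [0.1, 3.2, -0.7]])

preds_via_softmax = torch.softmax(logits, dim=-1).argmax(dim=-1)
preds_direct = logits.argmax(dim=-1)

assert torch.equal(preds_via_softmax, preds_direct)  # softmax does not change the argmax
print(preds_direct)  # tensor([0, 1])
```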
@@ -381,19 +411,19 @@ class MoEModel(nn.Module):
     def fit(
         self,
-        train_dataloader,
-        eval_dataloader=None,
-        monitor: Optional[TrainingMonitor] = None,
-        criterion=None,
-        optimizer=None,
-        num_epochs=1,
-        stop_batch=1e6,
+        train_dataloader,  # training data loader
+        eval_dataloader=None,  # evaluation data loader, optional
+        monitor: Optional[TrainingMonitor] = None,  # training monitor used to record the training process
+        criterion=None,  # loss function
+        optimizer=None,  # optimizer
+        num_epochs=1,  # number of training epochs
+        stop_batch=1e6,  # maximum number of training batches
         eval_frequency=500,
-        grad_accum_steps=1,
-        clip_grad_norm=1.0,
-        mixed_precision=False,
-        loss_weight=None,
-        lr=6e-5,
+        grad_accum_steps=1,  # gradient accumulation steps
+        clip_grad_norm=1.0,  # max norm for gradient clipping
+        mixed_precision=False,  # whether to use mixed-precision training
+        loss_weight=None,  # loss weights, used to handle class imbalance
+        lr=6e-5,  # initial learning rate
         lr_schedule=None,  # new: optional custom learning-rate schedule function
     ):
         """
@@ -426,7 +456,7 @@ class MoEModel(nn.Module):
             Custom learning-rate schedule function; it receives (processed_batches, optimizer)
             and may directly modify the learning rates in optimizer.param_groups.
         """
-        # Make sure the model is on the right device
+        # Make sure the model is on the right device (GPU or CPU)
         if self.device is None:
             self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
         self.to(self.device)
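The fit() signature above exposes grad_accum_steps, clip_grad_norm and mixed_precision, but the loop body is not part of this diff. The standard PyTorch pattern for combining those three options looks roughly like this sketch (variable names come from the signature; the batch keys are illustrative, not the actual implementation):

```python
import torch
from torch.cuda.amp import GradScaler, autocast

scaler = GradScaler(enabled=mixed_precision)

for step, batch in enumerate(train_dataloader):
    with autocast(enabled=mixed_precision):
        logits = model(batch["input_ids"], batch["attention_mask"], batch["pg"])
        loss = criterion(logits, batch["label"]) / grad_accum_steps  # scale for accumulation

    scaler.scale(loss).backward()  # accumulate (possibly scaled) gradients

    if (step + 1) % grad_accum_steps == 0:
        scaler.unscale_(optimizer)  # unscale before clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_grad_norm)
        scaler.step(optimizer)      # optimizer step
        scaler.update()
        optimizer.zero_grad()
```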
@@ -434,10 +464,11 @@ class MoEModel(nn.Module):
         # Switch to training mode
         super().train()
-        # Default optimizer
+        # Default optimizer setup
         if optimizer is None:
             optimizer = optim.AdamW(self.parameters(), lr=lr)
+        # Loss function setup
         if criterion is None:
             if loss_weight is not None:
                 criterion = nn.CrossEntropyLoss(weight=loss_weight)
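On the loss_weight branch above: nn.CrossEntropyLoss(weight=...) expects a 1-D tensor with one weight per class, which up-weights the contribution of rare classes. A quick illustration with made-up values:

```python
import torch
import torch.nn as nn

# Example for a 3-class problem where class 2 is rare and gets a larger weight.
loss_weight = torch.tensor([1.0, 1.0, 3.0])
criterion = nn.CrossEntropyLoss(weight=loss_weight)

logits = torch.randn(8, 3)          # [batch, num_classes]
labels = torch.randint(0, 3, (8,))  # [batch]
print(criterion(logits, labels))    # weighted mean cross-entropy
```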
@@ -541,5 +572,3 @@ class MoEModel(nn.Module):
         for name, param in self.named_parameters():
             if name in freeze_layers:
                 param.requires_grad = False