1. BERT模型的核心预训练任务解析
BERT(Bidirectional Encoder Representations from Transformers)作为自然语言处理领域的里程碑模型,其核心创新在于通过Masked Language Model(MLM)和Next Sentence Prediction(NSP)两个预训练任务,让模型学习到深层次的上下文语义表示。这两个任务的设计理念非常巧妙:MLM让模型学会理解单词在上下文中的含义,而NSP则让模型掌握句子间的逻辑关系。
在实际项目中,我发现很多开发者对这两个任务的具体实现存在困惑。比如MLM任务中15%的mask比例如何选择?NSP任务中正负样本如何构建?这些细节直接影响模型最终效果。下面我将结合PyTorch代码,带大家从零实现这两个关键任务。
2. 环境准备与数据预处理
2.1 安装必要依赖
首先需要安装PyTorch和相关工具库。建议使用Python 3.8+环境:
pip install torch==1.12.1 numpy==1.21.62.2 构建简易词汇表
为了演示方便,我们创建一个微型文本数据集:
text = ( 'Hello, how are you? I am Romeo.\n' 'Hello, Romeo My name is Juliet. Nice to meet you.\n' 'Nice meet you too. How are you today?\n' 'Great. My baseball team won the competition.\n' 'Oh Congratulations, Juliet\n' 'Thanks you Romeo' ) # 清洗文本并构建词汇表 sentences = re.sub("[.,!?\\-]", '', text.lower()).split('\n') word_list = list(set(" ".join(sentences).split())) word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3} for i, w in enumerate(word_list): word_dict[w] = i + 4 vocab_size = len(word_dict)这里特别要注意四个特殊token的作用:
[PAD]:填充token,用于统一序列长度[CLS]:分类token,用于NSP任务[SEP]:分隔token,用于区分不同句子[MASK]:掩码token,用于MLM任务
3. 实现Masked Language Model任务
3.1 构建掩码输入
MLM任务的核心是随机mask输入token并让模型预测原始token。关键实现步骤如下:
def make_batch(): batch = [] for _ in range(batch_size): # 随机选择句子 tokens_a_index = randrange(len(sentences)) tokens_a = token_list[tokens_a_index] # 添加特殊token [CLS]和[SEP] input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] # 随机选择15%的token进行mask n_pred = min(max_pred, max(1, int(round(len(input_ids)*0.15)))) cand_pos = [i for i, token in enumerate(input_ids) if token != word_dict['[CLS]'] and token != word_dict['[SEP]']] shuffle(cand_pos) masked_tokens, masked_pos = [], [] for pos in cand_pos[:n_pred]: masked_pos.append(pos) masked_tokens.append(input_ids[pos]) # 80%概率替换为[MASK] if random() < 0.8: input_ids[pos] = word_dict['[MASK]'] # 10%概率替换为随机token elif random() < 0.5: index = randint(0, vocab_size-1) input_ids[pos] = word_dict[number_dict[index]] # 填充到统一长度 n_pad = maxlen - len(input_ids) input_ids.extend([0] * n_pad) batch.append([input_ids, masked_tokens, masked_pos]) return batch这里有个实用技巧:不是简单地将所有选中token替换为[MASK],而是采用80-10-10的策略(80%替换为[MASK],10%替换为随机token,10%保持不变)。这种设计让模型不能简单地依赖[MASK]token的存在,必须真正理解上下文语义。
3.2 MLM模型架构
MLM任务的模型部分需要特别注意输出层的设计:
class BERT(nn.Module): def __init__(self): super(BERT, self).__init__() # 共享输入输出的embedding权重 self.embedding = Embedding(vocab_size, d_model) self.decoder = nn.Linear(d_model, vocab_size) self.decoder.weight = self.embedding.tok_embed.weight # 权重共享 def forward(self, input_ids, masked_pos): # 获取被mask位置的输出 output = self.embedding(input_ids) h_masked = torch.gather(output, 1, masked_pos) logits_lm = self.decoder(h_masked) return logits_lm权重共享(Weight Tying)是个重要技巧,它让embedding层和输出层的参数保持一致,既减少了参数量,又提高了训练稳定性。我在多个项目中验证过,这种设计通常能提升模型1-2个百分点的准确率。
4. 实现Next Sentence Prediction任务
4.1 构建句子对样本
NSP任务需要构建正样本(相邻句子)和负样本(随机句子对):
def make_batch(): batch = [] positive = negative = 0 while positive != batch_size/2 or negative != batch_size/2: # 随机选择两个句子 tokens_a_index, tokens_b_index = randrange(len(sentences)), randrange(len(sentences)) tokens_a, tokens_b = token_list[tokens_a_index], token_list[tokens_b_index] # 构建输入: [CLS] A [SEP] B [SEP] input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']] segment_ids = [0]*(1 + len(tokens_a) + 1) + [1]*(len(tokens_b) + 1) # 判断是否为相邻句子 if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2: batch.append([input_ids, segment_ids, True]) # IsNext positive += 1 elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2: batch.append([input_ids, segment_ids, False]) # NotNext negative += 1 return batch在实际应用中,我发现NSP任务的样本平衡非常重要。如果正负样本比例失衡,模型容易偏向预测多数类别。建议使用分层抽样确保比例均衡。
4.2 NSP模型架构
NSP任务使用[CLS]token的表示进行二分类:
class BERT(nn.Module): def __init__(self): super(BERT, self).__init__() self.embedding = Embedding(vocab_size, d_model) self.classifier = nn.Linear(d_model, 2) # 二分类 def forward(self, input_ids, segment_ids): output = self.embedding(input_ids, segment_ids) # 取[CLS]token的表示 h_cls = output[:, 0] logits_clsf = self.classifier(h_cls) return logits_clsf这里有个细节优化点:在原始BERT实现中,[CLS]token的输出会先经过一个tanh激活函数。实验表明这种设计能让模型更快收敛。
5. 联合训练与优化技巧
5.1 损失函数设计
联合训练时需要组合两个任务的损失:
criterion = nn.CrossEntropyLoss(ignore_index=0) # 忽略padding位置 optimizer = optim.Adam(model.parameters(), lr=0.001) for epoch in range(100): optimizer.zero_grad() logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos) # MLM任务损失 loss_lm = criterion(logits_lm.transpose(1,2), masked_tokens) loss_lm = loss_lm.float().mean() # NSP任务损失 loss_clsf = criterion(logits_clsf, isNext) # 联合损失 loss = loss_lm + loss_clsf loss.backward() optimizer.step()在实际训练中,我发现两个任务的损失值通常不在同一量级。可以尝试给不同任务分配不同权重,或者使用动态权重调整策略。
5.2 关键参数设置
这些参数对模型性能影响较大:
maxlen = 30 # 最大序列长度 batch_size = 6 # 批大小 max_pred = 5 # 每个序列最多mask的token数 n_layers = 6 # Transformer层数 n_heads = 12 # 注意力头数 d_model = 768 # 隐藏层维度 d_ff = 3072 # FeedForward维度对于小规模数据集,建议降低d_model和n_layers以避免过拟合。我在实际项目中的经验是:当数据量小于100万条时,d_model=512,n_layers=4通常是不错的起点。
6. 模型架构细节剖析
6.1 Embedding层实现
BERT的Embedding由三部分组成:
class Embedding(nn.Module): def __init__(self): super(Embedding, self).__init__() self.tok_embed = nn.Embedding(vocab_size, d_model) # token embedding self.pos_embed = nn.Embedding(maxlen, d_model) # position embedding self.seg_embed = nn.Embedding(n_segments, d_model) # segment embedding def forward(self, input_ids, segment_ids): seq_len = input_ids.size(1) pos = torch.arange(seq_len, dtype=torch.long).to(device) pos = pos.unsqueeze(0).expand_as(input_ids) embedding = self.tok_embed(input_ids) + \ self.pos_embed(pos) + \ self.seg_embed(segment_ids) return embedding位置编码(Position Embedding)是Transformer架构的关键。与原始Transformer使用三角函数不同,BERT直接学习位置嵌入向量,这种设计在小规模数据上表现更好。
6.2 Transformer编码层
核心是多头注意力机制:
class MultiHeadAttention(nn.Module): def __init__(self): super(MultiHeadAttention, self).__init__() self.W_Q = nn.Linear(d_model, d_k * n_heads) self.W_K = nn.Linear(d_model, d_k * n_heads) self.W_V = nn.Linear(d_model, d_v * n_heads) def forward(self, Q, K, V, attn_mask): residual, batch_size = Q, Q.size(0) q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2) v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2) attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) context, attn = ScaledDotProductAttention()(q_s, k_s, v_s, attn_mask) context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v) output = nn.Linear(n_heads * d_v, d_model)(context) return nn.LayerNorm(d_model)(output + residual), attn这里有几个工程实现要点:
- 使用残差连接(residual connection)缓解梯度消失
- Layer Normalization加速收敛
- 注意力掩码(attn_mask)处理变长输入
7. 常见问题与解决方案
7.1 训练不收敛问题
如果遇到训练loss波动大或不收敛,可以尝试:
- 减小学习率(如从0.001降到0.0001)
- 增加warmup步数(前1000步线性增大学习率)
- 检查梯度裁剪(gradient clipping)是否合理
7.2 过拟合处理
在小数据集上训练BERT容易过拟合,解决方法包括:
- 增加Dropout(推荐0.1-0.3)
- 使用更小的模型尺寸
- 提前停止(early stopping)
- 数据增强(如同义词替换)
7.3 显存不足问题
当遇到CUDA out of memory错误时:
- 减小batch_size
- 使用梯度累积(gradient accumulation)
- 尝试混合精度训练(AMP)
我在实际部署中发现,使用梯度累积配合AMP能在几乎不损失精度的情况下,将显存占用降低40%以上。