背景痛点:中文语料为什么总“卡”在清洗
做中文 NLP 的同学都懂,真正的瓶颈往往不是模型,而是把原始文本喂给模型之前的那一段“脏活累活”。我去年接手一个千万级问答对召回项目,CLUECorpus2020 原封不动下下来 200 G,解压后直接丢进训练脚本,结果 3 天过去还在 tokenizer 阶段——GPU 空转,电费烧得心疼。总结下来,中文语料有 3 只拦路虎:
- 繁简体、异体字、全半角、标点风格一锅粥,规则写少了清洗不干净,写多了又误杀正常字符。
- 没有天然空格,tokenization/分词必须先于清洗,而清洗又会反过来影响分词结果,循环依赖。
- 长文本截断策略难定:按句切?按字切?按 max_len 硬截?截完标签对不齐,下游任务直接崩。
再加上业务方临时加需求“把医疗实体全保留”,原来那套单机 Python 脚本彻底跑不动,于是有了这次“PySpark + Sentence-Transformers”全流程重构。
技术对比:PySpark 为什么赢 Dask
我先搭了 8 台 32C128G 的测试集群,同样 1200 万段文本,对比两段代码:
- Dask DataFrame + bag-of-words 清洗
- PySpark DataFrame + UDF
结果如下表(单位:分钟 / 峰值内存 GB):
| 框架 | 清洗+去重 | 分词+新词发现 | 向量化(CPU) |
|---|---|---|---|
| Dask | 38 / 92 | 55 / 110 | OOM |
| PySpark | 22 / 65 | 31 / 70 | 42 / 75 |
Dask 在第二阶段就频繁 spill-to-disk,磁盘打满后节点失联;PySpark 的内存管理虽然也被吐槽,但靠 Tungsten 二进制缓存 + 可调节的spark.memory.fraction稳住了。再考虑到生产环境已有 YARN 集群,运维同学一句话:“别给我加新框架”,于是拍板 PySpark。
核心实现:三段流水线拆给你看
1. 正则+词典联合清洗
下面这段clean_zh函数,我放在utils/clean.py里,被 PySpark UDF 直接引用。特点:
- 先“粗”后“细”:统一编码 → 正则剥皮 → 词典补洞
- 保留医疗/法律领域常见实体,用
keep_dict白名单 - 全函数带类型标注,抛出自定义
CleanError,方便 Spark 捕获
# utils/clean.py import regex as re from typing import Dict, List, Tuple class CleanError(ValueError): pass # 预编译提速 RE_HAN = re.compile(r'[\p{Han}]+', re.U) RE_NUM = re.compile(r'[0-90-9]+') RE_ENG = re.compile(r'[a-zA-Za-zA-Z]+') def clean_zh( text: str, keep_dict: Dict[str, str] = None, min_len: int = 2 ) -> str: """ 返回清洗后的 str,若清洗后长度不足则 raise CleanError """ if not isinstance(text, str): raise CleanError(f"Expected str, got {type(text)}") # 1. 全角→半角 & 繁体→简体 text = text.translate(FULL2HALF).translate(TRA2SIM) # 2. 正则剥皮:仅保留汉字/数字/英文/中文标点 chunks = [] for seg in RE_HAN.split(text): seg = RE_NUM.sub(" ", seg) seg = RE_ENG.sub(" ", seg) seg = re.sub(r'[^\p{Han}\p{P}\p{N}\p{L}]', ' ', seg) chunks.append(seg) text = ' '.join(chunks) # 3. 领域词典还原(如“Ⅲ级” -> “三级”) if keep_dict: for k, v in keep_dict.items(): text = text.replace(k, v) text = re.sub(r'\s+', ' ', text).strip() if len(text) < min_len: raise CleanError("too short after clean") return text在 Spark 侧注册:
from pyspark.sql.functions import udf from utils.clean import clean_zh, CleanError @udf("string") def udf_clean(text): try: return clean_zh(text, keep_dict=medical_keep) except CleanError: return None df = spark.read.json("s3://bucket/cluecorpus/*.json.gz") df_clean = df.withColumn("body", udf_clean("raw_text")).filter("body is not null")2. Jieba+领域词典分词优化
清洗完直接repartition(800),下一步 tokenization/分词。中文 OOV 最头疼,我搞了两招:
- 把业务已有的 38 万医疗实体导进 jieba 词频表,
jieba.add_word('实体', freq=50000) - 对新语料做“新词发现”,这里用的是 word-discovery 的互信息+左右熵,跑 2 小时捞出 1.2 万新词,再人工抽检 200 条,准确率 87%,写回词典。
代码片段(含类型标注):
# utils/tokenizer.py import jieba import jieba.posseg as pseg from typing import List def load_domain_dict(path: str) -> None: with open(path, encoding='utf8') as f: for line in f: w, f = line.strip().split() jieba.add_word(w, freq=int(f)) def seg_text(text: str, stop_words: set) -> List[str]: return [ w for w, f in pseg.cut(text) if w.strip() and w.lower() not in stop_words and f != 'm' # 过滤数词 ]PySpark 里再包一层 UDF,返回空格分隔的字符串,方便后续RegexTokenizer直接读取。
3. Sentence-Transformers batch processing 调优
向量化 GPU 显存 32 G,经实验发现:
batch_size=128是拐点,再大显存占用线性上升,吞吐却不再翻倍max_seq_length=256足以覆盖 92% 医疗问答;再长 GPU 利用率掉 20%- 开
fp16提速 1.7×,但中文 BERT 系列偶发overflow;改bf16稳了
核心代码(含异常捕获):
from sentence_transformers import SentenceTransformer import torch from typing import Iterator import numpy as np model = SentenceTransformer("shibing624/text2vec-base-chinese") model = model.half().cuda() def encode_partition(rows: Iterator) -> Iterator[np.ndarray]: batch, ids = [], [] for row in rows: batch.append(row.sent) ids.append(row.id) if len(batch) >= 128: try: emb = model.encode(batch, convert_to_numpy=True, show_progress_bar=False) for i, e in zip(ids, emb): yield (i, e.tobytes()) except RuntimeError as e: print("GPU OOM, skip batch:", ids) batch, ids = [], []用mapPartitions把上述函数喂给 Spark,每 partition 输出(id, bytes),再写 Parquet,下游 Faiss 直接读,毫无压力。
性能测试:CLUECorpus2020 实战
集群规模:5 × (32 vCore / 128 GB) + 1 × GPU 32 G
处理量:1200 万段,平均 120 字
结果(三次平均):
- 清洗+去重:22 min / 65 GB 峰值
- 分词+新词:31 min / 70 GB 峰值
- 向量化:42 min / GPU 显存 28 GB 峰值
- 端到端:≈ 1 h 35 min
对比原先单机 8 核 1.5 天,时间缩短 60%,电费省下一半,老板终于点头。
避坑指南:三个暗坑让我半夜调试
- 中文停用词表别瞎用百度版,会把“发烧”“腹泻”当停用词干掉。我的做法是合并哈工大表+业务自定义,再跑一遍 DF-IDF 把高频但高业务价值词捞回来。
- 分布式文件分片一定用
repartition(n)而别coalesce(n),后者倾斜节点会把 GPU 向量化拖成木桶最短板;n取executor*core*2最稳。 - GPU 内存泄漏常因为
torch.cuda.empty_cache()没调,Sentence-Transformers 在encode异常时不会自动清;我在except RuntimeError后强制empty_cache()+gc.collect(),显存占用从 29 G 降到 19 G。
代码规范小结
- 全项目
black --line-length 88一把梭,配合isort零人工冲突 - 所有 UDF 都写
try/except并返回None,让 Spark 脏数据可追踪 - 类型标注覆盖率 95%,配合
mypy在 CI 阶段拦截,半夜不会被TypeError叫醒
延伸思考:如何秒切医疗/法律垂直语料
医疗场景:把 ICD-10、药典、诊疗指南实体全部add_word,清洗阶段保留拉丁学名、剂量单位;向量化后用 Faiss 做相似病历检索,院内 20 万份病案秒级返回。
法律场景:法条号、判决要点、当事人角色都是长名词,建议用 LAC 或 HanLP 的粗分+细分两阶段,先保证法条不被切开,再用 Sentence-Transformers 的LaBSE多语模型,中英混排判决也能统一向量空间。
如果你正在啃垂直语料,不妨把这套流水线当 baseline,先跑通 60% 通用清洗,再叠加领域词典,最后把 GPU 向量化当积木插进来,基本两周就能端到端上线。
全文代码已放到 GitHub 模板仓库,替换词典路径即可跑通自己的数据。遇到 Spark 版本冲突或者 GPU 驱动坑,欢迎评论区交流,一起把中文 NLP 的“体力活”干成“自动化”。祝各位训练顺利,显存常驻,F1 常高。