AI 驱动的智能数据清洗:从规则引擎到大模型辅助的工程化链路
一、数据清洗的日常困境
数据分析师大部分时间都在做数据清洗。缺失值填充、异常值识别、格式标准化、重复记录去重——这些操作看似简单,但实际处理时经常遇到需要人工判断的边界情况。比如一个"地址"字段里可能混入电话号码、邮编甚至乱码;一个"金额"列可能同时存在"1,234.56"、"¥1234.56"和"1234.56元"三种格式。
传统方法用正则表达式和硬编码规则,数据源多了之后维护起来很麻烦。规则引擎能处理格式问题,但遇到语义层面的异常就无能为力了——它知道"年龄=999"是异常值,却无法判断"年龄=22、职业=退休"的组合是否合理。大模型的优势在于能理解语义,找出规则引擎发现不了的深层异常。
二、规则引擎与大模型协作的清洗架构
智能数据清洗不是"用大模型替代规则",而是让规则引擎处理确定性高、频次高的问题,大模型处理语义模糊、需要上下文推理的问题。两者配合才能兼顾效率与准确性。
flowchart TB A[原始数据输入] --> B{规则引擎预检} B -->|格式异常| C[正则修复] B -->|缺失值| D[统计填充] B -->|确定性异常| E[规则标记] B -->|语义模糊| F[大模型推理层] F -->|上下文推理| G[异常识别] F -->|语义匹配| H[字段修正] F -->|跨列关联| I[一致性校验] C --> J[清洗后数据] D --> J E --> J G --> J H --> J I --> J J --> K{质量校验} K -->|通过| L[输出] K -->|未通过| B2.1 规则引擎:确定性清洗的第一道防线
规则引擎处理的问题有明确的判断标准,执行速度快、成本极低。典型场景包括:
- 格式标准化:统一日期格式、去除千分位逗号、标准化电话号码
- 缺失值填充:数值列用中位数填充、分类列用众数填充
- 确定性异常:超出物理范围的值(如年龄 < 0 或 > 150)
2.2 大模型推理层:语义异常的第二道防线
大模型擅长处理需要上下文推理的清洗任务:
- 跨列一致性校验:判断"年龄=22、职业=退休"是否矛盾
- 语义模糊字段修正:将"北京市朝阳区xxx路"中的地址标准化
- 自由文本结构化:从"大概3月份买了台电脑花了5000多"中提取结构化信息
三、生产级智能数据清洗的代码实现
3.1 规则引擎模块
import re import pandas as pd import numpy as np from typing import Optional class RuleEngine: """规则引擎:处理确定性数据清洗任务""" def __init__(self): self.rules = [] self.stats = {"applied": 0, "fixed": 0} def clean_currency(self, series: pd.Series) -> pd.Series: """统一金额格式:去除货币符号和千分位逗号""" def parse_amount(val): if pd.isna(val): return np.nan val_str = str(val).strip() # 去除人民币符号、千分位逗号、"元"等后缀 cleaned = re.sub(r'[¥¥$,,元]', '', val_str) try: return float(cleaned) except ValueError: return np.nan # 无法解析的标记为缺失 result = series.apply(parse_amount) fixed_count = result.notna().sum() - series.notna().sum() self.stats["fixed"] += max(0, fixed_count) return result def clean_phone(self, series: pd.Series) -> pd.Series: """标准化手机号格式:保留纯数字,校验位数""" def parse_phone(val): if pd.isna(val): return np.nan digits = re.sub(r'[^\d]', '', str(val)) # 中国大陆手机号:11 位,1 开头 if re.match(r'^1[3-9]\d{9}$', digits): return digits return np.nan # 格式不合法的标记为缺失 return series.apply(parse_phone) def fill_missing_numeric(self, series: pd.Series, strategy: str = "median") -> pd.Series: """数值列缺失值填充""" if strategy == "median": fill_value = series.median() elif strategy == "mean": fill_value = series.mean() else: raise ValueError(f"不支持的填充策略: {strategy}") missing_count = series.isna().sum() result = series.fillna(fill_value) self.stats["fixed"] += missing_count self.stats["applied"] += 1 return result def clip_outliers(self, series: pd.Series, lower: Optional[float] = None, upper: Optional[float] = None) -> pd.Series: """截断超出物理范围的异常值""" return series.clip(lower=lower, upper=upper)3.2 大模型推理模块
import json from dataclasses import dataclass from openai import OpenAI @dataclass class LLMCleanConfig: model: str = "deepseek-chat" temperature: float = 0.1 # 为了减少错误,温度参数设得很低 batch_size: int = 20 # 单次推理处理的最大行数 max_retries: int = 2 class LLMCleaner: """大模型推理层:处理语义模糊的数据清洗任务""" def __init__(self, config: LLMCleanConfig = None): self.config = config or LLMCleanConfig() self.client = OpenAI() def check_cross_column_consistency(self, row: dict, columns: list[str]) -> dict: """跨列一致性校验:识别语义矛盾""" prompt = f"""你是一个数据质量检查专家。请判断以下数据行中各字段之间是否存在语义矛盾。 数据行: {json.dumps(row, ensure_ascii=False, indent=2)} 请以 JSON 格式返回结果: {{ "is_consistent": true/false, "inconsistencies": ["矛盾描述1", "矛盾描述2"], "suggested_fix": {{"列名": "修正值"}} }} 仅返回 JSON,不要添加其他内容。""" response = self.client.chat.completions.create( model=self.config.model, messages=[{"role": "user", "content": prompt}], temperature=self.config.temperature, ) try: result = json.loads(response.choices[0].message.content) return result except json.JSONDecodeError: return {"is_consistent": True, "inconsistencies": [], "suggested_fix": {}} def standardize_free_text(self, text: str, target_schema: dict) -> dict: """自由文本结构化:从非结构化文本中提取字段""" prompt = f"""请从以下文本中提取结构化信息,按照目标 schema 填充。 文本:{text} 目标 Schema:{json.dumps(target_schema, ensure_ascii=False)} 请以 JSON 格式返回提取结果,缺失字段填 null。仅返回 JSON。""" response = self.client.chat.completions.create( model=self.config.model, messages=[{"role": "user", "content": prompt}], temperature=self.config.temperature, ) try: return json.loads(response.choices[0].message.content) except json.JSONDecodeError: return {k: None for k in target_schema} def batch_consistency_check(self, df: pd.DataFrame, columns: list[str]) -> pd.DataFrame: """批量跨列一致性校验""" results = [] for i in range(0, len(df), self.config.batch_size): batch = df.iloc[i:i + self.config.batch_size] for _, row in batch.iterrows(): row_dict = {col: row[col] for col in columns if col in row.index} result = self.check_cross_column_consistency(row_dict, columns) results.append(result) # 将校验结果合并回原始 DataFrame result_df = df.copy() result_df["_is_consistent"] = [r["is_consistent"] for r in results] result_df["_inconsistencies"] = [json.dumps(r["inconsistencies"], ensure_ascii=False) for r in results] return result_df3.3 编排层:规则引擎与大模型的协同
class SmartDataCleaner: """智能数据清洗编排器:规则引擎优先,大模型兜底""" def __init__(self): self.rule_engine = RuleEngine() self.llm_cleaner = LLMCleaner() def clean(self, df: pd.DataFrame, config: dict) -> pd.DataFrame: """ 执行完整清洗流程 config 示例: { "currency_columns": ["amount", "price"], "phone_columns": ["phone"], "numeric_fill": {"age": "median", "salary": "mean"}, "clip_columns": {"age": {"lower": 0, "upper": 150}}, "consistency_check": ["age", "occupation", "education"] } """ result = df.copy() # 第一步:规则引擎处理确定性清洗 for col in config.get("currency_columns", []): if col in result.columns: result[col] = self.rule_engine.clean_currency(result[col]) for col in config.get("phone_columns", []): if col in result.columns: result[col] = self.rule_engine.clean_phone(result[col]) for col, strategy in config.get("numeric_fill", {}).items(): if col in result.columns: result[col] = self.rule_engine.fill_missing_numeric( result[col], strategy=strategy ) for col, bounds in config.get("clip_columns", {}).items(): if col in result.columns: result[col] = self.rule_engine.clip_outliers( result[col], **bounds ) # 第二步:大模型处理语义模糊的清洗任务 consistency_cols = config.get("consistency_check", []) if consistency_cols: result = self.llm_cleaner.batch_consistency_check( result, consistency_cols ) return result四、规则引擎与大模型协作的架构权衡
| 维度 | 规则引擎 | 大模型推理 |
|---|---|---|
| 执行速度 | 毫秒级,可处理百万行 | 秒级,受 API 限流约束 |
| 单行成本 | 几乎为零 | 约 0.01–0.05 元/行 |
| 准确率 | 确定性场景 100% | 语义场景约 90%–95% |
| 维护成本 | 规则随数据源增多而膨胀 | Prompt 调优成本高但通用性强 |
| 可解释性 | 规则透明,可审计 | 推理过程为黑盒 |
关键权衡:
成本控制:大模型推理的成本是规则引擎的数千倍。生产环境中,应先用规则引擎过滤掉 80% 的确定性异常,仅将剩余 20% 的模糊数据送入大模型。
延迟容忍:大模型推理的延迟在秒级,不适合实时清洗流水线。对于流式数据,建议采用异步清洗模式——规则引擎实时处理,大模型离线批处理。
幻觉风险:大模型可能"过度修正",将正确的数据误判为异常。解决方案是在大模型输出后增加人工审核环节,对修正比例超过阈值的批次进行抽样复核。
五、总结
智能数据清洗的核心思路是"规则引擎优先、大模型兜底"——确定性任务交给规则引擎保证速度和成本,语义模糊任务交给大模型保证准确性。两者协同的关键在于编排层的分流逻辑:规则引擎处理格式异常、缺失值填充和确定性异常,大模型处理跨列一致性校验和自由文本结构化。
落地步骤:第一步,梳理现有清洗规则,将确定性规则迁移至规则引擎模块;第二步,识别规则引擎无法覆盖的语义异常类型,设计对应的 Prompt 模板;第三步,构建编排层实现自动分流,并设置成本监控与质量校验机制。关键原则是——能用规则解决的不要用大模型,大模型只处理规则无法覆盖的语义层问题。