1. Pandas数据清洗实战概述
数据清洗是数据分析过程中最基础也最关键的环节。在实际工作中,我们拿到的原始数据往往存在各种问题:缺失值、重复记录、异常数据、格式不一致等。这些问题如果不处理,会直接影响后续分析结果的准确性。Pandas作为Python中最强大的数据处理库,提供了丰富的数据清洗功能。
我曾在电商数据分析项目中遇到过这样的情况:运营部门提供的销售数据中,有15%的记录缺少用户ID,部分商品销量出现异常值(正常范围是1-100,但出现了9999这样的数值),还有约5%的重复订单记录。如果不进行清洗就直接分析,会导致用户画像不准确、销售预测偏差等问题。
2. 数据准备与初步检查
2.1 创建示例数据集
我们先创建一个包含典型脏数据问题的DataFrame,模拟电商订单数据:
import pandas as pd import numpy as np # 创建包含脏数据的DataFrame data = { "order_id": ["OD001", "OD002", "OD003", "OD004", "OD005", "OD006", "OD002", "OD002"], "date": ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04", "2023-01-05", "2023-01-06", "2023-01-02", "2023-01-02"], "category": ["clothing", "electronics", "food", "clothing", "electronics", "food", "electronics", "electronics"], "quantity": [2, 1, 5, 3, 9999, 4, 1, 1], # 9999是明显异常值 "amount": [398, 1299, np.nan, 597, 59900, 88, 1299, 1299], # 包含缺失值 "user_id": ["U101", "U102", "U103", np.nan, "U105", "U106", "U102", "U102"] # 包含缺失值 } df = pd.DataFrame(data) print("原始数据:") print(df)2.2 数据质量检查
在进行清洗前,我们需要全面了解数据质量状况:
# 检查数据基本信息 print("\n数据基本信息:") print(df.info()) # 检查缺失值 print("\n缺失值统计:") print(df.isnull().sum()) # 检查重复值 print("\n重复值统计:") print(f"完全重复的行数: {df.duplicated().sum()}") print(f"按order_id检查重复: {df.duplicated(subset=['order_id']).sum()}") # 检查数值列的统计信息 print("\n数值列描述统计:") print(df[['quantity', 'amount']].describe())3. 缺失值处理实战
3.1 缺失值识别与分析
缺失值处理前,我们需要分析缺失的原因和模式:
- 随机缺失(MAR):缺失与其他可观测变量相关
- 完全随机缺失(MCAR):缺失与任何变量无关
- 非随机缺失(MNAR):缺失与缺失值本身相关
在我们的示例中:
- amount列的缺失可能是随机缺失,与商品类别相关
- user_id列的缺失可能是新用户未注册导致
3.2 缺失值处理技术
3.2.1 删除缺失值
# 删除包含缺失值的行 df_dropna = df.dropna() print(f"\n删除缺失值后行数: {len(df_dropna)}") # 删除缺失值超过阈值的行 df_dropna_thresh = df.dropna(thresh=5) # 保留至少5个非空值的行注意:直接删除缺失值可能导致信息丢失,特别是当缺失不是随机发生时,可能引入偏差。
3.2.2 填充缺失值
# 单值填充 df_fillna = df.fillna({'amount': 0, 'user_id': 'unknown'}) # 向前/向后填充 df_ffill = df.fillna(method='ffill') # 向前填充 df_bfill = df.fillna(method='bfill') # 向后填充 # 分组均值填充 df['amount'] = df.groupby('category')['amount'].transform( lambda x: x.fillna(x.mean()))3.2.3 高级缺失值处理
对于复杂场景,可以使用预测模型填充缺失值:
from sklearn.ensemble import RandomForestRegressor # 预测amount缺失值 amount_notna = df[df['amount'].notna()] amount_na = df[df['amount'].isna()] model = RandomForestRegressor() X = amount_notna[['quantity']] y = amount_notna['amount'] model.fit(X, y) predicted = model.predict(amount_na[['quantity']]) df.loc[df['amount'].isna(), 'amount'] = predicted4. 重复数据处理实战
4.1 重复值检测
# 检查完全重复的行 duplicate_rows = df[df.duplicated(keep=False)] print(f"\n完全重复的行:\n{duplicate_rows}") # 检查关键字段重复 order_duplicates = df[df.duplicated(subset=['order_id'], keep=False)] print(f"\n订单ID重复的记录:\n{order_duplicates}")4.2 重复值处理策略
4.2.1 完全重复行处理
# 删除完全重复的行,保留第一个出现的 df_dedup = df.drop_duplicates()4.2.2 关键字段重复处理
# 按order_id去重,保留amount最大的记录 df_dedup_order = df.sort_values('amount', ascending=False).drop_duplicates('order_id')4.2.3 复杂重复处理
有时需要结合多个字段判断是否重复:
# 定义重复判断条件 dup_condition = (df.duplicated(subset=['order_id', 'date', 'user_id'], keep=False)) df_dup_complex = df[dup_condition]5. 异常值检测与处理
5.1 异常值检测方法
5.1.1 描述统计法
# 计算Z-score df['quantity_z'] = (df['quantity'] - df['quantity'].mean()) / df['quantity'].std() outliers_z = df[abs(df['quantity_z']) > 3]5.1.2 IQR方法
Q1 = df['quantity'].quantile(0.25) Q3 = df['quantity'].quantile(0.75) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR outliers_iqr = df[(df['quantity'] < lower_bound) | (df['quantity'] > upper_bound)]5.1.3 可视化检测
import matplotlib.pyplot as plt import seaborn as sns plt.figure(figsize=(10, 6)) sns.boxplot(data=df, x='quantity') plt.title('Quantity Distribution Boxplot') plt.show()5.2 异常值处理策略
5.2.1 删除异常值
df_no_outliers = df[(df['quantity'] >= lower_bound) & (df['quantity'] <= upper_bound)]5.2.2 替换异常值
# 使用边界值替换 df['quantity'] = df['quantity'].clip(lower_bound, upper_bound) # 使用中位数替换 median = df['quantity'].median() df.loc[df['quantity'] > upper_bound, 'quantity'] = median5.2.3 分箱处理
# 等宽分箱 df['quantity_bin'] = pd.cut(df['quantity'], bins=5) # 等频分箱 df['quantity_bin'] = pd.qcut(df['quantity'], q=4)6. 数据类型与格式处理
6.1 数据类型转换
# 转换日期格式 df['date'] = pd.to_datetime(df['date']) # 转换分类数据 df['category'] = df['category'].astype('category') # 数值类型转换 df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')6.2 字符串处理
# 去除前后空格 df['user_id'] = df['user_id'].str.strip() # 统一大小写 df['category'] = df['category'].str.lower() # 提取字符串信息 df['order_prefix'] = df['order_id'].str.extract(r'([A-Z]+)')[0]7. 高级数据清洗技巧
7.1 基于规则的清洗
# 定义清洗规则函数 def clean_amount(row): if row['category'] == 'food' and row['amount'] > 1000: return row['amount'] / 100 # 假设可能是单位错误 return row['amount'] df['amount'] = df.apply(clean_amount, axis=1)7.2 使用正则表达式清洗
# 清洗电话号码格式 df['phone'] = df['phone'].str.replace(r'\D', '', regex=True) df['phone'] = df['phone'].str.replace(r'^(\d{3})(\d{4})(\d{4})$', r'\1-\2-\3', regex=True)7.3 数据一致性检查
# 检查金额与数量的合理性 df['unit_price'] = df['amount'] / df['quantity'] invalid_prices = df[(df['unit_price'] < 1) | (df['unit_price'] > 10000)]8. 数据清洗完整流程示例
8.1 完整清洗流程
def clean_data(df): # 1. 处理缺失值 df['amount'] = df.groupby('category')['amount'].transform( lambda x: x.fillna(x.mean())) df['user_id'] = df['user_id'].fillna('unknown') # 2. 处理重复值 df = df.sort_values('amount', ascending=False).drop_duplicates('order_id') # 3. 处理异常值 Q1 = df['quantity'].quantile(0.25) Q3 = df['quantity'].quantile(0.75) IQR = Q3 - Q1 upper_bound = Q3 + 1.5 * IQR df['quantity'] = df['quantity'].clip(upper=upper_bound) # 4. 数据类型转换 df['date'] = pd.to_datetime(df['date']) df['category'] = df['category'].astype('category') return df cleaned_df = clean_data(df.copy()) print("\n清洗后的数据:") print(cleaned_df)8.2 清洗效果验证
# 验证清洗结果 print("\n清洗后数据质量:") print(f"缺失值数量: {cleaned_df.isnull().sum().sum()}") print(f"重复订单数: {cleaned_df.duplicated('order_id').sum()}") print(f"异常值数量: {len(cleaned_df[cleaned_df['quantity'] > upper_bound])}") # 保存清洗后的数据 cleaned_df.to_csv('cleaned_data.csv', index=False)9. 实际项目中的经验分享
9.1 常见问题与解决方案
内存不足问题:
- 对于大型数据集,使用
dtype参数指定合适的数据类型 - 使用
chunksize参数分块读取数据 - 示例:
pd.read_csv('large_file.csv', dtype={'id': 'int32'}, chunksize=10000)
- 对于大型数据集,使用
性能优化:
- 使用
eval()和query()进行高效过滤 - 避免在循环中使用
iterrows(),改用itertuples() - 示例:
df = df.query('amount > 100 and quantity < 10')
- 使用
复杂清洗逻辑:
- 将复杂清洗步骤分解为多个函数
- 使用
pipe()方法链式调用清洗函数 - 示例:
def remove_duplicates(df): return df.drop_duplicates() def handle_outliers(df): # 异常值处理逻辑 return df cleaned_df = (df.pipe(remove_duplicates) .pipe(handle_outliers))
9.2 最佳实践建议
保持原始数据:永远保留原始数据副本,所有清洗操作在新的DataFrame上进行
文档记录:详细记录每个清洗步骤和决策原因,便于后续追溯
自动化测试:为数据质量检查创建自动化测试,确保清洗逻辑的正确性
版本控制:对清洗脚本和清洗后的数据进行版本控制
可视化验证:清洗前后使用可视化工具对比数据分布变化
10. 扩展应用与进阶技巧
10.1 处理脏数据的其他Pandas技巧
多数据源合并时的清洗:
# 合并时处理重复和冲突 merged = pd.merge(df1, df2, on='key', how='outer', suffixes=('_df1', '_df2')) merged['amount'] = merged['amount_df1'].fillna(merged['amount_df2'])时间序列数据清洗:
# 处理时间序列中的缺失 df.set_index('date', inplace=True) df = df.asfreq('D', method='pad') # 按天填充分类数据清洗:
# 统一分类值 category_mapping = {'clth': 'clothing', 'elec': 'electronics'} df['category'] = df['category'].replace(category_mapping)
10.2 与其他工具的集成
与数据库交互:
from sqlalchemy import create_engine engine = create_engine('postgresql://user:password@localhost/db') dirty_data = pd.read_sql('SELECT * FROM sales', engine) clean_data.to_sql('clean_sales', engine, if_exists='replace')使用Dask处理大数据:
import dask.dataframe as dd ddf = dd.read_csv('large_file.csv') clean_ddf = ddf.drop_duplicates().dropna() clean_ddf.compute().to_csv('clean_large_file.csv')与可视化工具结合:
import plotly.express as px fig = px.box(df, x='category', y='amount', title='Amount Distribution by Category') fig.show()
数据清洗是一项需要耐心和经验的工作。在实际项目中,我建议采用迭代式的清洗方法:先快速实现基本清洗,然后通过数据分析发现剩余问题,再逐步完善清洗逻辑。记住,没有"完美"的清洗方案,只有最适合当前业务需求的方案。每次清洗决策都应该考虑业务背景和数据用途,而不是单纯追求数学上的"干净"。