# Author watermark: @浙大疏锦行
import pandas as pd
import numpy as np
import os
from pathlib import Path
from typing import Tuple, Dict, Optional

# NOTE(review): a more elaborate project-root-relative loader
# (load_heart_data) and a saver (save_processed_data) were commented out
# here; the simple absolute-path loader below is what the pipeline uses.


# ==================== 1. Data loading ====================
def load_data_simple(file_path: str = r'E:\PyStudy\credit_heart_prediction\data\raw\heart.csv') -> Optional[pd.DataFrame]:
    """Load the raw heart-disease CSV.

    Args:
        file_path: Path to the CSV file. Defaults to the original
            hard-coded absolute path so existing zero-argument callers
            keep working; pass a path to load from elsewhere.

    Returns:
        The loaded DataFrame, or None if reading fails for any reason.
    """
    try:
        df = pd.read_csv(file_path)
        print(f"成功加载数据,形状: {df.shape}")
        return df
    except Exception as e:
        # Broad catch is deliberate: callers treat None as "load failed".
        print(f"加载失败: {e}")
        return None
分类特征编码函数 ==================== def encode_categorical_features(df: pd.DataFrame, target_col: str = 'target') -> Tuple[pd.DataFrame, Dict]: """ 对分类特征进行编码处理 参数: df (pd.DataFrame): 原始数据框 target_col (str): 目标变量列名 返回: Tuple[pd.DataFrame, Dict]: 编码后的数据框和编码映射关系字典 """ # 创建副本以避免修改原始数据 df_encoded = df.copy() # 根据数据集的说明,识别分类特征 categorical_cols = ['sex', 'cp', 'fbs', 'restecg', 'exang', 'slope', 'ca', 'thal'] # 移除可能在数据中不存在的列 categorical_cols = [col for col in categorical_cols if col in df_encoded.columns] # 编码映射字典 encoding_maps = {} print("\n开始编码分类特征...") for col in categorical_cols: # 检查列是否包含数值型数据(可能是已经编码的) if df_encoded[col].dtype == 'object' or df_encoded[col].nunique() <= 10: # 获取唯一值 unique_vals = sorted(df_encoded[col].dropna().unique()) # 创建编码映射 if len(unique_vals) > 2: # 多分类特征使用独热编码 print(f" ✓ 对特征 '{col}' 进行独热编码({len(unique_vals)} 个类别)") dummies = pd.get_dummies(df_encoded[col], prefix=col, drop_first=True) df_encoded = pd.concat([df_encoded.drop(columns=[col]), dummies], axis=1) encoding_maps[col] = {'type': 'onehot', 'mapping': dict(enumerate(unique_vals))} else: # 二分类特征使用标签编码(0/1) print(f" ✓ 对特征 '{col}' 进行标签编码({len(unique_vals)} 个类别)") mapping = {val: i for i, val in enumerate(unique_vals)} df_encoded[col] = df_encoded[col].map(mapping) encoding_maps[col] = {'type': 'label', 'mapping': mapping} else: print(f" ⚠ 特征 '{col}' 看起来已经是数值型,跳过编码") print(f"编码完成。原始特征数: {len(df.columns)}, 编码后特征数: {len(df_encoded.columns)}") return df_encoded, encoding_maps # ==================== 3. 数据分析辅助函数 ==================== def analyze_dataset(df: pd.DataFrame) -> None: """ 对数据集进行基本分析 参数: df (pd.DataFrame): 数据框 """ print("\n" + "="*60) print("数据集分析报告") print("="*60) # 基本信息 print(f"数据集形状: {df.shape}") # 列信息 print("\n列信息:") for i, col in enumerate(df.columns, 1): dtype = df[col].dtype unique_count = df[col].nunique() print(f" {i:2d}. 
{col:<15} {dtype:<10} 唯一值: {unique_count:3d}") # 缺失值分析 print("\n缺失值统计:") missing = df.isnull().sum() if missing.sum() == 0: print(" ✓ 无缺失值") else: for col, count in missing[missing > 0].items(): percentage = count/len(df)*100 print(f" ⚠ {col}: {count} 个缺失值 ({percentage:.2f}%)") # 目标变量分布 if 'target' in df.columns: print("\n目标变量分布:") target_counts = df['target'].value_counts().sort_index() total = len(df) for val, count in target_counts.items(): percentage = count/total*100 label = "有心脏病" if val == 1 else "无心脏病" print(f" {label}: {count:3d} 个样本 ({percentage:.1f}%)") # ==================== 4. 保存数据函数 ==================== # def save_processed_data(df: pd.DataFrame, filename: str = 'heart_encoded.csv') -> Path: # """ # 保存处理后的数据 # 参数: # df (pd.DataFrame): 要保存的数据框 # filename (str): 文件名 # 返回: # Path: 保存的文件路径 # """ # # 获取项目根目录 # project_root = Path(__file__).parent.parent.parent # # 创建processed目录 # processed_dir = project_root / 'data' / 'processed' # processed_dir.mkdir(parents=True, exist_ok=True) # # 保存文件 # output_path = processed_dir / filename # df.to_csv(output_path, index=False) # return output_path # ==================== 5. 运行测试 ==================== def main(): """ 主函数:执行完整的数据处理流程 """ print("="*70) print("心脏病数据集数据处理") print("="*70) # 显示项目结构信息 print(f"Python文件位置: {Path(__file__).absolute()}") print(f"项目根目录: {Path(__file__).parent.parent.parent.absolute()}") df = load_data_simple() # 3. 编码分类特征 print("\n3. 编码分类特征...") df_encoded, encoding_maps = encode_categorical_features(df) # # ==================== 6. 简化的测试版本(如果上面的不行) ==================== # def simple_test(): # """ # 简化版本,使用绝对路径确保能工作 # """ # print("简化测试版本...") # # 方法1: 使用绝对路径(最可靠) # project_root = Path(__file__).parent.parent.parent # data_file = project_root / 'data' / 'raw' / 'heart.csv' # print(f"项目根目录: {project_root}") # print(f"数据文件路径: {data_file}") # if not data_file.exists(): # print(f"错误:文件不存在于 {data_file}") # print("请检查:") # print(f"1. 文件是否在: {data_file}") # print(f"2. 
# ==================== Run preprocessing when executed directly ====================
if __name__ == "__main__":
    df_encoded, encoding_maps = main()


# ==================== Model training (originally a separate script) ====================
import pandas as pd
import sys
import os
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import time
import joblib  # model persistence
from typing import Tuple


# NOTE(review): this intentionally redefines load_data_simple() from the
# preprocessing section above; from this point on the name refers to the
# loader for the *encoded* dataset.
def load_data_simple():
    """Load the preprocessed (encoded) heart dataset from a fixed path.

    Returns:
        The encoded DataFrame, or None if reading fails.
    """
    file_path = r'E:\PyStudy\credit_heart_prediction\data\processed\heart_encoded.csv'
    try:
        df = pd.read_csv(file_path)
        print(f"成功加载数据,形状: {df.shape}")
        return df
    except Exception as e:
        print(f"加载失败: {e}")
        return None


def prepare_data(df) -> Tuple:
    """Split the frame into train/test features and labels.

    Args:
        df: Encoded DataFrame containing a 'target' column.

    Returns:
        (X_train, X_test, y_train, y_test) — an 80/20 split with a fixed
        seed for reproducibility.
    """
    df_encoded = df.copy()
    X = df_encoded.drop(['target'], axis=1)
    y = df_encoded['target']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train, y_train, model_params=None) -> RandomForestClassifier:
    """Train a random-forest classifier.

    Args:
        X_train: Training features.
        y_train: Training labels.
        model_params: Optional dict of RandomForestClassifier kwargs;
            defaults to a fixed random_state for reproducibility.

    Returns:
        The fitted model.
    """
    if model_params is None:
        model_params = {'random_state': 42}
    model = RandomForestClassifier(**model_params)
    model.fit(X_train, y_train)
    return model


def evaluate_model(model, X_test, y_test) -> None:
    """Print a classification report and confusion matrix for the test set.

    Args:
        model: Fitted classifier.
        X_test: Test features.
        y_test: Test labels.
    """
    y_pred = model.predict(X_test)
    print("\n分类报告:")
    print(classification_report(y_test, y_pred))
    print("\n混淆矩阵:")
    print(confusion_matrix(y_test, y_pred))


def save_model(model, model_path: str) -> None:
    """Persist the model with joblib, creating parent directories as needed.

    Args:
        model: Fitted model to save.
        model_path: Destination file path.
    """
    parent_dir = os.path.dirname(model_path)
    if parent_dir:
        # BUG FIX: os.makedirs('') raises FileNotFoundError when the
        # path has no directory component (a bare filename).
        os.makedirs(parent_dir, exist_ok=True)
    joblib.dump(model, model_path)
    print(f"\n模型已保存至: {model_path}")


if __name__ == "__main__":
    df = load_data_simple()
    if df is None:
        # BUG FIX: a failed load previously crashed inside prepare_data
        # with an opaque AttributeError on None.
        sys.exit(1)
    X_train, X_test, y_train, y_test = prepare_data(df)
    start_time = time.time()
    model = train_model(X_train, y_train)
    end_time = time.time()
    print(f"\n训练耗时: {end_time - start_time:.4f} 秒")
    evaluate_model(model, X_test, y_test)
    save_model(model, "models/random_forest_model.joblib")


# ==================== Visualization (originally a separate script) ====================
import matplotlib.pyplot as plt
import seaborn as sns
import shap
import numpy as np
from typing import Any


def plot_feature_importance_shap(model: Any, X_test, save_path: str = None) -> None:
    """Plot SHAP feature importances (class-0 values) as a bar chart.

    Args:
        model: Fitted tree-based model.
        X_test: Feature matrix used for the explanation.
        save_path: Optional path to save the figure before showing it.
    """
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)
    # COMPAT FIX: older shap versions return a list of per-class arrays,
    # newer ones a single (samples, features, classes) array. Select the
    # class-0 slice either way, matching the original [:, :, 0] intent.
    if isinstance(shap_values, list):
        class0_values = shap_values[0]
    else:
        class0_values = shap_values[:, :, 0]
    plt.figure(figsize=(12, 8))
    shap.summary_plot(class0_values, X_test, plot_type="bar", show=False)
    plt.title("SHAP特征重要性")
    if save_path:
        plt.savefig(save_path)
        print(f"特征重要性图已保存至: {save_path}")
    plt.show()


def plot_confusion_matrix(y_true, y_pred, save_path: str = None) -> None:
    """Plot the confusion matrix as a heatmap.

    Relies on confusion_matrix imported from sklearn.metrics earlier in
    this merged file.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        save_path: Optional path to save the figure before showing it.
    """
    plt.figure(figsize=(8, 6))
    cm = confusion_matrix(y_true, y_pred)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('混淆矩阵')
    plt.ylabel('真实标签')
    plt.xlabel('预测标签')
    if save_path:
        plt.savefig(save_path)
        print(f"混淆矩阵图已保存至: {save_path}")
    plt.show()


def set_plot_style():
    """Configure matplotlib for seaborn-like styling and CJK fonts."""
    # BUG FIX: the 'seaborn' style name was removed in matplotlib 3.6
    # (renamed 'seaborn-v0_8'); try both before falling back to default.
    try:
        plt.style.use('seaborn')
    except OSError:
        try:
            plt.style.use('seaborn-v0_8')
        except OSError:
            plt.style.use('default')
    plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels
    plt.rcParams['axes.unicode_minus'] = False  # keep minus sign visible with CJK fonts


if __name__ == "__main__":
    set_plot_style()
    print("可视化模块加载成功!")