# @浙大疏锦行
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer


# ==========================================
# 1. Data preprocessing and loading
# ==========================================
def preprocess_data(file_path):
    """Load a credit CSV and return a model-ready feature matrix and labels.

    Pipeline: drop the 'Id' column, map the 99999999.0 missing-value
    sentinel in 'Current Loan Amount' to NaN, impute (mean for numeric
    columns, mode for categorical), one-hot encode categoricals, and
    standard-scale all features.

    Args:
        file_path (str): Path to the CSV file. Must contain a
            'Credit Default' target column.

    Returns:
        tuple[pd.DataFrame, pd.Series]: (X, y) — scaled features and labels.
    """
    df = pd.read_csv(file_path)

    # The row identifier carries no predictive signal.
    if 'Id' in df.columns:
        df = df.drop(columns=['Id'])

    # 99999999 is used in this dataset as a missing-value marker.
    if 'Current Loan Amount' in df.columns:
        df['Current Loan Amount'] = df['Current Loan Amount'].replace(99999999.0, np.nan)

    # Separate features from the target column.
    target_col = 'Credit Default'
    X = df.drop(columns=[target_col])
    y = df[target_col]

    # Split columns by dtype to choose the imputation strategy.
    numeric_cols = X.select_dtypes(include=['float64', 'int64']).columns
    categorical_cols = X.select_dtypes(include=['object']).columns

    # Guard both imputers: SimpleImputer raises ValueError when fit on an
    # empty column selection, which would crash on all-numeric or
    # all-categorical inputs.
    if len(numeric_cols) > 0:
        imputer_num = SimpleImputer(strategy='mean')
        X[numeric_cols] = imputer_num.fit_transform(X[numeric_cols])

    if len(categorical_cols) > 0:
        imputer_cat = SimpleImputer(strategy='most_frequent')
        X[categorical_cols] = imputer_cat.fit_transform(X[categorical_cols])

    # One-hot encode; drop_first removes the redundant (collinear) dummy.
    X = pd.get_dummies(X, columns=categorical_cols, drop_first=True)

    # Standardize so every feature has zero mean and unit variance.
    scaler = StandardScaler()
    X = pd.DataFrame(scaler.fit_transform(X), columns=X.columns)

    return X, y


class CreditDataset(Dataset):
    """Wraps preprocessed (X, y) frames as float32 tensors for a DataLoader."""

    def __init__(self, X, y):
        self.X = torch.tensor(X.values, dtype=torch.float32)
        # unsqueeze(1) -> shape (N, 1) to match the model's single output unit.
        self.y = torch.tensor(y.values, dtype=torch.float32).unsqueeze(1)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
# ==========================================
# 2. Model definition
# ==========================================
class CreditModel(nn.Module):
    """Simple fully-connected binary classifier: input_dim -> 64 -> 32 -> 1.

    The final Sigmoid squashes the output to (0, 1) so it pairs with
    nn.BCELoss in the training loop below.
    """

    def __init__(self, input_dim):
        super(CreditModel, self).__init__()
        # Attribute names are part of the saved state_dict layout — keep them.
        self.layer1 = nn.Linear(input_dim, 64)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(64, 32)
        self.output = nn.Linear(32, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.layer1(x))
        x = self.relu(self.layer2(x))
        x = self.sigmoid(self.output(x))
        return x


# ==========================================
# 3. Early stopping
# ==========================================
class EarlyStopping:
    """Stop training once validation loss stops improving.

    Args:
        patience (int): Number of consecutive epochs without sufficient
            improvement to tolerate before setting `early_stop`.
        min_delta (float): Minimum decrease in validation loss that counts
            as an improvement.
    """

    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            # First observation becomes the baseline.
            self.best_loss = val_loss
        elif val_loss > self.best_loss - self.min_delta:
            # Not enough improvement: spend one unit of the patience budget.
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            # Improved by at least min_delta: record it and reset the counter.
            self.best_loss = val_loss
            self.counter = 0


# ==========================================
# 4. Training helpers
# ==========================================
def train_epoch(model, dataloader, criterion, optimizer):
    """Run one optimization pass over `dataloader`; return the mean batch loss."""
    model.train()
    if len(dataloader) == 0:
        return 0.0  # avoid ZeroDivisionError on an empty loader
    running_loss = 0.0
    for X_batch, y_batch in dataloader:
        optimizer.zero_grad()
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    return running_loss / len(dataloader)


def validate_epoch(model, dataloader, criterion):
    """Return the mean batch loss over `dataloader` without gradient tracking."""
    model.eval()
    if len(dataloader) == 0:
        return 0.0  # avoid ZeroDivisionError on an empty loader
    running_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            running_loss += loss.item()
    return running_loss / len(dataloader)


# ==========================================
# 5. Main script
# ==========================================
if __name__ == "__main__":
    # --- A. Data preparation ---
    file_path = 'data.csv'  # make sure this file exists in the working dir
    X, y = preprocess_data(file_path)

    # 80/20 train/validation split.
    dataset = CreditDataset(X, y)
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    input_dim = X.shape[1]

    # --- B. Initial training and checkpoint save ---
    print(">>> 开始初始训练 (Phase 1)...")
    model = CreditModel(input_dim)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train for a fixed 10 warm-up epochs.
    for epoch in range(10):
        train_loss = train_epoch(model, train_loader, criterion, optimizer)
        print(f"Epoch {epoch+1}/10 - Loss: {train_loss:.4f}")

    print(">>> 保存模型权重到 'credit_model.pth'...")
    torch.save(model.state_dict(), 'credit_model.pth')

    # --- C. Resume from checkpoint and train with early stopping ---
    print("\n>>> 加载权重并继续训练 50 轮 (Phase 2)...")
    # 1. Fresh model instance with the same architecture.
    new_model = CreditModel(input_dim)
    # 2. weights_only=True restricts unpickling to tensors/containers,
    #    avoiding arbitrary-code execution via the pickle payload.
    new_model.load_state_dict(torch.load('credit_model.pth', weights_only=True))

    # 3. New optimizer and early-stopping tracker for phase 2.
    new_optimizer = optim.Adam(new_model.parameters(), lr=0.001)
    early_stopping = EarlyStopping(patience=5, min_delta=0.001)

    # 4. Continue for up to 50 more epochs.
    additional_epochs = 50
    for epoch in range(additional_epochs):
        train_loss = train_epoch(new_model, train_loader, criterion, new_optimizer)
        val_loss = validate_epoch(new_model, val_loader, criterion)
        print(f"Resumed Epoch {epoch+1}/{additional_epochs} - Train Loss: {train_loss:.4f} - Val Loss: {val_loss:.4f}")

        # Stop once the validation loss has stagnated for `patience` epochs.
        early_stopping(val_loss)
        if early_stopping.early_stop:
            print("!!! 验证集损失不再下降,触发早停 (Early Stopping) !!!")
            break

    print("训练结束。")