功能说明
本代码实现了一个基于长短期记忆网络(LSTM)的量化交易策略,通过机器学习方法对历史金融数据进行特征工程处理,并利用LSTM模型预测未来价格走势。该策略的核心在于从原始市场数据中提取有效特征,并通过参数筛选优化模型性能。主要功能包括数据预处理、特征选择、模型训练和交易信号生成。需要注意的是,该策略存在过拟合风险,在极端市场条件下可能失效,实际应用时需结合风险管理措施。
数据准备与预处理
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_selection import SelectKBest, f_regression


def prepare_data(csv_path, lookback=60):
    """Load a price CSV, add indicators, scale to [0, 1] and build windows.

    The CSV must provide the columns ``Date``, ``Open``, ``High``, ``Low``,
    ``Close`` and ``Volume``; ``Date`` becomes the index.

    Args:
        csv_path: Path of the CSV file handed to ``pandas.read_csv``.
        lookback: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays. ``X`` stacks one
        ``(lookback, 9)`` window per sample; ``y`` holds the scaled ``Close``
        value of the row that immediately follows each window.
    """
    volatility_window = 20
    feature_columns = [
        'Open', 'High', 'Low', 'Close', 'Volume',
        'SMA_20', 'RSI', 'MACD', 'Volatility',
    ]

    data = pd.read_csv(csv_path)
    data['Date'] = pd.to_datetime(data['Date'])
    data.set_index('Date', inplace=True)

    # Technical indicators.
    # NOTE(review): calculate_rsi / calculate_macd are not defined in the
    # visible source; they must be provided elsewhere in the module.
    data['SMA_20'] = data['Close'].rolling(window=20).mean()
    data['RSI'] = calculate_rsi(data['Close'])
    data['MACD'] = calculate_macd(data['Close'])
    # Rolling, annualised volatility. The std of the *whole* return series is
    # a single scalar, which would make this column constant (and therefore
    # all zeros after min-max scaling).
    data['Volatility'] = (
        data['Close'].pct_change().rolling(window=volatility_window).std()
        * np.sqrt(252)
    )

    # Forward-fill gaps, then drop the leading rows the rolling windows
    # could not populate.
    data.ffill(inplace=True)
    data.dropna(inplace=True)

    # Min-max normalisation.
    # NOTE(review): the scaler is fitted on the full history, so statistics
    # from the later (test) period leak into the training windows; it is also
    # not returned, so predictions cannot be inverse-transformed by callers.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaled_data = scaler.fit_transform(data[feature_columns])

    # Sliding windows: rows [i - lookback, i) predict the Close of row i.
    close_idx = feature_columns.index('Close')
    X = [scaled_data[i - lookback:i] for i in range(lookback, len(scaled_data))]
    y = [scaled_data[i, close_idx] for i in range(lookback, len(scaled_data))]
    return np.array(X), np.array(y)


# Section: Feature engineering in practice
def feature_engineering(X, y, k=8):
    """Keep the ``k`` most informative features of a windowed data set.

    Every (time step, feature) column of the flattened windows is scored with
    ``f_regression``; the scores are averaged over the time steps so that each
    original feature gets one score, and the ``k`` best features are kept for
    all time steps. Selecting individual flattened columns instead would not
    leave a rectangular ``(samples, lookback, k)`` tensor.

    Args:
        X: Array of shape ``(samples, lookback, features)``.
        y: Target array with one value per sample.
        k: Number of features to keep, or ``'all'``. Values larger than the
            number of available features are clamped.

    Returns:
        A tuple ``(X_selected, selected_indices)``. ``X_selected`` has shape
        ``(samples, lookback, k)``; ``selected_indices`` is a sorted array of
        indices into the last axis of ``X``, so the same selection can be
        applied to other data with ``X_other[:, :, selected_indices]``.
    """
    n_samples, n_steps, n_features = X.shape
    k = n_features if k == 'all' else min(int(k), n_features)

    # Flatten each window so every (time step, feature) pair is one column.
    X_reshaped = X.reshape(n_samples, n_steps * n_features)

    # k='all' keeps every column; only the fitted scores are needed here.
    selector = SelectKBest(score_func=f_regression, k='all')
    selector.fit(X_reshaped, y)

    # Constant columns yield NaN scores; treat them as uninformative.
    column_scores = np.nan_to_num(selector.scores_)
    # C-order flattening puts column t * n_features + f at [t, f].
    feature_scores = column_scores.reshape(n_steps, n_features).mean(axis=0)

    best_first = np.argsort(feature_scores)[::-1][:k]
    selected_indices = np.sort(best_first)

    X_selected = X[:, :, selected_indices]
    return X_selected, selected_indices


def create_lagged_features(df, lags=(1, 2, 3, 5, 7)):
    """Return a copy of ``df`` extended with lagged Close/Volume columns.

    Args:
        df: DataFrame containing ``Close`` and ``Volume`` columns. It is not
            modified.
        lags: Iterable of positive row offsets; for each ``lag`` the columns
            ``Close_lag_<lag>`` and ``Volume_lag_<lag>`` are added.

    Returns:
        A new DataFrame with the lag columns, without the rows that contain
        missing values (including the leading rows the shifts left empty).
    """
    # Work on a copy so the caller's frame does not silently gain columns.
    lagged = df.copy()
    for lag in lags:
        lagged[f'Close_lag_{lag}'] = lagged['Close'].shift(lag)
        lagged[f'Volume_lag_{lag}'] = lagged['Volume'].shift(lag)
    return lagged.dropna()


# Section: LSTM model architecture
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam


def build_lstm_model(input_shape, units=50, dropout_rate=0.2):
    """Assemble and compile the two-layer LSTM regression network.

    Args:
        input_shape: Shape of one input sample, passed to the first LSTM
            layer as ``input_shape``.
        units: Number of units in each of the two LSTM layers.
        dropout_rate: Rate of the Dropout layer placed after each LSTM layer.

    Returns:
        A compiled ``Sequential`` model with a single linear output, using
        Adam (learning rate 0.001), MSE loss and an MAE metric.
    """
    network = Sequential()

    # First recurrent layer returns the full sequence for the next LSTM.
    network.add(LSTM(units, return_sequences=True, input_shape=input_shape))
    network.add(Dropout(dropout_rate))

    # Second recurrent layer collapses the sequence to its final state.
    network.add(LSTM(units, return_sequences=False))
    network.add(Dropout(dropout_rate))

    # Dense head producing one regression output.
    network.add(Dense(25, activation='relu'))
    network.add(Dense(1))

    optimizer = Adam(learning_rate=0.001)
    network.compile(optimizer=optimizer, loss='mse', metrics=['mae'])
    return network


# Example instantiation:
# input_shape = (60, 8)  # 60-day window, 8 features
# model = build_lstm_model(input_shape)

# Section: Hyperparameter optimization strategy
from sklearn.model_selection import TimeSeriesSplit
from tensorflow.keras.wrappers.scikit_learn import KerasRegressor
from sklearn.model_selection import GridSearchCV
# NOTE(review): tensorflow.keras.wrappers.scikit_learn is not shipped with
# recent TensorFlow releases - confirm the pinned TensorFlow version.


def hyperparameter_tuning(X, y):
    """Grid-search LSTM hyperparameters with time-series cross-validation.

    Args:
        X: Input array; ``X.shape[1]`` and ``X.shape[2]`` are used as the
            input shape of the first LSTM layer.
        y: Target array aligned with ``X``.

    Returns:
        The ``best_params_`` dict of the fitted ``GridSearchCV`` with the
        keys ``neurons``, ``dropout`` and ``lr``.
    """
    def create_model(neurons=32, dropout=0.2, lr=0.001):
        """Build one candidate network for the given hyperparameters."""
        model = Sequential([
            LSTM(neurons, return_sequences=True,
                 input_shape=(X.shape[1], X.shape[2])),
            Dropout(dropout),
            LSTM(neurons),
            Dropout(dropout),
            Dense(1),
        ])
        # ``learning_rate`` is the supported keyword (the ``lr`` alias is
        # deprecated) and matches the optimizer call in build_lstm_model.
        model.compile(optimizer=Adam(learning_rate=lr), loss='mse')
        return model

    # scikit-learn compatible wrapper around the Keras builder.
    model = KerasRegressor(build_fn=create_model, epochs=50, batch_size=32,
                           verbose=0)

    param_grid = {
        'neurons': [32, 50],
        'dropout': [0.1, 0.3],
        'lr': [0.001, 0.0005],
    }

    # Folds that always validate on data later than the training data.
    tscv = TimeSeriesSplit(n_splits=5)

    grid = GridSearchCV(estimator=model, param_grid=param_grid, cv=tscv,
                        scoring='neg_mean_squared_error')
    grid_result = grid.fit(X, y)
    return grid_result.best_params_


# Section: Trading signal generation system
def generate_trading_signals(model, X_test, threshold=0.005):
    """Turn consecutive model predictions into buy/hold/sell signals.

    Args:
        model: Object exposing ``predict(X_test)``.
        X_test: Inputs forwarded unchanged to ``model.predict``.
        threshold: Minimum absolute change between two consecutive
            predictions required to emit a signal. The change is a plain
            difference in the units of the predictions, not a percentage.

    Returns:
        A float array with one entry per prediction: ``1`` (buy), ``-1``
        (sell) or ``0`` (hold). The first entry is always ``0`` because it
        has no predecessor to compare with.
    """
    predictions = np.asarray(model.predict(X_test)).flatten()
    if predictions.size == 0:
        # Nothing predicted: keep the output the same length as the input.
        return np.zeros(0)

    changes = np.diff(predictions)

    signals = np.zeros(len(changes))
    signals[changes > threshold] = 1     # buy
    signals[changes < -threshold] = -1   # sell

    # Pad the front so the result lines up with the predictions.
    return np.concatenate(([0], signals))


def calculate_strategy_performance(prices, signals, initial_capital=10000):
    """Compute basic performance figures for a signal-driven strategy.

    The signal of period ``t`` is applied to the price change from ``t`` to
    ``t + 1``; a signal is never combined with a return that was already
    known when the signal was produced.

    Args:
        prices: Price series (a ``pandas.Series`` or any sequence).
        signals: Position per period (1, 0 or -1); a ``pandas.Series``
            aligned with ``prices`` or a plain array of the same length.
        initial_capital: Starting capital used to scale ``final_value``.

    Returns:
        Dict with the keys ``final_value``, ``max_drawdown``,
        ``sharpe_ratio`` and ``total_return``.

    Raises:
        ValueError: If ``signals`` is not a Series and its length differs
            from ``prices``.
    """
    if len(prices) == 0:
        return {
            'final_value': float(initial_capital),
            'max_drawdown': 0.0,
            'sharpe_ratio': 0.0,
            'total_return': 0.0,
        }

    if not isinstance(prices, pd.Series):
        prices = pd.Series(prices)
    if not isinstance(signals, pd.Series):
        # generate_trading_signals returns a NumPy array; align it by position.
        signals = pd.Series(np.asarray(signals, dtype=float).ravel(),
                            index=prices.index)

    daily_returns = prices.pct_change()

    # Strategy return = previous period's signal x this period's return.
    raw_returns = signals.shift(1) * daily_returns
    # Periods without a position or return (e.g. the first row) earn nothing.
    strategy_returns = raw_returns.fillna(0.0)

    cumulative_returns = (1 + strategy_returns).cumprod()

    # Maximum drawdown relative to the running peak of the equity curve.
    peak = cumulative_returns.cummax()
    drawdown = (cumulative_returns - peak) / peak
    max_drawdown = float(drawdown.min())

    # Annualised Sharpe ratio; undefined (reported as 0) without variation.
    volatility = raw_returns.std()
    if pd.notna(volatility) and volatility > 0:
        sharpe_ratio = float(np.sqrt(252) * raw_returns.mean() / volatility)
    else:
        sharpe_ratio = 0.0

    growth = float(cumulative_returns.iloc[-1])
    return {
        'final_value': growth * initial_capital,
        'max_drawdown': max_drawdown,
        'sharpe_ratio': sharpe_ratio,
        'total_return': growth - 1,
    }


# Section: Risk control mechanism
class RiskManager:
    """Position sizing guard with stop-loss, take-profit and a size cap."""

    def __init__(self, stop_loss=0.05, take_profit=0.1, position_limit=0.1):
        """Store the risk thresholds.

        Args:
            stop_loss: Fractional loss on a position that triggers closing it.
            take_profit: Fractional gain on a position that triggers closing.
            position_limit: Largest allowed position as a fraction of the
                latest equity value.
        """
        self.stop_loss = stop_loss            # stop-loss threshold
        self.take_profit = take_profit        # take-profit threshold
        self.position_limit = position_limit  # per-asset position cap

    def adjust_position(self, current_price, entry_price, current_position,
                        equity_curve):
        """Return the change to apply to the current position.

        Args:
            current_price: Latest price of the asset.
            entry_price: Price at which the position was opened.
            current_position: Signed position size (negative means short).
            equity_curve: Series whose last element is the current equity.

        Returns:
            The signed amount to add to ``current_position``:
            ``-current_position`` when the stop-loss or take-profit is hit
            (close everything), the reduction needed to get back to the
            position cap when the position is oversized, and ``0`` when no
            adjustment is needed.
        """
        if current_position != 0:
            # Profit is measured in the direction of the trade, so a falling
            # price is a gain for a short position.
            direction = np.sign(current_position)
            unrealized_pnl = (
                direction * (current_price - entry_price) / entry_price
            )
            # Losses are checked against the stop-loss and gains against the
            # take-profit; comparing the absolute value with both would close
            # winning trades at the (smaller) stop-loss level.
            hit_stop = unrealized_pnl <= -self.stop_loss
            hit_target = unrealized_pnl >= self.take_profit
            if hit_stop or hit_target:
                return -current_position  # close the position

        # Dynamic cap derived from the latest equity value.
        max_position = self.position_limit * equity_curve.iloc[-1]
        if abs(current_position) > max_position:
            adjusted_position = np.sign(current_position) * max_position
            return adjusted_position - current_position

        return 0  # nothing to do


# Section: Complete strategy integration
class LSTMTradingStrategy:
    """End-to-end pipeline: data, feature selection, model, signals, report."""

    def __init__(self, data_path, lookback=60, test_size=0.2):
        """Store the configuration; nothing is loaded or trained yet.

        Args:
            data_path: CSV path forwarded to ``prepare_data``.
            lookback: Window length forwarded to ``prepare_data``.
            test_size: Fraction of the samples (taken from the end) held out
                for validation and evaluation.
        """
        self.data_path = data_path
        self.lookback = lookback
        self.test_size = test_size
        self.model = None
        # NOTE(review): never assigned in the visible code; prepare_data does
        # not return its scaler.
        self.scaler = None

    def train(self):
        """Run the whole pipeline and evaluate it on the held-out tail.

        Returns:
            A tuple ``(performance, signals)``: the dict produced by
            ``calculate_strategy_performance`` and the signal array produced
            by ``generate_trading_signals`` for the held-out samples.
        """
        # Imported here because the visible module-level imports do not
        # bring EarlyStopping into scope.
        from tensorflow.keras.callbacks import EarlyStopping

        # 1. Data preparation with a chronological train/test split.
        X, y = prepare_data(self.data_path, self.lookback)
        split_idx = int(len(X) * (1 - self.test_size))
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]

        # 2. Feature selection fitted on the training part only, then the
        #    same feature columns are taken from the test part.
        X_train_fe, selected_indices = feature_engineering(X_train, y_train)
        X_test_fe = X_test[:, :, selected_indices]

        # 3. Model training.
        # NOTE(review): the held-out set doubles as the early-stopping
        # validation set, so the reported performance is optimistic.
        self.model = build_lstm_model(
            (X_train_fe.shape[1], X_train_fe.shape[2])
        )
        history = self.model.fit(
            X_train_fe, y_train,
            validation_data=(X_test_fe, y_test),
            epochs=100,
            batch_size=32,
            callbacks=[EarlyStopping(monitor='val_loss', patience=10)],
            verbose=1,
        )

        # 4. Signal generation.
        signals = generate_trading_signals(self.model, X_test_fe)

        # 5. Performance evaluation. Both inputs are passed as Series because
        #    the evaluation relies on Series methods such as ``shift``.
        performance = calculate_strategy_performance(
            pd.Series(y_test), pd.Series(signals)
        )
        return performance, signals


# Section: Feature importance analysis
import shap
import matplotlib.pyplot as plt


def analyze_feature_importance(model, X_sample, feature_names):
    """Explain the model with SHAP and rank the input features.

    Args:
        model: Trained model handed to ``shap.DeepExplainer``.
        X_sample: Sample inputs used both as background data and as the
            instances to explain; the last axis must index the features.
        feature_names: One name per feature on the last axis of ``X_sample``.

    Returns:
        DataFrame with the columns ``Feature`` and ``Importance`` (mean
        absolute SHAP value per feature), sorted from most to least
        important.

    Raises:
        ValueError: If the SHAP values do not end in one entry per feature
            name.
    """
    explainer = shap.DeepExplainer(model, X_sample)
    shap_values = explainer.shap_values(X_sample)

    # NOTE(review): the exact return layout depends on the shap version -
    # assumed here: either an array or a list with one array per model output.
    if isinstance(shap_values, list):
        shap_values = shap_values[0]
    values = np.asarray(shap_values)
    inputs = np.asarray(X_sample)

    # Drop a trailing single-output axis so values line up with the inputs.
    if values.ndim == inputs.ndim + 1 and values.shape[-1] == 1:
        values = values[..., 0]

    n_features = len(feature_names)
    if values.shape[-1] != n_features:
        raise ValueError(
            f"expected {n_features} features on the last axis, "
            f"got SHAP values of shape {values.shape}"
        )

    # The summary plot needs 2-D data: for windowed inputs, add up each
    # feature's contributions over the time steps and show its mean value.
    if values.ndim == 3:
        plot_values = values.sum(axis=1)
        plot_inputs = inputs.mean(axis=1)
    else:
        plot_values, plot_inputs = values, inputs

    plt.figure(figsize=(10, 6))
    shap.summary_plot(plot_values, plot_inputs, feature_names=feature_names)
    plt.tight_layout()
    plt.show()

    # Average |SHAP| over every axis except the feature axis so that the
    # result has exactly one entry per feature name.
    per_feature = np.abs(values).reshape(-1, n_features).mean(axis=0)
    importance = pd.DataFrame({
        'Feature': feature_names,
        'Importance': per_feature,
    }).sort_values('Importance', ascending=False)
    return importance


# Example call:
# sample_data = X_test[:100]  # part of the test data used for explanation
# features = ['Open','High','Low','Close','Volume','SMA_20','RSI','MACD','Volatility']
# importance_df = analyze_feature_importance(model, sample_data, features)