1. 从零开始认识AkShare与可转债高频数据
第一次接触AkShare这个工具时,我完全没想到它能这么方便地获取金融数据。作为一个开源的Python金融数据接口库,AkShare最大的优势就是完全免费,而且数据源都是国内主流财经网站,比如新浪财经、东方财富网这些我们平时看行情的地方。
说到可转债,可能有些新手还不太熟悉。简单来说,可转债就是可以转换成股票的债券,它既有债券的保底特性,又有股票的上涨潜力。在A股市场里,可转债的交易规则和股票不太一样——T+0交易、无涨跌幅限制(虽然有临停机制),这就让可转债成了日内交易者的乐园。
为什么要获取日内高频数据呢?举个例子,上周我观察"三力转债"时发现,它在上午10点15分突然放量拉升,但日线图上根本看不出这个细节。只有拿到1分钟或5分钟级别的数据,才能真正把握住这些盘中交易机会。高频数据就像放大镜,能让我们看清价格波动的微观结构。
2. 搭建Python环境与AkShare安装
2.1 基础环境配置
我强烈建议使用Anaconda来管理Python环境,这样可以避免各种依赖冲突。这是我常用的环境配置命令:
conda create -n akshare_env python=3.8 conda activate akshare_env安装AkShare时有个小技巧——指定国内镜像源能大幅提升下载速度:
pip install akshare -i https://pypi.tuna.tsinghua.edu.cn/simple有时候会遇到依赖包冲突的问题,特别是pandas的版本。经过多次测试,我发现这个组合最稳定:
pip install pandas==1.3.5 akshare==1.2.02.2 常见安装问题排查
第一次使用时可能会碰到SSL证书错误,这是因为某些数据源用了老版证书。解决方法很简单:
import ssl ssl._create_default_https_context = ssl._create_unverified_context如果遇到"ConnectionError",可能是IP被临时限制了。我的经验是:
- 控制请求频率,加个time.sleep(2)
- 使用代理IP(注意遵守相关规定)
- 换个时间段再试
3. 获取实时行情数据实战
3.1 全市场可转债快照
获取实时行情是最基础的需求,AkShare的bond_zh_hs_cov_spot接口可以直接拿到全市场数据:
import akshare as ak def get_realtime_cov(): df = ak.bond_zh_hs_cov_spot() # 关键字段处理 df = df[['symbol', 'name', 'price', 'change_percent']] df.columns = ['代码', '名称', '最新价', '涨跌幅'] return df.sort_values('涨跌幅', ascending=False)这个数据有个特点:包含转股溢价率这个关键指标。我通常会用它做初步筛选:
hot_cov = df[(df['涨跌幅']>0) & (df['转股溢价率']<20)].copy()3.2 实时数据更新技巧
做实时监控时,需要定时刷新数据。我设计了个简单的循环逻辑:
import time while True: data = get_realtime_cov() # 这里可以加入预警逻辑 print(data.head()) time.sleep(60) # 1分钟更新一次注意:新浪接口虽然没有明确限频,但建议间隔不低于15秒,否则可能被临时封禁。
4. 历史分时数据获取详解
4.1 分钟线数据获取
东方财富的分钟线接口非常实用,但参数设置要特别注意:
def get_minute_data(code='123045', period='5'): """ code: 转债代码如123045(三力转债) period: 1/5/15/30/60分钟 """ df = ak.bond_zh_hs_cov_min( symbol=code, period=period, adjust='hfq' ) # 字段重命名 df.columns = ['时间', '开盘', '最高', '最低', '收盘', '成交量', '成交额'] return df重要限制:
- 1分钟数据只保留最近1个交易日
- 5分钟及以上数据可获取更长时间范围
- 不复权数据可能影响连续性
4.2 多日数据拼接方案
要获取连续多日数据,需要用到日期循环:
from datetime import datetime, timedelta def get_multi_days_data(code, days=3): all_data = [] for i in range(days): date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d') try: df = ak.bond_zh_hs_cov_min(symbol=code, period='1', start_date=date) all_data.append(df) except: continue return pd.concat(all_data)5. 数据存储与处理技巧
5.1 高效存储方案
我习惯用HDF5格式存储高频数据,比CSV节省80%空间:
def save_hdf5(data, code): with pd.HDF5(f'{code}.h5', 'w') as store: store.put('data', data, format='table')对于需要快速读取的场景,Parquet格式也很不错:
data.to_parquet(f'{code}.parquet', engine='pyarrow')5.2 数据清洗实战
原始数据经常有小问题,这是我的清洗流程:
- 处理异常值
df = df[(df['收盘'] > 0) & (df['成交量'] >= 0)]- 补全缺失时间戳
full_range = pd.date_range(start=df.index[0], end=df.index[-1], freq='1T') df = df.reindex(full_range).fillna(method='ffill')- 计算关键指标
df['波动率'] = df['收盘'].rolling(10).std() df['量比'] = df['成交量'] / df['成交量'].rolling(20).mean()6. 量化策略开发实战
6.1 均值回归策略示例
基于5分钟线的简单策略:
def mean_reversion_strategy(data): data['ma10'] = data['收盘'].rolling(10).mean() data['upper'] = data['ma10'] + data['收盘'].rolling(20).std() data['lower'] = data['ma10'] - data['收盘'].rolling(20).std() data['signal'] = 0 data.loc[data['收盘'] > data['upper'], 'signal'] = -1 data.loc[data['收盘'] < data['lower'], 'signal'] = 1 return data6.2 成交量突变策略
可转债的成交量变化往往预示行情转折:
def volume_breakout(data): data['vol_ma'] = data['成交量'].rolling(30).mean() data['vol_ratio'] = data['成交量'] / data['vol_ma'] data['signal'] = np.where( (data['vol_ratio'] > 2) & (data['收盘'] > data['开盘']), 1, np.where( (data['vol_ratio'] > 2) & (data['收盘'] < data['开盘']), -1, 0 ) ) return data7. 实盘监控系统搭建
7.1 实时预警系统
结合Python的轻量级框架,可以快速搭建监控系统:
from websockets import connect async def monitor_cov(): async with connect("wss://your_websocket_url") as ws: while True: data = await ws.recv() df = process_data(data) # 突破预警 if df['close'].iloc[-1] > df['upper'].iloc[-1]: send_alert(f"突破上轨:{df['code'].iloc[-1]}")7.2 可视化监控面板
用PyQt5做个简单界面:
from PyQt5.QtWidgets import QTableView from PyQt5.QtCore import QAbstractTableModel class PandasModel(QAbstractTableModel): # 数据模型实现 pass table = QTableView() model = PandasModel(df) table.setModel(model)8. 常见问题与性能优化
8.1 数据获取失败处理
我总结了几种重试机制:
- 指数退避重试
import random def get_with_retry(func, max_retries=3): for i in range(max_retries): try: return func() except: wait = min(2 ** i + random.random(), 10) time.sleep(wait) return None- 备用数据源切换
def get_data(code): try: return ak.bond_zh_hs_cov_min(code) except: return ak.bond_zh_hs_cov_spot(code)8.2 大规模数据获取优化
当需要获取全市场数据时,要注意:
- 使用多线程(但注意网站限流)
from concurrent.futures import ThreadPoolExecutor codes = ['123045', '128036'] with ThreadPoolExecutor(4) as executor: results = list(executor.map(get_minute_data, codes))- 分时段获取
import schedule schedule.every().hour.do(get_all_data) while True: schedule.run_pending() time.sleep(1)在实际项目中,我发现可转债的高频数据质量会直接影响策略效果。曾经有个策略在回测时表现很好,实盘却亏损,后来发现是数据清洗时漏掉了集合竞价时段的特殊数据。现在我的数据预处理流程一定会特别注意开盘前30分钟和收盘前15分钟的数据异常。