news 2026/1/9 8:49:59

Python爬虫实战:运用Selenium与Asyncio技术深度挖掘投资机构持仓数据

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python爬虫实战:运用Selenium与Asyncio技术深度挖掘投资机构持仓数据

引言:金融数据挖掘的价值与挑战

在当今数字化金融时代,投资机构的持仓数据成为了市场参与者决策的重要参考依据。通过分析顶级投资机构的持仓变化,投资者可以洞察市场趋势、发现潜在投资机会。然而,这些宝贵数据分散在各个金融监管机构、交易所和基金公司的网站中,手动收集效率低下且容易出错。本文将详细介绍如何运用Python最新爬虫技术构建一个高效、稳定的投资机构持仓数据采集系统。

技术选型:现代化爬虫技术栈

本爬虫项目采用以下技术组合:

  • Selenium 4.0+:处理JavaScript动态渲染页面

  • Asyncio + Aiohttp:实现高并发异步数据采集

  • Playwright:作为Selenium的替代方案,提供更好的性能和稳定性

  • Pandas & BeautifulSoup 4:数据解析与处理

  • Rotating Proxy:防止IP被封禁

  • MongoDB/PostgreSQL:数据存储方案

项目架构设计

text

投资机构持仓爬虫系统架构: 1. 数据源管理层 - 管理多个数据源URL和API端点 2. 爬虫调度器 - 异步任务调度和负载均衡 3. 页面解析器 - 动态页面渲染和静态数据提取 4. 数据清洗器 - 数据标准化和质量控制 5. 存储模块 - 多格式数据持久化 6. 反爬虫策略模块 - 请求头轮换、代理池、延迟策略

完整代码实现

1. 环境配置与依赖安装

python

# requirements.txt selenium==4.15.0 playwright==1.40.0 aiohttp==3.9.1 asyncio==3.4.3 pandas==2.1.4 beautifulsoup4==4.12.2 lxml==4.9.3 fake-useragent==1.4.0 redis==5.0.1 pymongo==4.5.0 sqlalchemy==2.0.23

2. 主爬虫类实现

python

import asyncio import aiohttp import pandas as pd from datetime import datetime, timedelta from typing import Dict, List, Optional, Any import json import logging from dataclasses import dataclass from enum import Enum from urllib.parse import urljoin, urlparse import time from random import uniform import hashlib # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class DataSource(Enum): """数据源枚举""" SEC_EDGAR = "sec_edgar" # 美国SEC EDGAR数据库 HKEX = "hkex" # 香港交易所 SSE = "sse" # 上海证券交易所 SZSE = "szse" # 深圳证券交易所 FUND_HOLDINGS = "fund_holdings" # 基金持仓报告 @dataclass class HoldingRecord: """持仓记录数据结构""" institution_name: str stock_code: str stock_name: str holding_date: str shares_held: int percentage_outstanding: float market_value: float data_source: str report_date: str filing_type: str class InstitutionHoldingSpider: """投资机构持仓爬虫主类""" def __init__(self, use_proxy: bool = True, headless: bool = True): self.use_proxy = use_proxy self.headless = headless self.session = None self.proxy_pool = self._init_proxy_pool() self.user_agent_rotator = UserAgentRotator() self.rate_limiter = RateLimiter(max_requests=10, time_window=60) def _init_proxy_pool(self) -> List[str]: """初始化代理池""" # 实际应用中可以从代理服务商API获取 return [ 'http://proxy1.example.com:8080', 'http://proxy2.example.com:8080', # 更多代理... ] async def fetch_sec_edgar_filings(self, cik: str, start_date: str, end_date: str) -> List[HoldingRecord]: """爬取SEC EDGAR数据库的13F文件(机构持仓报告)""" base_url = "https://www.sec.gov/Archives/edgar/data/" records = [] # 构建API请求URL api_url = f"https://data.sec.gov/api/xbrl/companyfacts/CIK{cik.zfill(10)}.json" headers = { 'User-Agent': self.user_agent_rotator.get_random(), 'Accept-Encoding': 'gzip, deflate', 'Host': 'data.sec.gov' } try: async with aiohttp.ClientSession(headers=headers) as session: async with session.get(api_url, proxy=self._get_random_proxy() if self.use_proxy else None, timeout=aiohttp.ClientTimeout(total=30)) as response: if response.status == 200: data = await response.json() records = self._parse_sec_13f_data(data, cik) else: logger.error(f"SEC请求失败: {response.status}") except Exception as e: logger.error(f"爬取SEC数据时出错: {str(e)}") return records def _parse_sec_13f_data(self, json_data: Dict, cik: str) -> List[HoldingRecord]: """解析SEC 13F JSON数据""" records = [] try: # 提取13F持仓数据 facts = json_data.get('facts', {}).get('us-gaap', {}) # 这里需要根据实际JSON结构进行解析 # 示例解析逻辑(实际结构可能不同) holdings = facts.get('CommonStockSharesOutstanding', {}).get('units', {}).get('USD', []) for holding in holdings: record = HoldingRecord( institution_name=self._get_institution_name(cik), stock_code=holding.get('accn', ''), stock_name=holding.get('description', ''), holding_date=holding.get('end', ''), shares_held=int(holding.get('val', 0)), percentage_outstanding=float(holding.get('pct', 0)), market_value=float(holding.get('marketValue', 0)), data_source=DataSource.SEC_EDGAR.value, report_date=datetime.now().strftime('%Y-%m-%d'), filing_type='13F-HR' ) records.append(record) except Exception as e: logger.error(f"解析SEC数据时出错: {str(e)}") return records async def fetch_hkex_holdings(self, stock_code: str) -> List[HoldingRecord]: """爬取港交所披露易的机构持仓数据""" url = f"https://www.hkexnews.hk/sdw/search/searchsdw.aspx" # 使用Playwright处理动态页面 from playwright.async_api import async_playwright records = [] async with async_playwright() as p: browser = await p.chromium.launch(headless=self.headless) context = await browser.new_context( user_agent=self.user_agent_rotator.get_random(), viewport={'width': 1920, 'height': 1080} ) page = await context.new_page() try: await page.goto(url, wait_until='networkidle') # 输入股票代码 await page.fill('#txtStockCode', stock_code) await page.click('#btnSearch') # 等待结果加载 await page.wait_for_selector('#pnlResult', timeout=10000) # 提取表格数据 table_html = await page.inner_html('#pnlResult table') records = self._parse_hkex_table(table_html, stock_code) except Exception as e: logger.error(f"爬取港交所数据时出错: {str(e)}") finally: await browser.close() return records def _parse_hkex_table(self, html: str, stock_code: str) -> List[HoldingRecord]: """解析港交所HTML表格""" from bs4 import BeautifulSoup records = [] soup = BeautifulSoup(html, 'lxml') # 查找持仓表格 table = soup.find('table', {'class': 'table'}) if not table: return records rows = table.find_all('tr')[1:] # 跳过表头 for row in rows: cols = row.find_all('td') if len(cols) >= 5: try: record = HoldingRecord( institution_name=cols[0].text.strip(), stock_code=stock_code, stock_name=self._get_stock_name(stock_code), holding_date=datetime.now().strftime('%Y-%m-%d'), shares_held=self._parse_number(cols[1].text.strip()), percentage_outstanding=float(cols[2].text.strip('%')), market_value=0, # 需要额外计算 data_source=DataSource.HKEX.value, report_date=datetime.now().strftime('%Y-%m-%d'), filing_type='CCASS' ) records.append(record) except Exception as e: logger.warning(f"解析行数据时出错: {str(e)}") return records async def fetch_multiple_sources(self, tasks: List[Dict[str, Any]], max_concurrent: int = 5) -> Dict[str, List[HoldingRecord]]: """并发获取多个数据源""" from asyncio import Semaphore semaphore = Semaphore(max_concurrent) results = {} async def fetch_with_semaphore(task: Dict[str, Any]): async with semaphore: await self.rate_limiter.wait() source = task['source'] params = task['params'] if source == DataSource.SEC_EDGAR: return await self.fetch_sec_edgar_filings(**params) elif source == DataSource.HKEX: return await self.fetch_hkex_holdings(**params) # 其他数据源... tasks_list = [fetch_with_semaphore(task) for task in tasks] task_results = await asyncio.gather(*tasks_list, return_exceptions=True) for i, result in enumerate(task_results): if not isinstance(result, Exception): results[tasks[i]['source'].value] = result else: logger.error(f"任务执行失败: {result}") return results def _get_random_proxy(self) -> Optional[str]: """随机获取代理""" if self.proxy_pool: import random return random.choice(self.proxy_pool) return None def _parse_number(self, text: str) -> int: """解析数字字符串""" try: # 移除千分位逗号,处理单位(万、亿等) text = text.replace(',', '') if '万' in text: return int(float(text.replace('万', '')) * 10000) elif '亿' in text: return int(float(text.replace('亿', '')) * 100000000) return int(float(text)) except: return 0 def _get_institution_name(self, cik: str) -> str: """根据CIK获取机构名称(实际应查询数据库)""" # 这里简化为映射表 cik_map = { '0001067983': 'BERKSHIRE HATHAWAY INC', '0001166559': 'BLACKROCK INC', # 更多映射... } return cik_map.get(cik, f"INSTITUTION_{cik}") def _get_stock_name(self, stock_code: str) -> str: """根据股票代码获取股票名称""" # 实际应查询股票代码数据库 return f"Stock_{stock_code}" def save_to_database(self, records: List[HoldingRecord], db_type: str = 'mongodb'): """保存数据到数据库""" if db_type == 'mongodb': self._save_to_mongodb(records) elif db_type == 'postgresql': self._save_to_postgresql(records) elif db_type == 'csv': self._save_to_csv(records) elif db_type == 'excel': self._save_to_excel(records) def _save_to_mongodb(self, records: List[HoldingRecord]): """保存到MongoDB""" from pymongo import MongoClient from pymongo.errors import DuplicateKeyError client = MongoClient('mongodb://localhost:27017/') db = client['investment_data'] collection = db['institution_holdings'] # 创建唯一索引 collection.create_index([ ('institution_name', 1), ('stock_code', 1), ('holding_date', 1), ('data_source', 1) ], unique=True) for record in records: try: collection.insert_one(self._record_to_dict(record)) except DuplicateKeyError: logger.info(f"重复记录已跳过: {record.institution_name} - {record.stock_code}") def _save_to_postgresql(self, records: List[HoldingRecord]): """保存到PostgreSQL""" from sqlalchemy import create_engine, Table, Column, String, Integer, Float, Date, MetaData from sqlalchemy.dialects.postgresql import insert engine = create_engine('postgresql://user:password@localhost/investment') metadata = MetaData() holdings_table = Table( 'institution_holdings', metadata, Column('id', Integer, primary_key=True), Column('institution_name', String), Column('stock_code', String), Column('stock_name', String), Column('holding_date', Date), Column('shares_held', Integer), Column('percentage_outstanding', Float), Column('market_value', Float), Column('data_source', String), Column('report_date', Date), Column('filing_type', String), Column('created_at', Date, default=datetime.now()) ) metadata.create_all(engine) with engine.connect() as conn: for record in records: stmt = insert(holdings_table).values(self._record_to_dict(record)) stmt = stmt.on_conflict_do_nothing( index_elements=['institution_name', 'stock_code', 'holding_date', 'data_source'] ) conn.execute(stmt) conn.commit() def _save_to_csv(self, records: List[HoldingRecord], filename: str = 'holdings.csv'): """保存为CSV文件""" df = pd.DataFrame([self._record_to_dict(r) for r in records]) df.to_csv(filename, index=False, encoding='utf-8-sig') logger.info(f"数据已保存到 {filename}") def _save_to_excel(self, records: List[HoldingRecord], filename: str = 'holdings.xlsx'): """保存为Excel文件""" df = pd.DataFrame([self._record_to_dict(r) for r in records]) with pd.ExcelWriter(filename, engine='openpyxl') as writer: df.to_excel(writer, sheet_name='持仓数据', index=False) # 添加数据透视表 pivot = df.pivot_table( values='shares_held', index='institution_name', columns='stock_name', aggfunc='sum', fill_value=0 ) pivot.to_excel(writer, sheet_name='机构持仓汇总') logger.info(f"数据已保存到 {filename}") def _record_to_dict(self, record: HoldingRecord) -> Dict[str, Any]: """将记录对象转换为字典""" return { 'institution_name': record.institution_name, 'stock_code': record.stock_code, 'stock_name': record.stock_name, 'holding_date': record.holding_date, 'shares_held': record.shares_held, 'percentage_outstanding': record.percentage_outstanding, 'market_value': record.market_value, 'data_source': record.data_source, 'report_date': record.report_date, 'filing_type': record.filing_type, 'crawl_time': datetime.now().isoformat() } class UserAgentRotator: """用户代理轮换器""" def __init__(self): self.user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36', # 更多用户代理... ] def get_random(self) -> str: import random return random.choice(self.user_agents) class RateLimiter: """速率限制器""" def __init__(self, max_requests: int = 10, time_window: int = 60): self.max_requests = max_requests self.time_window = time_window self.requests = [] async def wait(self): """等待直到可以发送下一个请求""" now = time.time() # 移除时间窗口之外的请求记录 self.requests = [req_time for req_time in self.requests if now - req_time < self.time_window] if len(self.requests) >= self.max_requests: # 计算需要等待的时间 sleep_time = self.time_window - (now - self.requests[0]) if sleep_time > 0: logger.info(f"速率限制,等待 {sleep_time:.2f} 秒") await asyncio.sleep(sleep_time) self.requests.append(time.time()) # 高级功能:数据质量监控和异常检测 class DataQualityMonitor: """数据质量监控器""" @staticmethod def check_anomalies(records: List[HoldingRecord]) -> Dict[str, List[HoldingRecord]]: """检测数据异常""" anomalies = { 'sudden_changes': [], 'missing_data': [], 'outliers': [] } # 检测持仓量突变 for record in records: # 这里可以添加业务逻辑,比如与历史数据对比 if record.percentage_outstanding > 10: # 示例:持仓超过10%可能是异常 anomalies['sudden_changes'].append(record) return anomalies # 主程序 async def main(): """主函数""" logger.info("开始爬取投资机构持仓数据...") # 初始化爬虫 spider = InstitutionHoldingSpider(use_proxy=True, headless=True) # 定义爬取任务 tasks = [ { 'source': DataSource.SEC_EDGAR, 'params': { 'cik': '0001067983', # 伯克希尔哈撒韦 'start_date': '2024-01-01', 'end_date': '2024-12-31' } }, { 'source': DataSource.HKEX, 'params': { 'stock_code': '00700' # 腾讯控股 } } # 可以添加更多任务... ] # 并发执行所有任务 try: results = await spider.fetch_multiple_sources(tasks, max_concurrent=3) # 合并所有结果 all_records = [] for source_records in results.values(): all_records.extend(source_records) logger.info(f"共爬取到 {len(all_records)} 条持仓记录") # 数据质量检查 quality_monitor = DataQualityMonitor() anomalies = quality_monitor.check_anomalies(all_records) if anomalies['sudden_changes']: logger.warning(f"发现 {len(anomalies['sudden_changes'])} 条异常变动记录") # 保存数据 spider.save_to_database(all_records, 'mongodb') spider.save_to_excel(all_records, 'institution_holdings.xlsx') # 生成数据报告 generate_report(all_records) except Exception as e: logger.error(f"主程序执行出错: {str(e)}", exc_info=True) logger.info("爬取任务完成!") def generate_report(records: List[HoldingRecord]): """生成数据分析报告""" if not records: return df = pd.DataFrame([spider._record_to_dict(r) for r in records]) # 统计摘要 summary = { '总记录数': len(df), '机构数量': df['institution_name'].nunique(), '股票数量': df['stock_code'].nunique(), '总持仓市值': df['market_value'].sum(), '平均持仓比例': df['percentage_outstanding'].mean(), '数据源分布': df['data_source'].value_counts().to_dict() } # 输出报告 report_content = f""" =================== 持仓数据报告 =================== 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 数据概览: - 总记录数: {summary['总记录数']:,} - 涉及机构: {summary['机构数量']} 家 - 涉及股票: {summary['股票数量']} 支 - 总持仓市值: ${summary['总持仓市值']:,.2f} - 平均持仓比例: {summary['平均持仓比例']:.2f}% 数据源分布: """ for source, count in summary['数据源分布'].items(): report_content += f" - {source}: {count} 条\n" # 找出持仓最多的机构 top_institutions = df.groupby('institution_name')['market_value'].sum().nlargest(10) report_content += "\n 持仓市值前十的机构:\n" for institution, value in top_institutions.items(): report_content += f" - {institution}: ${value:,.2f}\n" print(report_content) # 保存报告文件 with open('holding_report.txt', 'w', encoding='utf-8') as f: f.write(report_content) if __name__ == "__main__": # 异步运行主程序 asyncio.run(main())

高级功能扩展

1. 分布式爬虫架构

python

import redis from celery import Celery from kombu import Queue # 配置Celery分布式任务队列 app = Celery('holding_spider', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') @app.task(queue='crawler_tasks') def crawl_institution_holdings_task(source_config): """分布式爬虫任务""" # 实现分布式爬取逻辑 pass

2. 数据更新监控

python

class DataUpdateMonitor: """数据更新监控器""" def __init__(self): self.last_update_time = {} async def check_for_updates(self, data_source: str) -> bool: """检查数据源是否有更新""" # 实现检查逻辑 pass

3. 可视化分析

python

import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots def create_holding_visualization(df): """创建持仓数据可视化""" fig = make_subplots( rows=2, cols=2, subplot_titles=('机构持仓分布', '持仓市值TOP10', '行业分布', '时间趋势') ) # 添加各种图表 # ... fig.write_html('holding_visualization.html') fig.show()

反爬虫策略应对

  1. 动态User-Agent轮换:每次请求使用不同的浏览器标识

  2. IP代理池:自动切换代理IP避免被封

  3. 请求频率控制:智能延迟和并发控制

  4. 浏览器指纹模拟:使用Playwright模拟真实浏览器

  5. 验证码识别:集成OCR或第三方验证码服务

  6. Honeypot检测:识别和避免爬虫陷阱

法律与伦理考虑

在实施爬虫时需要注意:

  1. 遵守robots.txt协议:尊重网站的爬虫规则

  2. 数据使用限制:仅用于个人研究或合法商业用途

  3. 访问频率限制:避免对目标网站造成负担

  4. 用户隐私保护:不收集个人敏感信息

  5. 遵守当地法律法规:特别是金融数据相关法规

性能优化建议

  1. 使用连接池:复用HTTP连接减少握手开销

  2. 数据流式处理:边爬取边处理,减少内存占用

  3. 增量爬取:只爬取更新的数据

  4. 缓存机制:缓存静态资源减少重复下载

  5. 错误重试机制:智能处理网络异常

总结

本文详细介绍了一个完整的投资机构持仓数据爬虫系统的设计与实现。通过采用现代化的Python异步编程、动态页面渲染技术和分布式架构,我们构建了一个高效、稳定且可扩展的数据采集系统。这个系统不仅能够帮助投资者获取宝贵的持仓数据,还可以作为金融数据分析的基础设施。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/6 6:51:19

HeyGem是否支持并发任务?系统队列机制深度解析

HeyGem是否支持并发任务&#xff1f;系统队列机制深度解析 在AI数字人内容创作日益普及的今天&#xff0c;越来越多的企业和个人开始尝试批量生成口型同步视频。无论是制作系列课程、产品宣传&#xff0c;还是打造虚拟主播内容矩阵&#xff0c;用户都希望系统能“一口气处理多个…

作者头像 李华
网站建设 2026/1/9 3:08:04

ASG三权模式下各管理员的职责是什么

本文档提供了ASG系列产品的维护指导。 文章目录ASG三权模式下各管理员的职责是什么三权模式可以切换到普通模式吗三个默认管理员账号是否可编辑普通模式切换到三权模式后&#xff0c;原来的系统管理员、审计员账号还可以登录吗三权模式下&#xff0c;新建的管理员下可以再创建管…

作者头像 李华
网站建设 2026/1/7 8:42:09

为什么推荐使用批量处理模式?效率提升三倍以上

为什么推荐使用批量处理模式&#xff1f;效率提升三倍以上 在企业级数字内容生产日益自动化的今天&#xff0c;一个看似简单的视频生成流程&#xff0c;往往隐藏着巨大的效率瓶颈。比如&#xff0c;一家教育公司需要为同一段课程音频&#xff0c;生成由不同“数字人”形象讲解的…

作者头像 李华
网站建设 2026/1/9 3:07:31

使用IE浏览器https无法访问设备Web界面

本文档提供了ASG系列产品的维护指导。 文章目录使用IE浏览器https无法访问设备Web界面使用IE浏览器https无法访问设备Web界面 IE浏览器因对证书安全检验级别较高&#xff0c;公司私有证书网站浏览器会禁止用户继续访问&#xff0c;导致无法通过https访问设备。 推荐使用火狐、…

作者头像 李华
网站建设 2026/1/9 3:17:01

金洲慈航珠宝消费:HeyGem制作婚庆饰品定制服务介绍

金洲慈航珠宝消费&#xff1a;HeyGem制作婚庆饰品定制服务介绍 在婚礼筹备的无数细节中&#xff0c;一件定制婚戒、一条刻名项链&#xff0c;早已不只是饰品——它们承载的是两个人独一无二的情感印记。而当这份情感需要被“讲述”时&#xff0c;传统的图文卡片或千篇一律的祝福…

作者头像 李华
网站建设 2026/1/8 5:02:41

删除选中视频无效?刷新页面解决临时UI bug

删除选中视频无效&#xff1f;刷新页面解决临时UI bug 在使用数字人视频生成系统时&#xff0c;你是否遇到过这样的情况&#xff1a;点击“删除选中视频”按钮后&#xff0c;界面上的文件却纹丝不动&#xff1f;没有报错提示&#xff0c;操作也看似执行成功了&#xff0c;但那…

作者头像 李华