Introduction: The Data Value Behind the Weibo Hot Search List
As the most-watched real-time topic indicator on Chinese social media, the Weibo hot search list draws hundreds of millions of users every day. It not only reflects current social hot spots and the direction of public opinion, but also serves as an important data source for online marketing, public-opinion analysis, and trend prediction. This article walks through building a high-performance crawler for the Weibo hot search list on a modern Python stack, covering real-time data collection and persistent storage.
Technology Choices and Architecture Design
Core Technology Stack
Async framework: aiohttp + asyncio for high-concurrency requests
HTML parsing: BeautifulSoup4 + lxml for efficient DOM parsing
Data storage: SQLAlchemy + Alembic for ORM mapping and database migrations
Anti-crawling: Playwright + proxy pool + request-header rotation
Scheduling and monitoring: APScheduler for timed task management
Configuration: pydantic for data validation and settings management
System Architecture Design
text
Weibo hot search collection system architecture:
1. Scheduling layer: APScheduler triggers collection tasks on a schedule
2. Acquisition layer: asynchronous crawler cluster with distributed proxy support
3. Parsing layer: multi-strategy parsers that tolerate page-structure changes
4. Storage layer: MySQL/PostgreSQL plus a Redis cache
5. Monitoring layer: logging plus exception alerting
Complete Code Implementation
1. Environment Setup and Dependency Installation
text
# requirements.txt
aiohttp==3.9.1
beautifulsoup4==4.12.2
sqlalchemy==2.0.23
alembic==1.12.1
playwright==1.40.0
apscheduler==3.10.4
pydantic==2.5.0
pydantic-settings==2.1.0
redis==5.0.1
pandas==2.1.4
httpx==0.25.2
tenacity==8.2.3
loguru==0.7.2
# database drivers used by database.py (sync engine for Alembic, async engine for the spider)
psycopg2-binary==2.9.9
asyncpg==0.29.0
# used by the visualization example
plotly==5.18.0
2. Configuration Management
python
# config.py
from typing import List

from pydantic import Field, RedisDsn, PostgresDsn
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Application settings."""

    # Database configuration
    database_url: PostgresDsn = Field(
        default="postgresql://user:password@localhost:5432/weibo_hot"
    )
    redis_url: RedisDsn = Field(
        default="redis://localhost:6379/0"
    )

    # Crawler configuration
    weibo_hot_url: str = "https://s.weibo.com/top/summary"
    request_timeout: int = 30
    max_concurrent: int = 10
    retry_times: int = 3

    # Proxy configuration
    proxy_enabled: bool = False
    proxy_pool: List[str] = []

    # Request-header configuration
    user_agents: List[str] = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    ]

    # Crawl frequency
    crawl_interval: int = 300  # 5 minutes

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


settings = Settings()
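Field names map directly to environment variables (or entries in .env), so for example CRAWL_INTERVAL=60 overrides the default interval. A quick sanity-check sketch, added here and not part of the original article:

python
# check_config.py - usage sketch only
from config import settings

if __name__ == "__main__":
    print(settings.weibo_hot_url)    # https://s.weibo.com/top/summary
    print(settings.crawl_interval)   # 300 unless overridden via CRAWL_INTERVAL
    print(settings.proxy_enabled)    # False by default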
3. Data Model Definitions
python
# models.py
from datetime import datetime

from sqlalchemy import (
    Column, Integer, String, DateTime, Text, Index, BigInteger
)
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class HotSearchItem(Base):
    """Hot search item model."""
    __tablename__ = "hot_search_items"

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    rank = Column(Integer, nullable=False, comment="Rank")
    keyword = Column(String(255), nullable=False, comment="Keyword")
    url = Column(String(500), comment="Weibo URL")
    tag = Column(String(50), comment="Tag: hot, new, boom, etc.")
    hot_value = Column(Integer, comment="Heat value")
    category = Column(String(50), comment="Category")
    timestamp = Column(DateTime, default=datetime.now, comment="Collection time")
    created_at = Column(DateTime, default=datetime.now, comment="Creation time")
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Composite indexes for the most common query patterns
    __table_args__ = (
        Index('idx_timestamp_rank', 'timestamp', 'rank'),
        Index('idx_keyword_timestamp', 'keyword', 'timestamp'),
    )

    def to_dict(self):
        """Convert the row to a plain dict."""
        return {
            "id": self.id,
            "rank": self.rank,
            "keyword": self.keyword,
            "url": self.url,
            "tag": self.tag,
            "hot_value": self.hot_value,
            "category": self.category,
            "timestamp": self.timestamp.isoformat() if self.timestamp else None,
        }


class HotSearchHistory(Base):
    """Hot search history snapshot."""
    __tablename__ = "hot_search_history"

    id = Column(BigInteger, primary_key=True, autoincrement=True)
    snapshot_data = Column(Text, comment="Snapshot data as JSON")
    total_items = Column(Integer, comment="Total number of items")
    timestamp = Column(DateTime, default=datetime.now, comment="Snapshot time")
    created_at = Column(DateTime, default=datetime.now)

4. Asynchronous HTTP Client
python
# http_client.py
import asyncio
import random
from typing import Dict, Optional

import aiohttp
from loguru import logger
from tenacity import (
    retry, stop_after_attempt, wait_exponential, retry_if_exception_type
)

from config import settings


class AsyncHTTPClient:
    """Asynchronous HTTP client."""

    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.user_agents = settings.user_agents
        self.timeout = aiohttp.ClientTimeout(total=settings.request_timeout)

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _get_random_headers(self) -> Dict[str, str]:
        """Build a randomized request-header set."""
        return {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Cache-Control": "max-age=0",
        }

    @retry(
        stop=stop_after_attempt(settings.retry_times),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
    )
    async def fetch(self, url: str, use_proxy: bool = False) -> str:
        """Fetch a page and return its HTML."""
        if not self.session:
            self.session = aiohttp.ClientSession(timeout=self.timeout)

        headers = self._get_random_headers()
        proxy = random.choice(settings.proxy_pool) if use_proxy and settings.proxy_pool else None

        try:
            async with self.session.get(
                url, headers=headers, proxy=proxy, ssl=False
            ) as response:
                response.raise_for_status()
                html = await response.text()
                logger.info(f"Fetched page: {url}, status: {response.status}")
                return html
        except aiohttp.ClientError as e:
            logger.error(f"Request failed: {url}, error: {str(e)}")
            raise

    async def close(self):
        """Close the underlying session."""
        if self.session:
            await self.session.close()
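A minimal usage sketch for the client, added here for orientation and assuming the modules above are importable:

python
# fetch_once.py - usage sketch
import asyncio

from http_client import AsyncHTTPClient
from config import settings


async def main():
    # The async context manager opens and closes the aiohttp session
    async with AsyncHTTPClient() as client:
        html = await client.fetch(settings.weibo_hot_url)
        print(f"Fetched {len(html)} characters")


if __name__ == "__main__":
    asyncio.run(main())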
5. Weibo Hot Search Parser
python
# parser.py
import json
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

from bs4 import BeautifulSoup
from loguru import logger


class WeiboHotSearchParser:
    """Parser for the Weibo hot search page."""

    @staticmethod
    def parse_hot_list(html: str) -> List[Dict[str, Any]]:
        """Parse the hot search list."""
        soup = BeautifulSoup(html, 'lxml')
        hot_items = []

        # Locate the hot search table
        hot_table = soup.find('tbody')
        if not hot_table:
            logger.warning("Hot search table not found")
            return hot_items

        # Parse each hot search row
        rows = hot_table.find_all('tr', class_=lambda x: x != 'thead')
        for row in rows:
            try:
                item = WeiboHotSearchParser._parse_row(row)
                if item:
                    hot_items.append(item)
            except Exception as e:
                logger.error(f"Failed to parse row: {str(e)}")
                continue

        logger.info(f"Parsed {len(hot_items)} hot search items")
        return hot_items

    @staticmethod
    def _parse_row(row) -> Optional[Dict[str, Any]]:
        """Parse a single table row."""
        # Rank
        rank_td = row.find('td', class_='td-01')
        if not rank_td:
            return None
        rank_text = rank_td.get_text(strip=True)
        rank = int(rank_text) if rank_text.isdigit() else 0

        # Keyword and link
        keyword_td = row.find('td', class_='td-02')
        if not keyword_td:
            return None
        keyword_link = keyword_td.find('a')
        if not keyword_link:
            return None
        keyword = keyword_link.get_text(strip=True)
        url = keyword_link.get('href', '')

        # Complete relative URLs
        if url and not url.startswith('http'):
            url = f"https://s.weibo.com{url}"

        # Tag (hot, new, boom, ...)
        tag_span = keyword_td.find('span')
        tag = tag_span.get_text(strip=True) if tag_span else None

        # Heat value
        hot_value_td = row.find('td', class_='td-03')
        hot_value = None
        if hot_value_td:
            hot_text = hot_value_td.get_text(strip=True)
            if hot_text.isdigit():
                hot_value = int(hot_text)

        # Category
        category_td = row.find('td', class_='td-04')
        category = category_td.get_text(strip=True) if category_td else None

        return {
            "rank": rank,
            "keyword": keyword,
            "url": url,
            "tag": tag,
            "hot_value": hot_value,
            "category": category,
            "timestamp": datetime.now()
        }

    @staticmethod
    def parse_real_time_hot(html: str) -> List[Dict[str, Any]]:
        """Parse real-time hot search data (fallback strategy)."""
        # Match the embedded script block with a regular expression
        pattern = r'<script>.*?STK\.pageId.*?="hot".*?</script>'
        match = re.search(pattern, html, re.DOTALL)
        if match:
            try:
                # Extract the JSON payload
                json_pattern = r'\[.*?\]'
                json_match = re.search(json_pattern, match.group(), re.DOTALL)
                if json_match:
                    data = json.loads(json_match.group())
                    return data
            except (json.JSONDecodeError, AttributeError) as e:
                logger.error(f"JSON parsing failed: {str(e)}")
        return []

6. Database Access Layer
python
# database.py
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Generator

from loguru import logger
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import Session, sessionmaker

from config import settings
from models import Base, HotSearchItem, HotSearchHistory


class DatabaseManager:
    """Database manager holding both sync and async engines."""

    def __init__(self):
        # Synchronous engine (used for Alembic migrations)
        self.sync_engine = create_engine(
            str(settings.database_url),
            pool_size=20,
            max_overflow=30,
            pool_pre_ping=True,
            echo=False
        )

        # Asynchronous engine
        async_database_url = str(settings.database_url).replace(
            'postgresql://', 'postgresql+asyncpg://'
        )
        self.async_engine = create_async_engine(
            async_database_url,
            pool_size=20,
            max_overflow=30,
            pool_pre_ping=True,
            echo=False
        )

        # Session factories
        self.SyncSessionLocal = sessionmaker(
            bind=self.sync_engine,
            autocommit=False,
            autoflush=False
        )
        self.AsyncSessionLocal = sessionmaker(
            bind=self.async_engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    def init_database(self):
        """Create all tables."""
        Base.metadata.create_all(bind=self.sync_engine)
        logger.info("Database tables created")

    @asynccontextmanager
    async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Yield an async session that commits on success and rolls back on error."""
        session = self.AsyncSessionLocal()
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            logger.error(f"Database operation failed: {str(e)}")
            raise
        finally:
            await session.close()

    def get_sync_session(self) -> Generator[Session, None, None]:
        """Yield a sync session that commits on success and rolls back on error."""
        session = self.SyncSessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Database operation failed: {str(e)}")
            raise
        finally:
            session.close()


db_manager = DatabaseManager()
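For reference, reading through the async session looks like this. This is a usage sketch added here; the query itself is purely illustrative:

python
# recent_items.py - usage sketch
import asyncio

from sqlalchemy import select

from database import db_manager
from models import HotSearchItem


async def show_recent():
    # get_async_session commits or rolls back automatically
    async with db_manager.get_async_session() as session:
        result = await session.execute(
            select(HotSearchItem)
            .order_by(HotSearchItem.timestamp.desc())
            .limit(10)
        )
        for item in result.scalars():
            print(item.rank, item.keyword)


if __name__ == "__main__":
    asyncio.run(show_recent())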
7. Main Spider Class
python
# spider.py
import asyncio
import json
import time
from datetime import datetime
from typing import Any, Dict, List

from loguru import logger
from sqlalchemy import select

from config import settings
from database import db_manager, HotSearchItem, HotSearchHistory
from http_client import AsyncHTTPClient
from parser import WeiboHotSearchParser


class WeiboHotSearchSpider:
    """Weibo hot search spider."""

    def __init__(self):
        self.http_client = AsyncHTTPClient()
        self.parser = WeiboHotSearchParser()
        self.crawl_count = 0
        # Counters consumed by the monitoring section further below
        self.success_count = 0
        self.last_success_time = time.time()

    async def crawl(self) -> List[Dict[str, Any]]:
        """Run one crawl cycle."""
        self.crawl_count += 1
        logger.info(f"Starting crawl #{self.crawl_count}")
        try:
            # Fetch the page
            async with self.http_client:
                html = await self.http_client.fetch(
                    settings.weibo_hot_url,
                    use_proxy=settings.proxy_enabled
                )

            # Parse the hot search data
            hot_items = self.parser.parse_hot_list(html)
            if not hot_items:
                # Fall back to the secondary parsing strategy
                hot_items = self.parser.parse_real_time_hot(html)

            # Persist the items and a history snapshot
            await self.save_to_database(hot_items)
            await self.save_snapshot(hot_items)

            self.success_count += 1
            self.last_success_time = time.time()
            logger.success(f"Crawl finished, collected {len(hot_items)} items")
            return hot_items
        except Exception as e:
            logger.error(f"Crawl failed: {str(e)}")
            return []

    async def save_to_database(self, hot_items: List[Dict[str, Any]]):
        """Save items to the database, skipping duplicates."""
        async with db_manager.get_async_session() as session:
            for item in hot_items:
                # Check whether the item already exists
                existing = await session.execute(
                    select(HotSearchItem).where(
                        HotSearchItem.keyword == item["keyword"],
                        HotSearchItem.timestamp == item["timestamp"]
                    )
                )
                if existing.scalar() is None:
                    db_item = HotSearchItem(**item)
                    session.add(db_item)
            await session.commit()

    async def save_snapshot(self, hot_items: List[Dict[str, Any]]):
        """Save a JSON snapshot of the full list."""
        async with db_manager.get_async_session() as session:
            snapshot = HotSearchHistory(
                snapshot_data=json.dumps(hot_items, ensure_ascii=False, default=str),
                total_items=len(hot_items),
                timestamp=datetime.now()
            )
            session.add(snapshot)
            await session.commit()

    async def batch_crawl(self, times: int = 10, interval: int = 300):
        """Run several crawl cycles with a fixed interval."""
        for i in range(times):
            logger.info(f"Starting round {i + 1}/{times}")
            await self.crawl()
            if i < times - 1:
                logger.info(f"Waiting {interval} seconds before the next round...")
                await asyncio.sleep(interval)
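The architecture section lists a Redis cache and the configuration defines redis_url, but the spider itself never touches Redis. One way it could be used is short-lived keyword de-duplication before hitting PostgreSQL. The sketch below is an assumption of mine, not part of the original code; it relies on redis.asyncio from the pinned redis==5.0.1 package:

python
# redis_dedup.py - sketch only, not wired into the spider above
import redis.asyncio as aioredis

from config import settings

_redis = aioredis.from_url(str(settings.redis_url), decode_responses=True)


async def is_new_keyword(keyword: str, ttl: int = 3600) -> bool:
    """Return True if the keyword has not been seen within the last `ttl` seconds.

    SET with NX and EX stores the key only when it is absent, so a single
    round-trip acts as both the check and the mark.
    """
    return bool(await _redis.set(f"hot:seen:{keyword}", 1, nx=True, ex=ttl))

save_to_database could call is_new_keyword first and skip its SELECT round-trip for keywords seen recently.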
8. Scheduling and Monitoring
python
# scheduler.py
import asyncio
import signal
import sys
from datetime import datetime, timedelta

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

from config import settings
from spider import WeiboHotSearchSpider


class SpiderScheduler:
    """Spider scheduler."""

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.spider = WeiboHotSearchSpider()
        self.is_running = True

        # Register signal handlers for graceful shutdown
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle shutdown signals."""
        logger.info("Stop signal received, shutting down...")
        self.is_running = False
        self.scheduler.shutdown()
        sys.exit(0)

    async def crawl_job(self):
        """Scheduled crawl job."""
        try:
            await self.spider.crawl()
        except Exception as e:
            logger.error(f"Scheduled job failed: {str(e)}")

    def start(self):
        """Start the scheduler."""
        # Recurring crawl job
        trigger = IntervalTrigger(seconds=settings.crawl_interval)
        self.scheduler.add_job(
            self.crawl_job,
            trigger=trigger,
            id='weibo_hot_crawl',
            name='Weibo hot search crawl job',
            replace_existing=True
        )

        # One-off job so the first crawl runs shortly after startup
        self.scheduler.add_job(
            self.crawl_job,
            trigger='date',
            run_date=datetime.now() + timedelta(seconds=5),
            id='initial_crawl'
        )

        # Start the scheduler
        self.scheduler.start()
        logger.info("Spider scheduler started")

        # Keep the event loop running
        try:
            asyncio.get_event_loop().run_forever()
        except (KeyboardInterrupt, SystemExit):
            logger.info("Exiting")

    def stop(self):
        """Stop the scheduler."""
        self.scheduler.shutdown()
        logger.info("Spider scheduler stopped")

9. Main Entry Point
python
# main.py
import argparse
import asyncio

from loguru import logger

from database import db_manager
from scheduler import SpiderScheduler

# Configure logging
logger.add(
    "logs/weibo_spider_{time:YYYY-MM-DD}.log",
    rotation="00:00",
    retention="30 days",
    level="INFO",
    encoding="utf-8",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}"
)


async def single_crawl():
    """Run a single crawl and print the result."""
    from spider import WeiboHotSearchSpider

    spider = WeiboHotSearchSpider()
    results = await spider.crawl()

    # Show only the top 10 items
    for item in results[:10]:
        print(f"{item['rank']:2d}. {item['keyword']} ({item.get('tag', '')}) "
              f"- heat: {item.get('hot_value', 'N/A')}")
    print(f"\nCrawled {len(results)} items in total")


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description="Weibo hot search spider")
    parser.add_argument('--mode', choices=['single', 'daemon'], default='daemon',
                        help='Run mode')
    parser.add_argument('--init-db', action='store_true',
                        help='Initialize the database')
    args = parser.parse_args()

    # Initialize the database if requested
    if args.init_db:
        logger.info("Initializing the database...")
        db_manager.init_database()
        logger.success("Database initialized")

    # Run in the selected mode
    if args.mode == 'single':
        asyncio.run(single_crawl())
    else:
        # Daemon mode
        scheduler = SpiderScheduler()
        scheduler.start()


if __name__ == "__main__":
    main()

10. Docker Deployment
dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libpq-dev \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the log directory
RUN mkdir -p logs

# Run database migrations at container start (the database is not reachable
# during the image build), then start the spider in daemon mode
CMD ["sh", "-c", "alembic upgrade head && python main.py --mode daemon"]
yaml
# docker-compose.yml
version: '3.8'

services:
  weibo-spider:
    build: .
    container_name: weibo-hot-search-spider
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/weibo_hot
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./logs:/app/logs
      - ./data:/app/data
    restart: unless-stopped

  db:
    image: postgres:15
    container_name: weibo-db
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=weibo_hot
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    container_name: weibo-redis
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@example.com
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db

volumes:
  postgres_data:
  redis_data:
Advanced Features and Optimization Suggestions
1. Dealing with Anti-Crawling Measures
python
# Use Playwright to drive a real browser
import asyncio
import random

from playwright.async_api import async_playwright

from config import settings


async def crawl_with_playwright():
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        # Set a randomized user agent
        await page.set_extra_http_headers({
            "User-Agent": random.choice(settings.user_agents)
        })

        # Visit the page and wait until the network is idle
        await page.goto(settings.weibo_hot_url, wait_until="networkidle")

        # Scroll to the bottom to trigger lazy-loaded content
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(2)

        # Grab the fully rendered HTML
        html = await page.content()
        await browser.close()
        return html
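The stack above also lists a proxy pool; Playwright can route traffic through one via its launch options. A hedged sketch, reusing the proxy_pool defined in the configuration and assuming its entries look like "http://host:port":

python
# playwright_proxy.py - sketch only
import random

from playwright.async_api import async_playwright

from config import settings


async def crawl_with_proxy():
    # Pick a proxy from the pool if one is configured
    proxy_server = random.choice(settings.proxy_pool) if settings.proxy_pool else None
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            proxy={"server": proxy_server} if proxy_server else None,
        )
        page = await browser.new_page()
        await page.goto(settings.weibo_hot_url, wait_until="networkidle")
        html = await page.content()
        await browser.close()
        return html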
2. Data Visualization
python
# Generate a heatmap with Plotly
from datetime import datetime, timedelta

import pandas as pd
import plotly.graph_objects as go
from sqlalchemy import select

from database import db_manager
from models import HotSearchItem


async def generate_heatmap(days: int = 7):
    """Generate a hot search heatmap."""
    async with db_manager.get_async_session() as session:
        # Query the last N days of data
        start_date = datetime.now() - timedelta(days=days)
        result = await session.execute(
            select(HotSearchItem)
            .where(HotSearchItem.timestamp >= start_date)
            .order_by(HotSearchItem.timestamp)
        )
        items = result.scalars().all()

    # Convert to a DataFrame and pivot into a keyword-by-time matrix
    df = pd.DataFrame([item.to_dict() for item in items])
    pivot = df.pivot_table(index='keyword', columns='timestamp', values='hot_value')

    # Build the heatmap
    fig = go.Figure(data=go.Heatmap(
        z=pivot.values,
        x=pivot.columns,
        y=pivot.index,
        colorscale='Viridis'
    ))
    fig.update_layout(
        title=f'Weibo hot search heatmap (last {days} days)',
        xaxis_title='Time',
        yaxis_title='Keyword',
        height=800
    )
    fig.write_html(f"heatmap_{datetime.now().strftime('%Y%m%d')}.html")

3. Exception Monitoring and Alerting
python
# Monitoring and alerting
import smtplib
import time
from email.mime.text import MIMEText

from loguru import logger

from spider import WeiboHotSearchSpider


class AlertSystem:
    """Simple alerting system."""

    def __init__(self):
        self.error_count = 0
        self.last_alert_time = None

    async def check_health(self, spider: WeiboHotSearchSpider):
        """Check spider health using the counters kept by the spider class."""
        if spider.crawl_count == 0:
            return

        success_rate = spider.success_count / spider.crawl_count
        if success_rate < 0.8:
            await self.send_alert(f"Spider success rate too low: {success_rate:.2%}")

        if time.time() - spider.last_success_time > 3600:  # no success within 1 hour
            await self.send_alert("Spider has had no successful crawl for over an hour")

    async def send_alert(self, message: str):
        """Send an alert email."""
        msg = MIMEText(message, 'plain', 'utf-8')
        msg['Subject'] = 'Weibo spider alert'
        msg['From'] = 'alert@example.com'
        msg['To'] = 'admin@example.com'

        try:
            smtp = smtplib.SMTP('smtp.example.com', 587)
            smtp.starttls()
            smtp.login('user', 'password')
            smtp.send_message(msg)
            smtp.quit()
        except Exception as e:
            logger.error(f"Failed to send alert email: {str(e)}")

Deployment and Operation Guide
1. Environment Preparation
bash
# Clone the code
git clone https://github.com/yourusername/weibo-hot-search-spider.git
cd weibo-hot-search-spider

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate   # Windows

# Install dependencies
pip install -r requirements.txt

# Install the Playwright browser
playwright install chromium

# Initialize the database
python main.py --init-db
2. Configure Environment Variables
bash
# .env file
DATABASE_URL=postgresql://user:password@localhost:5432/weibo_hot
REDIS_URL=redis://localhost:6379/0
CRAWL_INTERVAL=300
MAX_CONCURRENT=10
3. Run the Spider
bash
# Single run
python main.py --mode single

# Daemon mode
python main.py --mode daemon

# Run with Docker
docker-compose up -d
Summary and Outlook
This article has shown how to build a complete Weibo hot search crawler system on a modern Python stack. By combining asynchronous programming, anti-crawling countermeasures, and database optimizations, the system achieves efficient and stable data collection.
System highlights:
High performance: asynchronous, concurrent processing that supports large-scale collection
High availability: multiple fault-tolerance mechanisms with automatic retries and failure recovery
Easy to extend: modular design that supports distributed deployment
Feature-rich: data storage, monitoring and alerting, and visualization
Future directions:
Add machine learning models to predict hot search trends
Implement sentiment analysis to mine public-opinion tendencies
Build a real-time dashboard that displays hot-topic changes dynamically
Develop a RESTful API to serve the data (a brief sketch follows below)
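The API direction is easy to prototype on top of the existing models. A minimal read-only sketch, assuming FastAPI and uvicorn (neither is part of the stack used above) and reusing the synchronous session factory from database.py:

python
# api.py - sketch only; FastAPI and uvicorn would need to be added to requirements.txt
from fastapi import FastAPI
from sqlalchemy import select

from database import db_manager
from models import HotSearchItem

app = FastAPI(title="Weibo Hot Search API")


@app.get("/hot/latest")
def latest_hot(limit: int = 50):
    """Return the most recently collected hot search items."""
    with db_manager.SyncSessionLocal() as session:
        rows = session.execute(
            select(HotSearchItem)
            .order_by(HotSearchItem.timestamp.desc(), HotSearchItem.rank)
            .limit(limit)
        ).scalars().all()
        return [row.to_dict() for row in rows]

It could be served with something like uvicorn api:app --reload alongside the crawler daemon.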