news 2026/4/15 19:53:41

Python Crawler in Practice: Scraping Programming Tutorial Resources with Modern Async Techniques


Zhang Xiaoming

Front-End Development Engineer


1. Introduction: Why a New Generation of Crawler Technology?

In today's era of information overload, programming tutorial resources are scattered across countless websites, and collecting them by hand is slow and inefficient. Traditional synchronous crawlers perform poorly under heavy request loads, whereas a new generation of crawlers built on asynchronous I/O can handle hundreds of requests concurrently, dramatically improving collection throughput. This article walks through building an efficient, stable system for scraping programming tutorial resources with modern asynchronous Python.
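To see why asynchronous I/O pays off, here is a minimal, self-contained sketch that simulates network latency with asyncio.sleep (no real HTTP involved; fake_fetch is an illustrative stand-in, not a real client) and compares sequential against concurrent execution:

```python
import asyncio
import time

async def fake_fetch(url: str, latency: float = 0.05) -> str:
    # Simulated network request: asyncio.sleep stands in for real I/O
    await asyncio.sleep(latency)
    return f"response from {url}"

async def crawl_sequential(urls):
    # One request at a time: total time ~= n * latency
    return [await fake_fetch(u) for u in urls]

async def crawl_concurrent(urls):
    # All requests in flight at once: total time ~= latency
    return await asyncio.gather(*(fake_fetch(u) for u in urls))

urls = [f"https://example.com/page/{i}" for i in range(10)]

start = time.perf_counter()
asyncio.run(crawl_sequential(urls))
sequential_time = time.perf_counter() - start

start = time.perf_counter()
results = asyncio.run(crawl_concurrent(urls))
concurrent_time = time.perf_counter() - start
```

With 10 simulated requests of 50 ms each, the sequential version needs roughly 0.5 s while the concurrent version finishes in roughly the time of a single request.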

2. Technology Stack: Core Components of a Modern Python Crawler

2.1 Core Frameworks

  • aiohttp: asynchronous HTTP client/server framework with WebSocket support

  • httpx: next-generation HTTP client that supports both synchronous and asynchronous requests

  • playwright: browser automation tool from Microsoft, capable of rendering modern JavaScript

2.2 Parsing and Storage

  • parsel: HTML/XML parsing library developed by the Scrapy team

  • BeautifulSoup4: the classic HTML parsing library

  • asyncpg: asynchronous PostgreSQL client

  • aiofiles: asynchronous file I/O library

2.3 Async Ecosystem

  • asyncio: Python's built-in asynchronous I/O framework

  • anyio: high-level abstraction layer for asynchronous programming

  • uvloop: drop-in, much faster asyncio event loop (roughly a 2-4x speedup)

3. Complete Implementation: An Async Crawler System

python

"""
Modern async crawler for programming tutorial resources.
Supports concurrent fetching, deduplication, resumable crawls,
and basic anti-bot countermeasures.
"""
import asyncio
import aiohttp
import aiofiles
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import hashlib
import json
import logging
from datetime import datetime
from contextlib import asynccontextmanager
import ssl
import certifi

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('crawler.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class TutorialResource:
    """Data model for a tutorial resource."""
    title: str
    url: str
    description: str
    category: str
    file_type: str  # pdf, video, zip, ...
    file_size: str
    download_url: str
    source_site: str
    upload_date: str
    tags: List[str]
    language: str = "Chinese"
    difficulty: str = "intermediate"
    rating: float = 0.0


class AsyncTutorialCrawler:
    """Core async crawler for tutorial resources."""

    def __init__(self,
                 max_concurrent: int = 100,
                 request_timeout: int = 30,
                 use_proxy: bool = False):
        """
        Initialize the crawler.

        Args:
            max_concurrent: maximum number of concurrent requests
            request_timeout: request timeout in seconds
            use_proxy: whether to route requests through a proxy
        """
        self.max_concurrent = max_concurrent
        self.request_timeout = request_timeout
        self.use_proxy = use_proxy
        self.visited_urls = set()
        self.resources = []

        # SSL context
        self.ssl_context = ssl.create_default_context(cafile=certifi.where())

        # Request headers (browser-like)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Cache-Control': 'max-age=0',
        }

        # Per-site configuration (example)
        self.site_configs = {
            'tutorialspoint': {
                'base_url': 'https://www.tutorialspoint.com',
                'selectors': {
                    'resource_list': '.resource-list > div',
                    'title': 'h2 a::text',
                    'url': 'h2 a::attr(href)',
                    'description': '.description::text',
                    'category': '.category::text',
                    'file_type': '.file-type::text'
                }
            },
            'freecodecamp': {
                'base_url': 'https://www.freecodecamp.org',
                'api_endpoint': '/api/v1/tutorials'
            }
        }

    @asynccontextmanager
    async def create_session(self):
        """Create an aiohttp session as an async context manager."""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            ssl=self.ssl_context,
            force_close=True,
            enable_cleanup_closed=True
        )
        timeout = aiohttp.ClientTimeout(total=self.request_timeout)
        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers=self.headers
        ) as session:
            yield session

    def generate_url_hash(self, url: str) -> str:
        """MD5 hash of a URL, used for deduplication."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    async def fetch_with_retry(self,
                               session: aiohttp.ClientSession,
                               url: str,
                               retries: int = 3,
                               delay: float = 1.0) -> Optional[str]:
        """
        Fetch a URL with retries.

        Args:
            session: aiohttp session
            url: URL to fetch
            retries: number of attempts
            delay: base retry delay in seconds

        Returns:
            Response text, or None on failure.
        """
        url_hash = self.generate_url_hash(url)

        # Skip URLs we've already visited
        if url_hash in self.visited_urls:
            logger.debug(f"Skipping visited URL: {url}")
            return None

        for attempt in range(retries):
            try:
                async with session.get(url, ssl=self.ssl_context) as response:
                    if response.status == 200:
                        self.visited_urls.add(url_hash)
                        text = await response.text()
                        logger.info(f"Fetched: {url} (attempt {attempt + 1})")
                        return text
                    elif response.status == 429:  # Too Many Requests
                        wait_time = delay * (2 ** attempt)  # exponential backoff
                        logger.warning(f"Rate limited, waiting {wait_time} s")
                        await asyncio.sleep(wait_time)
                    else:
                        logger.error(f"HTTP error {response.status}: {url}")
                        return None
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning(f"Request failed (attempt {attempt + 1}/{retries}): {url} - {e}")
                if attempt < retries - 1:
                    await asyncio.sleep(delay * (2 ** attempt))
                else:
                    logger.error(f"Request ultimately failed: {url}")
                    return None
        return None

    async def parse_tutorialspoint(self, html: str, base_url: str) -> List[TutorialResource]:
        """Parse a tutorialspoint listing page."""
        from parsel import Selector

        selector = Selector(html)
        resources = []
        config = self.site_configs['tutorialspoint']

        for item in selector.css(config['selectors']['resource_list']):
            try:
                title = item.css(config['selectors']['title']).get('').strip()
                relative_url = item.css(config['selectors']['url']).get('')
                url = urljoin(base_url, relative_url)

                resource = TutorialResource(
                    title=title,
                    url=url,
                    description=item.css(config['selectors']['description']).get('')[:200],
                    category=item.css(config['selectors']['category']).get('uncategorized'),
                    file_type=item.css(config['selectors']['file_type']).get('unknown'),
                    file_size="pending",
                    download_url="",  # requires parsing the detail page
                    source_site="tutorialspoint",
                    upload_date=datetime.now().strftime("%Y-%m-%d"),
                    tags=["programming", "tutorial"]
                )
                resources.append(resource)
            except Exception as e:
                logger.error(f"Parse failure: {e}")
                continue
        return resources

    async def parse_freecodecamp_api(self, data: dict) -> List[TutorialResource]:
        """Parse freeCodeCamp API data."""
        resources = []
        for item in data.get('tutorials', []):
            try:
                resource = TutorialResource(
                    title=item.get('title', ''),
                    url=item.get('url', ''),
                    description=item.get('description', ''),
                    category=item.get('category', 'programming'),
                    file_type='text',  # mostly articles
                    file_size='N/A',
                    download_url=item.get('download_url', ''),
                    source_site="freecodecamp",
                    upload_date=item.get('published_at', ''),
                    tags=item.get('tags', []),
                    difficulty=item.get('difficulty', 'beginner'),
                    rating=float(item.get('rating', 0))
                )
                resources.append(resource)
            except Exception as e:
                logger.error(f"Failed to parse API data: {e}")
                continue
        return resources

    async def download_resource(self,
                                session: aiohttp.ClientSession,
                                resource: TutorialResource,
                                save_dir: str = "downloads"):
        """Download a resource file asynchronously."""
        if not resource.download_url:
            return
        try:
            async with session.get(resource.download_url, ssl=self.ssl_context) as response:
                if response.status == 200:
                    filename = (resource.title.replace(' ', '_')
                                + self._get_file_extension(resource.file_type))
                    filepath = f"{save_dir}/{filename}"
                    # Save the file asynchronously
                    async with aiofiles.open(filepath, 'wb') as f:
                        await f.write(await response.read())
                    logger.info(f"Download complete: {filename}")
                    if response.content_length:
                        resource.file_size = f"{response.content_length / 1024 / 1024:.2f} MB"
        except Exception as e:
            logger.error(f"Download failed {resource.title}: {e}")

    def _get_file_extension(self, file_type: str) -> str:
        """Map a file type to an extension."""
        extensions = {
            'pdf': '.pdf',
            'video': '.mp4',
            'zip': '.zip',
            'text': '.txt',
            'html': '.html',
            'markdown': '.md'
        }
        return extensions.get(file_type.lower(), '.bin')

    async def crawl_site(self, site_name: str, start_url: str = None):
        """Crawl a single configured site."""
        async with self.create_session() as session:
            if site_name == 'freecodecamp':
                # API-based crawl
                api_url = self.site_configs[site_name]['api_endpoint']
                full_url = self.site_configs[site_name]['base_url'] + api_url
                async with session.get(full_url) as response:
                    if response.status == 200:
                        data = await response.json()
                        resources = await self.parse_freecodecamp_api(data)
                        self.resources.extend(resources)
            else:
                # HTML-parsing crawl
                base_url = start_url or self.site_configs[site_name]['base_url']
                html = await self.fetch_with_retry(session, base_url)
                if html:
                    if site_name == 'tutorialspoint':
                        resources = await self.parse_tutorialspoint(html, base_url)
                        self.resources.extend(resources)
                    # Parsers for additional sites go here

    async def concurrent_crawl(self, sites: List[Dict]):
        """Crawl several sites concurrently."""
        tasks = []
        for site in sites:
            task = asyncio.create_task(self.crawl_site(site['name'], site.get('url')))
            tasks.append(task)

        # Run all tasks concurrently with asyncio.gather
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Report failures
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Crawl task failed {sites[i]['name']}: {result}")

    async def save_results(self, format: str = 'json'):
        """Persist crawl results."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        if format == 'json':
            filename = f"tutorial_resources_{timestamp}.json"
            data = [asdict(resource) for resource in self.resources]
            async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(data, ensure_ascii=False, indent=2))
            logger.info(f"Results saved to {filename}")
        elif format == 'csv':
            # csv.DictWriter is synchronous, so build the CSV in memory
            # and write it out with aiofiles in a single call.
            import csv
            import io
            filename = f"tutorial_resources_{timestamp}.csv"
            buffer = io.StringIO()
            writer = csv.DictWriter(buffer, fieldnames=list(asdict(self.resources[0]).keys()))
            writer.writeheader()
            for resource in self.resources:
                writer.writerow(asdict(resource))
            async with aiofiles.open(filename, 'w', encoding='utf-8', newline='') as f:
                await f.write(buffer.getvalue())
            logger.info(f"Results saved to {filename}")

    async def get_statistics(self) -> Dict:
        """Summary statistics for the crawl."""
        stats = {
            "total_resources": len(self.resources),
            "sites_count": len(set(r.source_site for r in self.resources)),
            "categories": {},
            "file_types": {}
        }
        for resource in self.resources:
            # Count per category
            stats["categories"][resource.category] = \
                stats["categories"].get(resource.category, 0) + 1
            # Count per file type
            stats["file_types"][resource.file_type] = \
                stats["file_types"].get(resource.file_type, 0) + 1
        return stats


async def main():
    """Entry point."""
    # Initialize the crawler (high-concurrency settings)
    crawler = AsyncTutorialCrawler(
        max_concurrent=50,
        request_timeout=45,
        use_proxy=False
    )

    # Sites to crawl
    sites_to_crawl = [
        {'name': 'tutorialspoint'},
        {'name': 'freecodecamp'},
        # more sites can be added here
    ]

    logger.info("Starting crawl of programming tutorial resources...")

    try:
        # Crawl concurrently
        await crawler.concurrent_crawl(sites_to_crawl)

        # Download resource files (optional)
        # async with crawler.create_session() as session:
        #     download_tasks = []
        #     for resource in crawler.resources[:10]:  # first 10 only
        #         task = crawler.download_resource(session, resource)
        #         download_tasks.append(task)
        #     await asyncio.gather(*download_tasks)

        # Save results
        await crawler.save_results('json')
        await crawler.save_results('csv')

        # Print statistics
        stats = await crawler.get_statistics()
        logger.info("Crawl complete! Statistics:")
        logger.info(f"Total resources: {stats['total_resources']}")
        logger.info(f"Source sites: {stats['sites_count']}")
        logger.info(f"Category distribution: {stats['categories']}")

    except KeyboardInterrupt:
        logger.info("Crawl interrupted by user")
    except Exception as e:
        logger.error(f"Error during crawl: {e}")
    finally:
        logger.info("Crawler finished")


if __name__ == "__main__":
    # Use uvloop for extra performance (Linux/macOS)
    try:
        import uvloop
        uvloop.install()
        logger.info("Using uvloop")
    except ImportError:
        logger.info("Using the standard asyncio event loop")

    # Run the crawler
    asyncio.run(main())
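The retry logic in fetch_with_retry waits delay * 2**attempt seconds between attempts. As a quick sanity check, that schedule can be written as a standalone helper (backoff_delays is an illustrative name, not part of the crawler class); many production crawlers also add random jitter so concurrent clients don't retry in lockstep:

```python
def backoff_delays(retries: int = 3, base_delay: float = 1.0):
    """Delay before attempt N: base_delay * 2**N (exponential backoff)."""
    return [base_delay * (2 ** attempt) for attempt in range(retries)]

delays = backoff_delays(3, 1.0)
```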

4. Advanced Extensions

4.1 Dynamic Rendering with Playwright

python

async def crawl_js_heavy_site(url: str):
    """Crawl a site that renders its content with JavaScript."""
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        )
        page = await context.new_page()
        await page.goto(url, wait_until='networkidle')

        # Wait for the content to load
        await page.wait_for_selector('.resource-item', timeout=10000)

        # Grab the rendered HTML
        content = await page.content()
        await browser.close()
        return content

4.2 Distributed Crawling

python

import redis.asyncio as redis
from celery import Celery
from typing import List

# Redis connection pool
redis_pool = redis.ConnectionPool.from_url('redis://localhost:6379/0')

class DistributedCrawler:
    def __init__(self):
        self.redis = redis.Redis(connection_pool=redis_pool)
        self.celery_app = Celery('crawler', broker='redis://localhost:6379/0')

    async def distribute_tasks(self, urls: List[str]):
        """Push crawl tasks onto the shared queue."""
        for url in urls:
            await self.redis.lpush('crawl_queue', url)
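The snippet above only pushes URLs onto the queue; workers on other machines would pop and process them. The same producer/worker pattern can be illustrated locally, with asyncio.Queue standing in for Redis (the names run_pool, worker, and the sentinel scheme are illustrative, not part of the article's code):

```python
import asyncio

async def producer(queue: asyncio.Queue, urls):
    # Stand-in for redis.lpush('crawl_queue', url)
    for url in urls:
        await queue.put(url)

async def worker(name: str, queue: asyncio.Queue, results: list):
    # Stand-in for a worker process popping from the shared queue
    while True:
        url = await queue.get()
        queue.task_done()
        if url is None:  # sentinel: no more work for this worker
            break
        results.append((name, url))  # a real worker would fetch and parse here

async def run_pool(urls, n_workers: int = 3):
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(f"w{i}", queue, results))
               for i in range(n_workers)]
    await producer(queue, urls)
    for _ in range(n_workers):
        await queue.put(None)  # one stop sentinel per worker
    await asyncio.gather(*workers)
    return results

results = asyncio.run(run_pool([f"url-{i}" for i in range(10)]))
```

Because the queue is FIFO, all URLs are drained before the sentinels arrive, so no work is lost when workers shut down.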

4.3 Smart Rate Limiting and Anti-Bot Countermeasures

python

import asyncio

class RateLimiter:
    """Smart request rate limiter (sliding 60-second window)."""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.request_times = []

    async def acquire(self):
        """Wait until a request slot is available."""
        now = asyncio.get_running_loop().time()

        # Drop request records older than the window
        self.request_times = [t for t in self.request_times if now - t < 60]

        if len(self.request_times) >= self.requests_per_minute:
            # Wait until the oldest request leaves the window
            oldest = self.request_times[0]
            wait_time = 60 - (now - oldest)
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        self.request_times.append(now)
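To verify the sliding-window behavior without waiting a full minute, here is a parameterized variant of the same idea (SlidingWindowLimiter and its configurable window are illustrative additions, not part of the article's code). With a capacity of 2 requests per 0.2 s window, the third acquire must block for roughly the window length:

```python
import asyncio
import time

class SlidingWindowLimiter:
    """Same sliding-window idea, with a configurable window for quick testing."""

    def __init__(self, max_requests: int, window: float):
        self.max_requests = max_requests
        self.window = window
        self.request_times: list = []

    async def acquire(self):
        now = asyncio.get_running_loop().time()
        # Drop records that have left the window
        self.request_times = [t for t in self.request_times if now - t < self.window]
        if len(self.request_times) >= self.max_requests:
            # Sleep until the oldest request falls out of the window
            wait = self.window - (now - self.request_times[0])
            if wait > 0:
                await asyncio.sleep(wait)
        self.request_times.append(asyncio.get_running_loop().time())

async def demo() -> float:
    limiter = SlidingWindowLimiter(max_requests=2, window=0.2)
    start = time.perf_counter()
    for _ in range(3):  # the third acquire has to wait ~0.2 s
        await limiter.acquire()
    return time.perf_counter() - start

elapsed = asyncio.run(demo())
```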

5. Deployment and Monitoring

5.1 Docker Deployment

dockerfile

FROM python:3.11-slim

WORKDIR /app

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the source code
COPY . .

# Run the crawler
CMD ["python", "-m", "crawler.main"]

5.2 Monitoring

python

from prometheus_client import start_http_server, Counter, Histogram

# Metric definitions
REQUESTS_TOTAL = Counter('crawler_requests_total', 'Total requests')
REQUEST_DURATION = Histogram('crawler_request_duration_seconds', 'Request duration')

@REQUEST_DURATION.time()
async def monitored_fetch(session, url):
    """Fetch with metrics recorded."""
    REQUESTS_TOTAL.inc()
    return await session.get(url)

6. Best Practices and Caveats

6.1 Respect robots.txt

python

import asyncio
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

async def check_robots_txt(url: str):
    """Check whether robots.txt permits fetching this URL."""
    rp = RobotFileParser()
    base_url = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
    rp.set_url(f"{base_url}/robots.txt")
    # rp.read() does blocking network I/O, so run it off the event loop
    await asyncio.to_thread(rp.read)
    return rp.can_fetch("*", url)
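Since rp.read() fetches robots.txt over the network, the rule-matching itself can be exercised offline by feeding RobotFileParser.parse a rules string directly. A sketch with a made-up robots.txt:

```python
from urllib.robotparser import RobotFileParser

# A made-up robots.txt for illustration
robots_txt = """\
User-agent: *
Disallow: /private/
"""

rp = RobotFileParser()
rp.parse(robots_txt.splitlines())

# Paths outside /private/ are allowed; paths under it are not
allowed = rp.can_fetch("*", "https://example.com/tutorials/python")
blocked = rp.can_fetch("*", "https://example.com/private/secret")
```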

6.2 Data Cleaning and Deduplication

python

from typing import List

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def deduplicate_resources(resources: List[TutorialResource],
                          similarity_threshold: float = 0.9):
    """Deduplicate resources by title similarity."""
    titles = [r.title for r in resources]

    # TF-IDF vectors for all titles
    vectorizer = TfidfVectorizer()
    title_vectors = vectorizer.fit_transform(titles)

    # Pairwise cosine similarity between titles
    similarity_matrix = cosine_similarity(title_vectors)

    # Mark later items that are too similar to an earlier one
    duplicates = set()
    for i in range(len(resources)):
        for j in range(i + 1, len(resources)):
            if similarity_matrix[i, j] > similarity_threshold:
                duplicates.add(j)

    # Return the deduplicated list
    return [r for idx, r in enumerate(resources) if idx not in duplicates]
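The TF-IDF approach requires scikit-learn. For small batches, a dependency-free alternative using difflib.SequenceMatcher from the standard library gives reasonable first-pass results (dedupe_titles is an illustrative helper, not from the article's code):

```python
from difflib import SequenceMatcher

def dedupe_titles(titles, threshold: float = 0.9):
    """Keep the first occurrence; drop later titles too similar to a kept one."""
    kept = []
    for title in titles:
        if all(SequenceMatcher(None, title.lower(), k.lower()).ratio() < threshold
               for k in kept):
            kept.append(title)
    return kept

titles = [
    "Python asyncio tutorial for beginners",
    "Python Asyncio Tutorial for Beginners",  # near-duplicate, differs only in case
    "Building REST APIs with FastAPI",
]
unique = dedupe_titles(titles)
```

SequenceMatcher compares character sequences rather than term weights, so it is quadratic in the number of titles and best kept to a few thousand items.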

7. Conclusion

This article has presented a complete approach to building an efficient crawler with modern asynchronous Python. Using current libraries such as aiohttp and playwright, together with advanced features like smart rate limiting and distributed deployment, you can build a stable, high-throughput system for collecting programming tutorial resources. In production, be sure to:

  1. Honor each target site's robots.txt

  2. Use reasonable request intervals to avoid putting pressure on target servers

  3. Respect copyright and download only resources that may be freely redistributed

  4. Update the crawler regularly as target sites are redesigned

  5. Implement thorough data validation and cleaning
