news 2026/5/6 9:28:35

asyncio+queue实现生产者消费者爬虫模型

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
asyncio+queue实现生产者消费者爬虫模型

在网络爬虫开发中,生产者 - 消费者模型是经典且高效的架构模式。它将 “任务生产(URL 采集)” 和 “任务消费(页面爬取)” 解耦,能有效控制并发、避免资源浪费。而 Python 的asyncio(异步 I/O)结合asyncio.Queue,可以打造出高性能的异步爬虫,相比传统多线程 / 多进程爬虫,异步模型在 IO 密集型的爬虫场景下效率提升显著。

一、核心原理

1. 生产者 - 消费者模型

  • 生产者:负责生成待爬取的 URL,将其放入队列中,是爬虫的 “任务源”;
  • 消费者:从队列中取出 URL,执行页面请求、数据解析等操作,是爬虫的 “执行端”;
  • 队列(asyncio.Queue):作为生产者和消费者之间的缓冲,解耦两者的执行节奏,控制并发数量,避免瞬间请求量过大导致的被封 IP 或服务器压力过高。

2. asyncio 的优势

asyncio是 Python 内置的异步编程框架,基于事件循环实现,无需创建大量线程 / 进程,仅通过单线程的协程切换就能处理大量 IO 操作(如网络请求),资源开销远低于多线程,适合爬虫这类高 IO 场景。

二、完整实现代码

以下是一个可直接运行的异步爬虫示例,以爬取某测试站点的页面标题为例,完整实现生产者 - 消费者模型:

python

运行

import asyncio import aiohttp from aiohttp import ClientTimeout from typing import List # 全局配置 MAX_CONCURRENT = 5 # 最大并发消费者数量 QUEUE_MAXSIZE = 10 # 队列最大容量,控制缓冲大小 BASE_URL = "https://httpbin.org/get?page={}" # 测试URL PAGE_RANGE = range(1, 21) # 待爬取的页面范围(1-20页) class AsyncCrawler: def __init__(self): # 初始化异步队列,设置最大容量 self.queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE) # 存储爬取结果 self.results = [] async def producer(self, urls: List[str]): """生产者:将待爬取的URL放入队列""" for url in urls: await self.queue.put(url) print(f"生产者:已放入URL -> {url}") # 放入结束标记(数量等于消费者数量),通知消费者退出 for _ in range(MAX_CONCURRENT): await self.queue.put(None) async def consumer(self, session: aiohttp.ClientSession, consumer_id: int): """消费者:从队列取出URL并爬取""" while True: # 从队列获取URL(异步阻塞,直到有数据) url = await self.queue.get() # 检测结束标记 if url is None: print(f"消费者{consumer_id}:收到结束标记,退出") self.queue.task_done() break try: # 异步请求页面 async with session.get(url, timeout=ClientTimeout(total=10)) as response: if response.status == 200: # 解析响应数据(此处仅示例,可替换为实际解析逻辑) data = await response.json() page = data["args"]["page"] self.results.append(f"页面{page}爬取成功") print(f"消费者{consumer_id}:成功爬取 -> {url}") else: print(f"消费者{consumer_id}:爬取失败,状态码 -> {response.status}") except Exception as e: print(f"消费者{consumer_id}:爬取异常 -> {url},错误:{str(e)}") finally: # 标记任务完成(用于队列的join()方法) self.queue.task_done() async def run(self): """启动爬虫主流程""" # 1. 生成待爬取的URL列表 urls = [BASE_URL.format(page) for page in PAGE_RANGE] # 2. 创建异步HTTP会话(复用连接,提升性能) async with aiohttp.ClientSession() as session: # 3. 创建并启动生产者任务 producer_task = asyncio.create_task(self.producer(urls)) # 4. 创建并启动多个消费者任务 consumer_tasks = [] for i in range(MAX_CONCURRENT): task = asyncio.create_task(self.consumer(session, i+1)) consumer_tasks.append(task) # 5. 等待生产者任务完成 await producer_task # 6. 等待队列中所有任务处理完成 await self.queue.join() # 7. 等待所有消费者任务退出 await asyncio.gather(*consumer_tasks) # 8. 输出爬取结果 print("\n===== 爬取完成 =====") for res in self.results: print(res) print(f"总计成功爬取:{len(self.results)} 条数据") if __name__ == "__main__": # 运行异步爬虫 crawler = AsyncCrawler() asyncio.run(crawler.run())

三、代码核心解析

1. 队列初始化

self.queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)初始化异步队列,maxsize限制队列最大容量,避免生产者生产过快导致内存溢出。

2. 生产者逻辑

  • 遍历 URL 列表,通过await self.queue.put(url)将 URL 放入队列;
  • 生产完成后,放入与消费者数量相同的None作为结束标记,确保每个消费者都能收到退出信号。

3. 消费者逻辑

  • 循环从队列取 URL:url = await self.queue.get()(异步阻塞,无数据时等待);
  • 检测到None时退出循环,完成消费者任务;
  • 使用aiohttp.ClientSession发起异步 HTTP 请求,复用连接池提升效率;
  • 无论爬取成功 / 失败,最终调用self.queue.task_done()标记任务完成,供队列join()方法检测。

4. 主流程控制

  • asyncio.create_task()创建协程任务,实现生产者和多个消费者的并发执行;
  • await self.queue.join()阻塞直到队列中所有任务都被task_done()标记,确保所有 URL 都被处理;
  • asyncio.gather(*consumer_tasks)等待所有消费者任务正常退出。

四、关键优化点

  1. 连接复用:使用aiohttp.ClientSession而非每次请求创建新会话,减少 TCP 连接建立开销;
  2. 并发控制:通过MAX_CONCURRENT限制消费者数量,避免并发过高触发反爬;
  3. 异常处理:捕获请求过程中的异常,避免单个 URL 爬取失败导致消费者退出;
  4. 队列缓冲:设置QUEUE_MAXSIZE,当队列满时生产者会阻塞,实现生产消费的速率匹配。

五、适用场景与扩展

1. 适用场景

  • 高并发爬取大量网页(如电商商品页、新闻列表页);
  • 需控制爬取速率,避免目标网站反爬;
  • IO 密集型爬取任务(大部分时间消耗在网络请求上)。

2. 扩展方向

  • 去重机制:添加 URL 去重(如使用集合),避免重复爬取;
  • 重试机制:对失败的 URL 进行重试,提升爬取成功率;
  • 数据存储:将爬取结果写入数据库(如 MySQL、MongoDB),需使用异步数据库驱动;
  • 代理池集成:添加异步代理池,随机切换代理 IP,避免被封;
  • 任务持久化:将队列中的 URL 持久化到文件 / 数据库,实现断点续爬。

总结

  1. asyncio + Queue实现的生产者 - 消费者爬虫,核心是通过异步队列解耦生产(URL 生成)和消费(页面爬取),利用异步 IO 提升爬取效率;
  2. 关键控制点包括:并发数限制、队列容量限制、异常处理、结束标记传递;
  3. 该模型兼具高性能和可控性,是异步爬虫开发的经典架构,可根据实际需求扩展去重、重试、代理等功能。

相比传统同步爬虫,异步生产者 - 消费者模型在处理大量 IO 密集型任务时,能以更低的资源开销实现更高的并发量,是 Python 爬虫开发中值得掌握的核心技术。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/6 6:38:51

字为基·星为途——汉语何以领跑科技时代,领航星际文明?

字为基星为途——汉语何以领跑科技时代,领航星际文明? 引言:语言,科技与星际文明的底层基础设施 在科技发展日新月异的当下,我们仿佛置身于一辆高速飞驰的列车之上,见证着无数的创新与变革。从人工智能的…

作者头像 李华
网站建设 2026/4/22 21:09:53

[嵌入式系统-172]:直流电机、步进电机和伺服电机的异同

直流电机、步进电机和伺服电机是三种最常用的电动机类型,广泛应用于机器人、自动化设备、消费电子、工业控制等领域。它们在工作原理、控制方式、精度、成本和应用场景上既有区别也有联系。 下面从多个维度对三者进行系统性的 异同对比分析。 一、基本定义 类型简…

作者头像 李华
网站建设 2026/5/2 11:39:10

[嵌入式系统-173]:步进电机是如何实现精确的角度控制的?

步进电机之所以能实现精确的角度控制,是因为它将旋转运动“数字化”——每接收一个电脉冲信号,就转动一个固定的微小角度(称为步距角)。这种特性使其在无需反馈装置的情况下,也能实现高精度的位置和角度控制。下面详细…

作者头像 李华
网站建设 2026/5/3 16:00:03

基于微信小程序的家政服务与互助平台

随着微信小程序的普及,我们推出了一个创新的家政服务与互助平台。该平台基于微信生态系统,利用其社交网络优势,为用户提供便捷的家政服务预约与管理。用户可以轻松浏览家政服务、预约服务、接收订单提醒,并通过服务记录追踪服务状…

作者头像 李华
网站建设 2026/5/5 0:51:48

2000-2024年地级市市场化水平

在地方经济治理研究中,“市场化水平”常被用来刻画市场在资源配置中的相对作用强弱 本文参考《中国工业经济》熊凌云等(2025)文中关于市场化水平指标的构建与测算方法,测算地级市层面的市场化水平数据,测算方式如下&a…

作者头像 李华