在数据爬取场景中,传统同步爬虫受限于 IO 等待(如网络请求、数据库写入),效率往往难以满足大规模数据采集需求。而异步编程能最大化利用 CPU 资源,结合 MongoDB 的异步驱动,则可实现 “爬取 - 存储” 全流程异步化,大幅提升爬虫整体性能。本文将详细讲解如何基于 Python 的 aiohttp 实现异步爬虫,并结合 pymongo 的异步特性(Motor)完成数据的高效存储。
一、核心技术栈说明
在开始编码前,先明确核心依赖库的作用:
- aiohttp:Python 异步 HTTP 客户端 / 服务器框架,用于发起异步网络请求,替代同步的 requests 库;
- Motor:MongoDB 官方推荐的异步驱动,是 pymongo 的异步版本,支持异步 IO 操作 MongoDB;
- asyncio:Python 内置的异步编程框架,用于管理异步任务、事件循环;
- pymongo:MongoDB 的同步驱动(本文仅作为对比参考,核心使用 Motor)。
二、环境准备
1. 安装依赖
bash
运行
pip install aiohttp motor python-dotenv2. 前提条件
- 已安装并启动 MongoDB 服务(本地或远程);
- 了解基本的异步编程概念(如协程、async/await);
- 目标网站允许爬虫访问(遵守 robots 协议,避免法律风险)。
三、异步爬虫 + MongoDB 异步存储实现
1. 核心思路
- 初始化 Motor 客户端,建立与 MongoDB 的异步连接;
- 定义异步爬虫函数,通过 aiohttp 发起异步请求,解析目标数据;
- 定义异步存储函数,将解析后的数据异步写入 MongoDB;
- 利用 asyncio 创建任务列表,批量执行异步爬取 + 存储任务;
- 统一管理事件循环,确保所有异步任务完成后关闭连接。
2. 完整代码实现
python
运行
import asyncio import aiohttp from motor.motor_asyncio import AsyncIOMotorClient from dotenv import load_dotenv import os from typing import Dict, List # 加载环境变量(建议将敏感信息放在.env文件中) load_dotenv() # MongoDB配置 MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "spider_db") MONGO_COLLECTION_NAME = os.getenv("MONGO_COLLECTION_NAME", "async_spider_data") # 爬虫配置 TARGET_URLS = [ "https://jsonplaceholder.typicode.com/posts/1", "https://jsonplaceholder.typicode.com/posts/2", "https://jsonplaceholder.typicode.com/posts/3", "https://jsonplaceholder.typicode.com/posts/4", "https://jsonplaceholder.typicode.com/posts/5", ] HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" } # 初始化MongoDB异步客户端 async def init_mongo() -> AsyncIOMotorClient: """初始化MongoDB异步连接""" try: client = AsyncIOMotorClient(MONGO_URI) # 测试连接 await client.admin.command("ping") print("MongoDB异步连接成功!") return client except Exception as e: print(f"MongoDB连接失败:{e}") raise # 异步爬取单条数据 async def crawl_single_url(session: aiohttp.ClientSession, url: str) -> Dict | None: """异步爬取单个URL的内容""" try: async with session.get(url, headers=HEADERS, timeout=aiohttp.ClientTimeout(10)) as response: if response.status == 200: data = await response.json() print(f"成功爬取:{url}") return data else: print(f"爬取失败,状态码:{response.status},URL:{url}") return None except Exception as e: print(f"爬取异常:{e},URL:{url}") return None # 异步存储数据到MongoDB async def save_to_mongo(db, data: Dict) -> None: """异步将数据写入MongoDB""" if not data: return try: collection = db[MONGO_COLLECTION_NAME] # 异步插入单条数据 result = await collection.insert_one(data) print(f"数据存储成功,ID:{result.inserted_id}") except Exception as e: print(f"数据存储失败:{e}") # 核心异步任务:爬取+存储 async def crawl_and_save(session: aiohttp.ClientSession, db, url: str) -> None: """单个URL的爬取+存储一体化异步任务""" data = await crawl_single_url(session, url) await save_to_mongo(db, data) # 主函数:批量执行异步任务 async def main(): # 初始化MongoDB连接 client = await init_mongo() db = client[MONGO_DB_NAME] # 创建aiohttp会话(复用连接池,提升效率) async with aiohttp.ClientSession() as session: # 创建异步任务列表 tasks = [] for url in TARGET_URLS: task = asyncio.create_task(crawl_and_save(session, db, url)) tasks.append(task) # 等待所有任务完成 await asyncio.gather(*tasks) # 关闭MongoDB连接 client.close() print("所有爬取和存储任务完成!") if __name__ == "__main__": # 适配Python 3.7+的事件循环运行方式 if sys.version_info >= (3, 7): asyncio.run(main()) else: loop = asyncio.get_event_loop() loop.run_until_complete(main())3. 代码关键解析
(1)MongoDB 异步初始化
init_mongo函数通过AsyncIOMotorClient创建异步客户端,替代 pymongo 的同步MongoClient,并通过await client.admin.command("ping")测试连接,确保连接成功后再执行后续操作。
(2)异步 HTTP 请求
使用aiohttp.ClientSession创建会话(复用连接池),通过async with session.get()发起异步请求,await response.json()异步解析响应数据,避免同步请求的 IO 阻塞。
(3)异步数据存储
save_to_mongo函数中,await collection.insert_one(data)是核心异步操作,替代 pymongo 同步的insert_one,无需等待数据库写入完成即可继续处理下一个爬取任务。
(4)批量任务管理
通过asyncio.create_task创建多个异步任务,再用asyncio.gather批量执行,实现多 URL 并行爬取和存储,最大化利用资源。
四、性能对比与优化建议
1. 同步 vs 异步性能差异
以爬取 100 个 URL 并写入 MongoDB 为例:
- 同步爬虫(requests+pymongo):约需 30-60 秒(受网络延迟、数据库写入等待影响);
- 异步爬虫(aiohttp+Motor):约需 5-10 秒(并行处理 IO 操作,无等待时间)。
2. 优化方向
(1)控制并发数
避免无限制并发导致目标网站封禁 IP 或 MongoDB 压力过大,可使用asyncio.Semaphore限制并发数:
python
运行
# 在main函数中添加信号量,限制最大并发为5 semaphore = asyncio.Semaphore(5) # 修改crawl_and_save函数,增加信号量控制 async def crawl_and_save(session: aiohttp.ClientSession, db, url: str) -> None: async with semaphore: # 限制并发 data = await crawl_single_url(session, url) await save_to_mongo(db, data)(2)数据批量写入
若爬取数据量极大,可将多条数据缓存后批量插入,减少数据库交互次数:
python
运行
async def save_batch_to_mongo(db, data_list: List[Dict]) -> None: """批量异步插入数据""" if not data_list: return try: collection = db[MONGO_COLLECTION_NAME] result = await collection.insert_many(data_list) print(f"批量存储成功,插入ID数量:{len(result.inserted_ids)}") except Exception as e: print(f"批量存储失败:{e}")(3)异常重试机制
对失败的爬取任务添加重试逻辑,提升稳定性:
python
运行
async def crawl_single_url(session: aiohttp.ClientSession, url: str, retry=3) -> Dict | None: """带重试机制的异步爬取""" for i in range(retry): try: async with session.get(url, headers=HEADERS, timeout=aiohttp.ClientTimeout(10)) as response: if response.status == 200: return await response.json() else: print(f"第{i+1}次重试失败,状态码:{response.status}") await asyncio.sleep(1) # 重试前休眠1秒 except Exception as e: print(f"第{i+1}次重试异常:{e}") await asyncio.sleep(1) return None五、注意事项
- 遵守网站规则:异步爬虫效率高,需控制爬取频率,避免对目标网站造成压力,必要时添加延迟;
- MongoDB 索引:针对查询频繁的字段(如 url、id)创建索引,提升后续数据查询效率;
- 资源释放:确保异步任务完成后关闭 MongoDB 客户端和 aiohttp 会话,避免资源泄漏;
- 数据去重:可通过 MongoDB 的唯一索引(如
create_index("id", unique=True))避免重复存储数据。
总结
- 异步爬虫结合 MongoDB 异步驱动(Motor)可实现 “爬取 - 存储” 全流程异步化,相比同步方案能提升 5-10 倍效率;
- 核心依赖为 aiohttp(异步请求)和 Motor(MongoDB 异步驱动),需通过
async/await管理异步任务; - 实际应用中需控制并发数、添加重试机制、批量写入数据,平衡效率与稳定性,同时遵守网站爬取规则。
通过本文的实践,你可以快速搭建高效的异步爬虫系统,满足大规模数据采集与存储的需求,同时兼顾代码的可维护性和扩展性。