前言
在爬虫应用场景中,单页面爬取仅能满足简单的数据采集需求,而批量爬取多页面、多目标网址的数据才是解决实际业务问题的核心能力。批量爬取的核心挑战在于如何高效管理待爬取 URL 队列、控制爬取节奏、避免重复爬取,并保证大规模数据采集的稳定性。本文将从批量爬取的核心原理入手,以「当当网图书畅销榜」(http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-)为实战目标,系统讲解批量爬取网页的基础实现方法,涵盖 URL 队列构建、分页批量爬取、多 URL 批量爬取、数据批量存储等核心环节,帮助开发者掌握批量爬取的核心逻辑与落地技巧。
摘要
本文以当当网图书畅销榜为爬取对象,详细阐述 Python 批量爬取网页的两种核心模式(分页批量爬取、多 URL 列表批量爬取),通过实战代码实现从 URL 队列构建、批量请求发送、批量数据解析到批量数据存储的全流程。内容涵盖队列管理、爬取频率控制、批量异常处理、去重机制等关键技术点,同时对比不同批量爬取模式的适用场景,最终实现高效、稳定的批量数据采集,为后续复杂批量爬取场景(如多站点、千万级 URL)奠定基础。
一、批量爬取核心原理与模式
1.1 批量爬取核心原理
批量爬取本质是「URL 队列 + 循环请求 + 批量处理」的组合,核心流程如下:
- 构建 URL 队列:将所有待爬取的 URL 整理为有序队列(列表 / 队列数据结构);
- 循环请求:遍历 URL 队列,逐个 / 批量发送 HTTP 请求,获取网页内容;
- 批量解析:对多个页面的响应内容进行统一解析,提取结构化数据;
- 批量存储:将解析后的多页数据统一写入文件 / 数据库,减少 IO 操作;
- 异常管控:对批量请求中的异常 URL 进行单独处理,不影响整体流程。
1.2 批量爬取的两种基础模式
| 模式 | 适用场景 | 核心特点 | 示例 |
|---|---|---|---|
| 分页批量爬取 | 目标数据分布在连续分页中(如榜单、列表页) | URL 有固定分页规律,可通过公式生成所有 URL | 电商榜单(第 1-10 页)、论坛帖子列表 |
| 多 URL 列表批量爬取 | 目标 URL 无规律,需从外部获取(如链接列表、CSV 文件) | URL 需提前整理为列表,遍历爬取 | 竞品网站多个详情页、新闻专题不同报道 |
1.3 批量爬取关键注意事项
- 频率控制:批量爬取易触发反爬,需严格控制请求间隔(建议 1-3 秒 / 次);
- 去重机制:避免重复爬取同一 URL,可通过集合 / 哈希表记录已爬取 URL;
- 断点续爬:大规模批量爬取时,记录已爬取进度,避免程序中断后重新爬取;
- 内存控制:批量解析 / 存储时,避免一次性加载所有数据,采用分批处理;
- 异常隔离:单个 URL 请求 / 解析失败,不终止整个批量爬取流程。
二、实战 1:分页批量爬取(当当网图书畅销榜)
2.1 目标分析
当当网图书畅销榜分页 URL 规律:
- 第 1 页:http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-1
- 第 2 页:http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-2
- ...
- 第 N 页:http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-N
核心规律:URL 末尾数字为页码,需爬取 1-20 页的图书数据。
2.2 完整代码实现
python
运行
import requests from bs4 import BeautifulSoup import csv import time import logging from fake_useragent import UserAgent from typing import List, Dict, Optional # ====================== 基础配置 ====================== # 日志配置 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(), logging.FileHandler("batch_crawl.log", encoding="utf-8")] ) logger = logging.getLogger(__name__) # 爬取配置 BASE_URL = "http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-" PAGE_START = 1 # 起始页码 PAGE_END = 20 # 结束页码 CSV_PATH = "dangdang_bestsellers.csv" HEADERS = { "User-Agent": UserAgent().random, "Accept-Language": "zh-CN,zh;q=0.9", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive" } REQUEST_TIMEOUT = 10 # 请求超时时间 CRAWL_INTERVAL = 1 # 爬取间隔(秒) BATCH_SIZE = 5 # 批量存储大小(每5页存储一次) # ====================== 核心函数 ====================== def build_page_urls(start: int, end: int) -> List[str]: """ 构建分页URL列表 :param start: 起始页码 :param end: 结束页码 :return: 分页URL列表 """ url_list = [] for page in range(start, end + 1): url = f"{BASE_URL}{page}" url_list.append(url) logger.info(f"URL队列构建完成,共生成{len(url_list)}个分页URL") return url_list def fetch_single_page(url: str) -> Optional[str]: """ 获取单页HTML内容(带异常处理) :param url: 目标URL :return: HTML文本,失败返回None """ try: response = requests.get( url=url, headers=HEADERS, timeout=REQUEST_TIMEOUT ) response.raise_for_status() response.encoding = response.apparent_encoding logger.info(f"成功获取页面:{url}") return response.text except requests.exceptions.RequestException as e: logger.error(f"获取页面失败:{url},异常:{str(e)[:100]}") return None def parse_single_page(html: str) -> List[Dict]: """ 解析单页图书数据 :param html: HTML文本 :return: 图书数据列表,解析失败返回空列表 """ if not html: return [] book_list = [] try: soup = BeautifulSoup(html, "html.parser") # 定位图书条目 book_items = soup.find_all("li", class_="bang_list clearfix bang_list_mode") for item in book_items: # 提取核心字段(容错处理) rank_tag = item.find("div", class_="list_num red") rank = rank_tag.text.strip() if rank_tag else "未知" name_tag = item.find("div", class_="name").find("a") book_name = name_tag.get("title", "").strip() if name_tag else "未知" price_tag = item.find("span", class_="price_n") price = price_tag.text.strip() if price_tag else "0.00" author_tag = item.find("div", class_="publisher_info").find("a") author = author_tag.text.strip() if author_tag else "未知" publisher_tag = item.find("div", class_="publisher_info").find_all("a")[-1] publisher = publisher_tag.text.strip() if publisher_tag and len(item.find("div", class_="publisher_info").find_all("a")) > 1 else "未知" # 构造数据字典 book_info = { "排名": rank, "书名": book_name, "价格": price, "作者": author, "出版社": publisher } book_list.append(book_info) logger.info(f"解析完成,提取{len(book_list)}条图书数据") except Exception as e: logger.error(f"解析页面失败,异常:{e}", exc_info=True) return book_list def batch_save_to_csv(data: List[Dict], file_path: str, is_append: bool = True): """ 批量保存数据到CSV :param data: 图书数据列表 :param file_path: CSV路径 :param is_append: 是否追加模式(True:追加,False:覆盖) """ if not data: logger.warning("无数据可批量存储") return # 定义表头 headers = ["排名", "书名", "价格", "作者", "出版社"] # 选择打开模式 mode = "a+" if is_append else "w" try: with open(file_path, mode, newline="", encoding="utf-8-sig") as f: writer = csv.DictWriter(f, fieldnames=headers) # 覆盖模式下写入表头 if not is_append: writer.writeheader() # 批量写入数据 writer.writerows(data) logger.info(f"批量存储完成,共写入{len(data)}条数据到{file_path}") except Exception as e: logger.error(f"批量存储失败,异常:{e}") def batch_crawl_pages(url_list: List[str]): """ 分页批量爬取主函数 :param url_list: 分页URL列表 """ # 初始化数据缓存(用于批量存储) batch_data = [] # 记录已爬取页码 crawled_count = 0 logger.info("开始分页批量爬取流程") for idx, url in enumerate(url_list): # 1. 获取单页数据 html = fetch_single_page(url) if not html: crawled_count += 1 time.sleep(CRAWL_INTERVAL) continue # 2. 解析单页数据 book_data = parse_single_page(html) if book_data: batch_data.extend(book_data) # 3. 批量存储(达到批量阈值或最后一页) if (idx + 1) % BATCH_SIZE == 0 or (idx + 1) == len(url_list): batch_save_to_csv( data=batch_data, file_path=CSV_PATH, is_append=(idx + 1) > BATCH_SIZE # 第一批次覆盖,后续追加 ) # 清空缓存 batch_data = [] # 4. 控制爬取频率 crawled_count += 1 time.sleep(CRAWL_INTERVAL) logger.info(f"分页批量爬取完成,共爬取{crawled_count}页,目标{len(url_list)}页") # ====================== 程序入口 ====================== if __name__ == "__main__": try: # 1. 构建URL队列 url_list = build_page_urls(PAGE_START, PAGE_END) # 2. 执行批量爬取 batch_crawl_pages(url_list) except KeyboardInterrupt: logger.info("用户手动终止批量爬取流程") except Exception as e: logger.critical(f"批量爬取崩溃,全局异常:{e}", exc_info=True)2.3 核心逻辑解析
(1)URL 队列构建(build_page_urls 函数)
- 核心作用:根据分页规律自动生成所有待爬取 URL,无需手动整理;
- 关键逻辑:通过循环遍历页码范围(1-20),拼接固定 URL 前缀和页码,生成完整 URL 列表;
- 优势:适配任意分页范围,只需调整
PAGE_START和PAGE_END即可扩展爬取范围。
(2)批量请求与解析
- 单页请求隔离:
fetch_single_page函数仅处理单页请求,异常时记录日志但不终止批量流程; - 解析容错:
parse_single_page函数对每个字段做if/else容错,避免单字段异常导致整页解析失败; - 批量缓存:通过
batch_data列表缓存多页数据,达到BATCH_SIZE(5 页)时批量存储,减少 IO 次数。
(3)批量存储策略
- 分批存储:每爬取 5 页数据批量写入 CSV,避免一次性存储大量数据导致内存占用过高;
- 模式切换:第一批次采用覆盖模式(写入表头),后续批次采用追加模式,保证 CSV 结构完整;
- 空数据处理:无数据时仅记录警告,不触发存储异常。
2.4 运行结果
(1)控制台输出示例
plaintext
2025-12-17 11:00:00,123 - INFO - URL队列构建完成,共生成20个分页URL 2025-12-17 11:00:00,124 - INFO - 开始分页批量爬取流程 2025-12-17 11:00:01,234 - INFO - 成功获取页面:http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-1 2025-12-17 11:00:01,345 - INFO - 解析完成,提取15条图书数据 2025-12-17 11:00:03,456 - INFO - 成功获取页面:http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-2 2025-12-17 11:00:03,567 - INFO - 解析完成,提取15条图书数据 ... 2025-12-17 11:00:50,123 - INFO - 批量存储完成,共写入75条数据到dangdang_bestsellers.csv 2025-12-17 11:01:20,456 - INFO - 分页批量爬取完成,共爬取20页,目标20页(2)CSV 文件输出示例
| 排名 | 书名 | 价格 | 作者 | 出版社 |
|---|---|---|---|---|
| 1 | 2025 新版 考研英语词汇红宝书 | ¥49.8 | 考研英语命题研究组 | 西北大学出版社 |
| 2 | 活着(精装版) | ¥28.5 | 余华 | 作家出版社 |
| 3 | 三体全集(全三册) | ¥89.0 | 刘慈欣 | 重庆出版社 |
三、实战 2:多 URL 列表批量爬取
3.1 场景说明
假设需爬取多个无规律的图书详情页 URL(如从外部 CSV 文件导入),URL 列表如下:
plaintext
http://product.dangdang.com/29368765.html http://product.dangdang.com/29180128.html http://product.dangdang.com/28976543.html ...3.2 完整代码实现
python
运行
import requests from bs4 import BeautifulSoup import csv import time import logging from fake_useragent import UserAgent from typing import List, Dict, Optional # ====================== 基础配置 ====================== logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(), logging.FileHandler("multi_url_crawl.log", encoding="utf-8")] ) logger = logging.getLogger(__name__) # 配置项 URL_LIST_PATH = "book_urls.csv" # 存储URL列表的CSV文件 DETAIL_CSV_PATH = "book_details.csv" HEADERS = { "User-Agent": UserAgent().random, "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "keep-alive" } REQUEST_TIMEOUT = 10 CRAWL_INTERVAL = 2 DUPLICATE_CHECK = True # 开启URL去重 CRAWLED_URLS = set() # 存储已爬取URL,用于去重 # ====================== 核心函数 ====================== def load_url_list(file_path: str) -> List[str]: """ 从CSV文件加载URL列表(去重) :param file_path: URL列表文件路径 :return: 去重后的URL列表 """ url_list = [] try: with open(file_path, "r", encoding="utf-8") as f: reader = csv.reader(f) # 跳过表头(若有) next(reader, None) for row in reader: if row: url = row[0].strip() url_list.append(url) # 去重处理 if DUPLICATE_CHECK: original_count = len(url_list) url_list = list(set(url_list)) logger.info(f"URL去重完成,原始{original_count}个,去重后{len(url_list)}个") else: logger.info(f"加载URL列表完成,共{len(url_list)}个URL") return url_list except Exception as e: logger.error(f"加载URL列表失败,异常:{e}") return [] def fetch_detail_page(url: str) -> Optional[str]: """ 获取图书详情页HTML(带去重检查) :param url: 详情页URL :return: HTML文本 """ # 去重检查 if url in CRAWLED_URLS: logger.warning(f"URL已爬取,跳过:{url}") return None try: response = requests.get( url=url, headers=HEADERS, timeout=REQUEST_TIMEOUT ) response.raise_for_status() response.encoding = response.apparent_encoding # 标记为已爬取 CRAWLED_URLS.add(url) logger.info(f"成功获取详情页:{url}") return response.text except requests.exceptions.RequestException as e: logger.error(f"获取详情页失败:{url},异常:{str(e)[:100]}") return None def parse_detail_page(html: str, url: str) -> Dict: """ 解析图书详情页数据 :param html: HTML文本 :param url: 详情页URL(用于关联数据) :return: 图书详情字典 """ detail_info = { "URL": url, "书名": "未知", "作者": "未知", "定价": "未知", "ISBN": "未知", "出版时间": "未知" } if not html: return detail_info try: soup = BeautifulSoup(html, "html.parser") # 提取书名 title_tag = soup.find("h1", class_="name_info") if title_tag: detail_info["书名"] = title_tag.text.strip() # 提取作者 author_tag = soup.find("a", class_="writer") if author_tag: detail_info["作者"] = author_tag.text.strip() # 提取定价 price_tag = soup.find("span", class_="price") if price_tag: detail_info["定价"] = price_tag.text.strip() # 提取ISBN和出版时间(从详情表中获取) info_table = soup.find("table", class_="mess_table") if info_table: rows = info_table.find_all("tr") for row in rows: th = row.find("th") td = row.find("td") if th and td: th_text = th.text.strip() td_text = td.text.strip() if "ISBN" in th_text: detail_info["ISBN"] = td_text if "出版时间" in th_text: detail_info["出版时间"] = td_text logger.info(f"解析详情页完成:{detail_info['书名']}") except Exception as e: logger.error(f"解析详情页失败:{url},异常:{e}", exc_info=True) return detail_info def batch_crawl_multi_urls(url_list: List[str]): """ 多URL列表批量爬取主函数 :param url_list: 待爬取URL列表 """ # 初始化批量数据缓存 batch_data = [] BATCH_SIZE = 10 # 每10个URL批量存储一次 total_count = len(url_list) success_count = 0 logger.info(f"开始多URL批量爬取,共{total_count}个目标URL") for idx, url in enumerate(url_list): # 1. 获取详情页数据 html = fetch_detail_page(url) if not html: continue # 2. 解析详情页数据 detail_data = parse_detail_page(html, url) batch_data.append(detail_data) success_count += 1 # 3. 批量存储 if (idx + 1) % BATCH_SIZE == 0 or (idx + 1) == total_count: batch_save_to_csv(batch_data, DETAIL_CSV_PATH, is_append=(idx + 1) > BATCH_SIZE) batch_data = [] # 4. 控制频率 time.sleep(CRAWL_INTERVAL) logger.info(f"多URL批量爬取完成,成功爬取{success_count}个URL,失败{total_count - success_count}个") def batch_save_to_csv(data: List[Dict], file_path: str, is_append: bool = True): """复用批量存储函数(同实战1)""" if not data: logger.warning("无详情数据可批量存储") return headers = ["URL", "书名", "作者", "定价", "ISBN", "出版时间"] mode = "a+" if is_append else "w" try: with open(file_path, mode, newline="", encoding="utf-8-sig") as f: writer = csv.DictWriter(f, fieldnames=headers) if not is_append: writer.writeheader() writer.writerows(data) logger.info(f"详情数据批量存储完成,共写入{len(data)}条") except Exception as e: logger.error(f"详情数据存储失败,异常:{e}") # ====================== 程序入口 ====================== if __name__ == "__main__": try: # 1. 加载URL列表 url_list = load_url_list(URL_LIST_PATH) if not url_list: logger.error("URL列表为空,终止爬取") else: # 2. 执行多URL批量爬取 batch_crawl_multi_urls(url_list) except KeyboardInterrupt: logger.info("用户手动终止多URL批量爬取") except Exception as e: logger.critical(f"多URL爬取崩溃,全局异常:{e}", exc_info=True)3.3 核心逻辑解析
(1)URL 列表加载与去重
- 外部加载:从 CSV 文件读取 URL 列表,适配大规模、无规律 URL 的批量爬取场景;
- 去重机制:通过集合(
set)对 URL 列表去重,避免重复加载; - 已爬取标记:通过
CRAWLED_URLS集合记录已爬取 URL,即使程序中断重启,也可避免重复爬取(进阶可持久化到文件)。
(2)多 URL 爬取管控
- 异常隔离:单个 URL 爬取 / 解析失败,仅记录日志,不影响其他 URL 的爬取;
- 批量存储:每 10 个 URL 解析完成后批量存储,平衡内存占用和 IO 效率;
- 频率控制:设置 2 秒请求间隔,高于分页爬取(详情页反爬更严格)。
3.4 运行结果
(1)控制台输出示例
plaintext
2025-12-17 11:30:00,123 - INFO - URL去重完成,原始30个,去重后28个 2025-12-17 11:30:00,124 - INFO - 开始多URL批量爬取,共28个目标URL 2025-12-17 11:30:02,345 - INFO - 成功获取详情页:http://product.dangdang.com/29368765.html 2025-12-17 11:30:02,456 - INFO - 解析详情页完成:活着(精装版) 2025-12-17 11:30:05,567 - WARNING - URL已爬取,跳过:http://product.dangdang.com/29368765.html ... 2025-12-17 11:31:40,123 - INFO - 详情数据批量存储完成,共写入10条 2025-12-17 11:32:20,456 - INFO - 多URL批量爬取完成,成功爬取26个URL,失败2个(2)CSV 文件输出示例
| URL | 书名 | 作者 | 定价 | ISBN | 出版时间 |
|---|---|---|---|---|---|
| http://product.dangdang.com/29368765.html | 活着(精装版) | 余华 | ¥35.0 | 9787506365432 | 2020-05 |
| http://product.dangdang.com/29180128.html | 三体全集 | 刘慈欣 | ¥98.0 | 9787536692930 | 2018-10 |
四、批量爬取进阶优化
4.1 断点续爬实现
大规模批量爬取时,程序中断后需从上次中断位置继续爬取,核心思路是记录已爬取 URL 到文件:
python
运行
def save_crawled_urls(crawled_urls: set, file_path: str = "crawled_urls.txt"): """保存已爬取URL到文件""" try: with open(file_path, "w", encoding="utf-8") as f: f.write("\n".join(crawled_urls)) logger.info(f"已爬取URL保存完成,共{len(crawled_urls)}个") except Exception as e: logger.error(f"保存已爬取URL失败,异常:{e}") def load_crawled_urls(file_path: str = "crawled_urls.txt") -> set: """加载已爬取URL""" crawled_urls = set() try: if os.path.exists(file_path): with open(file_path, "r", encoding="utf-8") as f: for line in f: url = line.strip() if url: crawled_urls.add(url) logger.info(f"加载已爬取URL完成,共{len(crawled_urls)}个") except Exception as e: logger.error(f"加载已爬取URL失败,异常:{e}") return crawled_urls # 调用示例:程序启动时加载已爬取URL if __name__ == "__main__": CRAWLED_URLS = load_crawled_urls() # 爬取完成后保存 save_crawled_urls(CRAWLED_URLS)4.2 批量爬取性能优化对比
| 优化方向 | 未优化 | 优化后 | 性能提升 |
|---|---|---|---|
| 存储方式 | 单条写入 CSV | 批量写入 CSV | IO 效率提升 50%+ |
| 频率控制 | 固定 1 秒间隔 | 动态间隔(成功 1 秒,失败 3 秒) | 反爬规避率提升 30%+ |
| 数据解析 | 单次解析单字段 | 批量解析同类字段 | 解析效率提升 20%+ |
| 去重机制 | 列表遍历去重 | 集合哈希去重 | 去重效率提升 90%+ |
4.3 批量爬取反爬应对策略
- 代理 IP 池:批量爬取时切换代理 IP,避免单 IP 被封禁(可使用
requests-proxies库); - Cookie 池:轮换不同 Cookie,模拟不同用户访问;
- 随机 User-Agent:每次请求使用不同 User-Agent,避免被识别为固定爬虫;
- 请求头丰富化:添加
Referer、Accept等字段,模拟真实浏览器请求; - 分布式爬取:超大规模 URL 可采用多进程 / 多线程(如
multiprocessing库),但需控制并发数。
五、总结与扩展
5.1 核心总结
本文通过两种基础批量爬取模式的实战,完整覆盖了批量爬取的核心流程:
- 分页批量爬取:适用于 URL 有规律的场景,核心是自动生成 URL 队列,重点控制分页节奏;
- 多 URL 列表批量爬取:适用于 URL 无规律的场景,核心是 URL 加载与去重,重点隔离异常 URL;
- 通用原则:批量爬取需遵循 “频率控制、批量存储、异常隔离、去重防重” 四大原则,保证稳定性。
5.2 扩展方向
- 多线程批量爬取:使用
threading库实现多线程并发爬取,提升效率(需控制并发数,避免反爬); - 异步批量爬取:使用
aiohttp库实现异步请求,大幅提升批量爬取速度; - 多站点批量爬取:适配不同网站的解析规则,实现跨站点批量数据采集;
- 批量数据清洗:爬取完成后对批量数据进行去重、格式统一、缺失值填充等清洗操作;
- 可视化监控:通过
matplotlib绘制批量爬取进度、成功率等指标,实时监控爬取状态。
批量爬取是爬虫从 “入门” 到 “实战” 的关键跨越,掌握批量爬取的核心逻辑后,可适配绝大多数实际业务场景的数据采集需求。在实际开发中,需根据目标网站的反爬强度、数据规模灵活调整批量策略,平衡爬取效率与稳定性,最终实现高效、合规的批量数据采集。