news 2026/4/16 0:22:48

Python Web Scraping in Practice: Asynchronously Crawling Full Novel Texts with aiohttp and parsel


Zhang Xiaoming

Front-end Development Engineer


Introduction: Technical Challenges of Novel Crawling and How to Solve Them

In today's era of digital reading, novel websites have become a major channel through which readers access literary works. For literary researchers, data analysts, and ordinary readers alike, complete novel text data can be genuinely valuable. However, novel sites typically pose technical challenges: anti-scraping mechanisms, complex pagination structures, and dynamically loaded content. This article walks through building an efficient, robust full-text novel crawler with Python's modern async stack.

Technology Choices and Their Advantages

Core Libraries

  1. aiohttp: asynchronous HTTP client/server framework; for concurrent I/O-bound crawling it can be several times faster than synchronous requests

  2. parsel: lxml-based parsing library supporting both XPath and CSS selectors

  3. asyncio: Python's native asynchronous I/O framework

  4. aiofiles: asynchronous file I/O

  5. fake-useragent: random User-Agent generation, to avoid the most obvious bot fingerprints

Technical Advantages

  • Async concurrency: handles dozens of requests at once, greatly improving crawl throughput

  • Request throttling: an enforced minimum interval between requests avoids overloading the target server

  • Error recovery: thorough exception handling and retry logic

  • Incremental crawling: skips already-visited URLs to avoid duplicate work
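A note on the "incremental crawling" point: the implementation below tracks visited URLs only in memory, so the dedup set is lost between runs. True resume support just needs that set persisted to disk. A minimal sketch, where the state-file name `visited_urls.json` and the two helper names are illustrative choices, not part of any library:

```python
import json
from pathlib import Path
from typing import Set


def load_visited(state_file: Path) -> Set[str]:
    """Load previously crawled URLs, if a state file exists."""
    if state_file.exists():
        return set(json.loads(state_file.read_text(encoding='utf-8')))
    return set()


def save_visited(state_file: Path, visited: Set[str]) -> None:
    """Persist crawled URLs so the next run can skip them."""
    state_file.write_text(json.dumps(sorted(visited), ensure_ascii=False),
                          encoding='utf-8')
```

Loading this set into the crawler's `visited_urls` at startup, and saving it after each batch, turns the in-memory dedup into resumable crawling.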

Complete Crawler Implementation

```python
"""
Novel site full-text crawler - high-performance async version.
"""
import asyncio
import json
import logging
import re
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Set
from urllib.parse import urljoin

# Third-party libraries
import aiofiles
import aiohttp
from fake_useragent import UserAgent
from parsel import Selector
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_exponential)

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('novel_crawler.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def sanitize_filename(name: str) -> str:
    """Replace characters that are invalid in file names."""
    return re.sub(r'[<>:"/\\|?*]', '_', name)


@dataclass
class NovelConfig:
    """Crawler configuration."""
    base_url: str                          # novel index page URL
    title_xpath: str                       # XPath for the novel title
    chapter_links_xpath: str               # XPath for chapter <a> elements
    content_xpath: str                     # XPath for chapter body text
    next_page_xpath: Optional[str] = None  # XPath for "next page" link (optional)
    encoding: str = 'utf-8'                # site encoding
    max_concurrency: int = 10              # max concurrent requests
    request_delay: float = 0.5             # delay between requests (seconds)
    retry_times: int = 3                   # retry attempts
    timeout: int = 30                      # request timeout (seconds)


class AsyncRateLimiter:
    """Async rate limiter enforcing a minimum interval between requests."""

    def __init__(self, rate_limit: float = 1.0):
        self.rate_limit = rate_limit
        self.last_request = 0.0
        self.lock = asyncio.Lock()

    async def wait(self):
        async with self.lock:
            elapsed = time.time() - self.last_request
            if elapsed < self.rate_limit:
                await asyncio.sleep(self.rate_limit - elapsed)
            self.last_request = time.time()


class NovelCrawler:
    """Main crawler class."""

    def __init__(self, config: NovelConfig, save_dir: str = "novels"):
        self.config = config
        self.save_dir = Path(save_dir)
        self.save_dir.mkdir(exist_ok=True)

        # Components
        self.rate_limiter = AsyncRateLimiter(config.request_delay)
        self.ua = UserAgent()
        self.session: Optional[aiohttp.ClientSession] = None
        self.visited_urls: Set[str] = set()
        self.failed_urls: Set[str] = set()

        # Statistics
        self.stats = {
            'total_chapters': 0,
            'successful_chapters': 0,
            'failed_chapters': 0,
            'total_bytes': 0,
            'start_time': None,
            'end_time': None,
        }

    @asynccontextmanager
    async def create_session(self):
        """Context manager that creates and tears down the aiohttp session."""
        connector = aiohttp.TCPConnector(
            limit=self.config.max_concurrency,
            ssl=False  # skips certificate verification; enable it in production
        )
        timeout = aiohttp.ClientTimeout(total=self.config.timeout)
        session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': self.ua.random,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            }
        )
        try:
            self.session = session
            yield session
        finally:
            await session.close()
            self.session = None

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
    )
    async def fetch_url(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch a URL, with rate limiting and retries."""
        await self.rate_limiter.wait()
        try:
            async with session.get(url) as response:
                response.raise_for_status()
                content = await response.read()
                return content.decode(self.config.encoding, errors='ignore')
        except Exception as e:
            logger.error(f"Failed to fetch {url}: {e}")
            self.failed_urls.add(url)
            raise

    def extract_chapter_links(self, html: str, base_url: str) -> List[Dict[str, str]]:
        """Extract chapter titles and links from the index page."""
        selector = Selector(text=html)
        chapters = []

        # Novel title
        novel_title = selector.xpath(self.config.title_xpath).get()
        novel_title = novel_title.strip() if novel_title else "Unknown novel"

        # Chapter links
        for elem in selector.xpath(self.config.chapter_links_xpath):
            title = elem.xpath('text()').get()
            href = elem.xpath('@href').get()
            if title and href:
                chapters.append({
                    'title': title.strip(),
                    'url': urljoin(base_url, href),
                    'novel_title': novel_title,
                })

        # Pagination (if the index spans multiple pages)
        if self.config.next_page_xpath:
            next_page = selector.xpath(self.config.next_page_xpath).get()
            if next_page:
                next_url = urljoin(base_url, next_page)
                logger.info(f"Found next index page: {next_url}")
                # Note: a real implementation would queue this URL as well.

        return chapters

    def extract_chapter_content(self, html: str) -> str:
        """Extract the body text of a chapter page."""
        selector = Selector(text=html)
        content_elements = selector.xpath(self.config.content_xpath)
        if not content_elements:
            # Fallback selector
            content_elements = selector.xpath('//div[contains(@class, "content")]//text()')

        paragraphs = []
        for elem in content_elements:
            # content_xpath may select text nodes or element nodes;
            # clean_text() strips any remaining tags either way.
            text = self.clean_text(elem.get() or '')
            if text:
                paragraphs.append(text)
        return '\n\n'.join(paragraphs)

    @staticmethod
    def clean_text(text: str) -> str:
        """Strip tags, collapse whitespace, and remove ad snippets."""
        text = re.sub(r'<[^>]+>', '', text)   # drop HTML tags
        text = re.sub(r'\s+', ' ', text)      # collapse whitespace
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)  # control chars
        # Site-specific ad patterns (adjust as needed)
        ad_patterns = [
            r'请收藏.*?本站!',
            r'.*?最新最快.*?',
            r'记住网址.*?',
            r'\(本章完\)',
        ]
        for pattern in ad_patterns:
            text = re.sub(pattern, '', text, flags=re.IGNORECASE)
        return text.strip()

    async def crawl_chapter(self, session: aiohttp.ClientSession,
                            chapter: Dict[str, str]) -> Optional[Dict[str, str]]:
        """Crawl a single chapter."""
        if chapter['url'] in self.visited_urls:
            return None
        try:
            logger.info(f"Crawling: {chapter['title']}")
            html = await self.fetch_url(session, chapter['url'])
            if not html:
                return None

            content = self.extract_chapter_content(html)
            if not content:
                logger.warning(f"Empty chapter content: {chapter['title']}")
                return None

            self.visited_urls.add(chapter['url'])
            self.stats['successful_chapters'] += 1
            self.stats['total_bytes'] += len(content.encode('utf-8'))
            return {
                'title': chapter['title'],
                'content': content,
                'url': chapter['url'],
                'novel_title': chapter['novel_title'],
            }
        except Exception as e:
            logger.error(f"Failed to crawl chapter {chapter['title']}: {e}")
            self.stats['failed_chapters'] += 1
            return None

    async def save_chapter(self, chapter_data: Optional[Dict[str, str]]):
        """Save a chapter to a text file."""
        if not chapter_data:
            return
        novel_dir = self.save_dir / sanitize_filename(chapter_data['novel_title'])
        novel_dir.mkdir(exist_ok=True)

        chapter_title = chapter_data['title']
        filename = novel_dir / f"{sanitize_filename(chapter_title)}.txt"

        # Write the file asynchronously
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(f"{chapter_title}\n\n")
            await f.write(chapter_data['content'])
            await f.write(f"\n\nSource URL: {chapter_data['url']}\n")
        logger.info(f"Saved: {filename}")

    async def save_metadata(self, novel_title: str, chapters: List[Dict[str, str]]):
        """Save crawl metadata as JSON."""
        metadata_file = self.save_dir / sanitize_filename(novel_title) / "metadata.json"
        metadata = {
            'novel_title': novel_title,
            'total_chapters': len(chapters),
            'crawled_time': time.strftime('%Y-%m-%d %H:%M:%S'),
            'chapters': [
                {
                    'title': ch['title'],
                    'url': ch['url'],
                    'filename': f"{sanitize_filename(ch['title'])}.txt",
                }
                for ch in chapters
            ],
            'stats': self.stats,
        }
        async with aiofiles.open(metadata_file, 'w', encoding='utf-8') as f:
            await f.write(json.dumps(metadata, ensure_ascii=False, indent=2))

    async def crawl_novel(self, start_url: Optional[str] = None):
        """Crawl an entire novel."""
        start_url = start_url or self.config.base_url
        self.stats['start_time'] = time.time()

        async with self.create_session() as session:
            try:
                # 1. Fetch the index page
                logger.info(f"Starting crawl from: {start_url}")
                index_html = await self.fetch_url(session, start_url)
                if not index_html:
                    logger.error("Could not fetch the index page")
                    return

                # 2. Extract all chapter links
                chapters = self.extract_chapter_links(index_html, start_url)
                self.stats['total_chapters'] = len(chapters)
                if not chapters:
                    logger.warning("No chapter links found; check the XPath config")
                    return
                logger.info(f"Found {len(chapters)} chapters")

                # 3. Crawl chapters concurrently, in batches to bound memory use
                tasks = [self.crawl_chapter(session, ch) for ch in chapters]
                batch_size = self.config.max_concurrency
                for i in range(0, len(tasks), batch_size):
                    batch = tasks[i:i + batch_size]
                    results = await asyncio.gather(*batch, return_exceptions=True)
                    for result in results:
                        if isinstance(result, Exception):
                            logger.error(f"Chapter crawl raised: {result}")
                            continue
                        await self.save_chapter(result)

                # 4. Save metadata
                await self.save_metadata(chapters[0]['novel_title'], chapters)

                self.stats['end_time'] = time.time()
                elapsed = self.stats['end_time'] - self.stats['start_time']
                logger.info("Crawl finished. Statistics:")
                logger.info(f"Total chapters: {self.stats['total_chapters']}")
                logger.info(f"Succeeded: {self.stats['successful_chapters']}")
                logger.info(f"Failed: {self.stats['failed_chapters']}")
                logger.info(f"Total data: {self.stats['total_bytes'] / 1024:.2f} KB")
                logger.info(f"Elapsed: {elapsed:.2f} s")
                logger.info(f"Average speed: {self.stats['total_bytes'] / elapsed / 1024:.2f} KB/s")

                # 5. Record failed URLs
                if self.failed_urls:
                    fail_file = self.save_dir / "failed_urls.txt"
                    async with aiofiles.open(fail_file, 'w', encoding='utf-8') as f:
                        for url in self.failed_urls:
                            await f.write(f"{url}\n")
                    logger.warning(f"{len(self.failed_urls)} URLs failed; saved to {fail_file}")
            except Exception as e:
                logger.error(f"Crawl error: {e}")
                raise

    def run(self, start_url: Optional[str] = None):
        """Run the crawler."""
        asyncio.run(self.crawl_novel(start_url))


# Example configs for hypothetical novel sites
CONFIG_EXAMPLES = {
    'qidian': NovelConfig(
        base_url='https://www.qidian.com/book/123456789/',  # example URL
        title_xpath='//div[@class="book-info"]/h1/text()',
        chapter_links_xpath='//div[@class="volume-wrap"]//ul[@class="cf"]/li/a',
        content_xpath='//div[@class="read-content"]//p/text()',
        next_page_xpath='//a[@class="next"]/@href',
        encoding='utf-8',
        max_concurrency=5,
        request_delay=1.0
    ),
    'biquge': NovelConfig(
        base_url='https://www.biquge.com.cn/book/123456/',  # example URL
        title_xpath='//div[@id="info"]/h1/text()',
        chapter_links_xpath='//div[@id="list"]//dd/a',
        content_xpath='//div[@id="content"]/text()',
        encoding='gbk',  # some sites use GBK encoding
        max_concurrency=8,
        request_delay=0.8
    )
}


def main():
    """Entry point."""
    config = CONFIG_EXAMPLES['biquge']
    crawler = NovelCrawler(config=config, save_dir="downloaded_novels")
    try:
        crawler.run()
        logger.info("Crawler finished!")
    except KeyboardInterrupt:
        logger.info("Crawler interrupted by user")
    except Exception as e:
        logger.error(f"Crawler failed: {e}")


if __name__ == "__main__":
    main()
```

Advanced Extensions

1. Handling Dynamic Content (JavaScript Rendering)

```python
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


class DynamicNovelCrawler(NovelCrawler):
    """Crawler for pages whose content is rendered by JavaScript."""

    async def fetch_dynamic_page(self, url: str) -> str:
        """Fetch a dynamic page with Selenium.

        Note: Selenium calls are blocking; in production, run them in a
        thread executor (e.g. asyncio.to_thread) so the event loop is
        not stalled.
        """
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument(f'user-agent={self.ua.random}')

        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.get(url)
            # Wait for the content element to appear
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.XPATH, self.config.content_xpath))
            )
            return driver.page_source
        finally:
            driver.quit()
```

2. Proxy IP Support

```python
class ProxyNovelCrawler(NovelCrawler):
    """Crawler with rotating proxy support."""

    def __init__(self, config: NovelConfig, proxy_pool: List[str] = None, **kwargs):
        super().__init__(config, **kwargs)
        self.proxy_pool = proxy_pool or []
        self.current_proxy_idx = 0

    def get_next_proxy(self) -> Optional[str]:
        """Rotate to the next proxy in the pool."""
        if not self.proxy_pool:
            return None
        proxy = self.proxy_pool[self.current_proxy_idx]
        self.current_proxy_idx = (self.current_proxy_idx + 1) % len(self.proxy_pool)
        return proxy

    async def fetch_url_with_proxy(self, session: aiohttp.ClientSession,
                                   url: str) -> Optional[str]:
        """Fetch a URL through a proxy, falling back to a direct request."""
        proxy = self.get_next_proxy()
        try:
            async with session.get(url, proxy=proxy) as response:
                response.raise_for_status()
                return await response.text(encoding=self.config.encoding)
        except aiohttp.ClientError as e:
            logger.warning(f"Proxy {proxy} failed: {e}")
            # Fall back to a direct (non-proxy) request
            return await self.fetch_url(session, url)
```

3. Database Storage

```python
import aiosqlite


class DBNovelCrawler(NovelCrawler):
    """Crawler that stores chapters in SQLite."""

    async def init_database(self):
        """Create the table if it does not exist yet."""
        self.db = await aiosqlite.connect('novels.db')
        await self.db.execute('''
            CREATE TABLE IF NOT EXISTS novels (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                author TEXT,
                url TEXT UNIQUE,
                content TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        await self.db.commit()

    async def save_to_database(self, chapter_data: Dict[str, str]):
        """Insert or replace a chapter row."""
        if not chapter_data:
            return
        await self.db.execute('''
            INSERT OR REPLACE INTO novels (title, url, content)
            VALUES (?, ?, ?)
        ''', (chapter_data['title'], chapter_data['url'], chapter_data['content']))
        await self.db.commit()
```

Dealing with Anti-Scraping Measures

1. Request Header Rotation

```python
class RotatingHeadersSession:
    """Rotate through a fixed list of request headers."""

    def __init__(self):
        self.headers_list = [
            {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
            {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15'},
            {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36'},
        ]
        self.current = 0

    def get_headers(self):
        headers = self.headers_list[self.current]
        self.current = (self.current + 1) % len(self.headers_list)
        return headers
```

2. Captcha Recognition (Basic Example)

```python
import pytesseract
from PIL import Image


async def solve_captcha(session: aiohttp.ClientSession, captcha_url: str) -> str:
    """Very basic captcha OCR with pytesseract."""
    async with session.get(captcha_url) as response:
        image_data = await response.read()

    # Save the captcha image to disk
    with open('captcha.png', 'wb') as f:
        f.write(image_data)

    # Run OCR (--psm 8: treat the image as a single word)
    image = Image.open('captcha.png')
    text = pytesseract.image_to_string(image, config='--psm 8')
    return text.strip()
```

Deployment and Scheduling

Distributed Crawling with Celery

```python
from celery import Celery

app = Celery('novel_crawler', broker='redis://localhost:6379/0')


@app.task
def crawl_novel_task(config_dict: dict, start_url: str):
    """Celery task: crawl one novel."""
    config = NovelConfig(**config_dict)
    crawler = NovelCrawler(config)
    asyncio.run(crawler.crawl_novel(start_url))
```

Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

CMD ["python", "main.py"]
```
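The Dockerfile copies a requirements.txt that is not shown in the article; based on the dependencies used here, a plausible minimal version (pin versions as appropriate for your environment) would be:

```text
aiohttp
aiofiles
parsel
fake-useragent
tenacity
```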

Usage Guide

1. Install Dependencies

```bash
pip install aiohttp parsel fake-useragent tenacity aiofiles
pip install selenium pytesseract pillow  # optional: dynamic pages and captcha support
```

2. Configure the Crawler

```python
# Custom configuration
custom_config = NovelConfig(
    base_url='https://www.example.com/novel/123',
    title_xpath='//h1[@class="book-title"]/text()',
    # Select the <a> elements themselves (not @href): the crawler reads
    # both text() and @href from each matched element.
    chapter_links_xpath='//div[@class="chapter-list"]//a',
    content_xpath='//div[@class="chapter-content"]//text()',
    encoding='utf-8',
    max_concurrency=10,
    request_delay=1.0
)
```

3. Run the Crawler

```python
crawler = NovelCrawler(custom_config, save_dir="my_novels")
crawler.run()
```

Caveats and Legal Notice

Important reminders:

  1. Respect robots.txt: check the target site's robots.txt before crawling

  2. Respect copyright: crawl only for personal study and research, never for commercial use

  3. Throttle requests: avoid putting excessive load on the target server

  4. Terms of service: comply with the site's user agreement and terms of service

  5. Privacy: do not crawl users' personal information
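The robots.txt point can be automated with the standard library. A minimal sketch: fetch the site's `/robots.txt` once with your existing session, then check each URL against it offline with `urllib.robotparser` (the rules and URLs below are placeholders):

```python
from urllib.robotparser import RobotFileParser


def allowed_by_robots(robots_txt: str, url: str, user_agent: str = '*') -> bool:
    """Check a URL against an already-fetched robots.txt body."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return parser.can_fetch(user_agent, url)
```

Parsing the fetched body once and reusing the parser avoids one extra HTTP request per chapter.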

Performance tuning tips:

  1. Adjust concurrency to match network conditions

  2. Reuse HTTP connections via a connection pool

  3. Cache responses to avoid repeated requests

  4. Use a Bloom filter for URL deduplication
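The Bloom-filter suggestion can be sketched in pure Python with `hashlib`; for large crawls a dedicated library would be more practical, and the bit-array size and hash count below are illustrative choices, not tuned values:

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter for approximate URL dedup.

    May report false positives (claiming a URL was seen when it was not),
    but never false negatives, so it is safe for skip-if-seen logic.
    """

    def __init__(self, size_bits: int = 1 << 20, num_hashes: int = 5):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, item: str):
        # Derive k bit positions from salted MD5 digests
        for i in range(self.num_hashes):
            digest = hashlib.md5(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], 'big') % self.size

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))
```

Compared with the `visited_urls` set, this trades exactness for constant memory: a million URLs fit in the same 128 KB bit array.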

Conclusion

This article has shown how to build a high-performance novel crawler on Python's async stack: aiohttp for concurrent fetching, parsel for efficient parsing, plus several anti-scraping countermeasures, together forming a robust and efficient crawling system. In practice, adjust the XPath expressions to each site's structure, and always comply with applicable laws and the site's own rules.
