Introduction: The Importance and Challenges of Collecting Restaurant Review Data
In the era of digital dining, restaurant review data has become a key information resource for consumer decision-making, merchant operations, and industry analysis. It covers multiple dimensions, including user ratings, review text, average spend per person, and recommended dishes, and is valuable for market research, competitive analysis, and user experience optimization in the restaurant industry. However, as the major platforms' anti-scraping mechanisms become increasingly sophisticated, traditional crawler techniques face unprecedented challenges.
This article shows how to build an efficient, stable restaurant review collection system with a modern Python scraping stack (Playwright + Asyncio + intelligent parsing), complete with working code and best practices.
Table of Contents
Introduction: The Importance and Challenges of Collecting Restaurant Review Data
Technology Choices: Why These Tools?
1. Playwright vs Selenium vs Requests
2. Core Tool Stack
Hands-On Project: A Multi-Platform Restaurant Review Collection System
System Architecture
Full Code Implementation
Advanced Features and Optimization Strategies
1. Smart Proxy Pool Management
2. Distributed Crawler Architecture
3. Data Quality Monitoring
Legal and Ethical Considerations
Compliant Crawling Practices
Performance Optimization Tips
Technology Choices: Why These Tools?
1. Playwright vs Selenium vs Requests
Playwright: developed by Microsoft, supports Chromium, Firefox, and WebKit, ships with built-in auto-waiting, and offers a more modern API design
Async support: native asynchronous operation, so many pages can be fetched concurrently instead of one at a time
Anti-detection: better at mimicking real browser behavior, which helps evade anti-bot checks (a minimal example follows this list)
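The snippet below is a minimal sketch of that async flow (the target URL is a placeholder): goto and other page calls wait for the page to be ready on their own, so no manual sleeps are needed.
python
# Minimal async Playwright example; the target URL is a placeholder.
import asyncio
from playwright.async_api import async_playwright

async def fetch_title(url: str) -> str:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)  # p.firefox / p.webkit also work
        page = await browser.new_page()
        await page.goto(url, wait_until="networkidle")  # built-in waiting, no explicit sleeps
        title = await page.title()
        await browser.close()
        return title

print(asyncio.run(fetch_title("https://example.com")))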
2. Core Tool Stack
Crawler framework: Playwright + Asyncio
Data parsing: BeautifulSoup4 / Parsel
Data storage: SQLAlchemy + PostgreSQL / SQLite (a storage sketch follows this list)
Proxy management: smart proxy pool rotation
CAPTCHA handling: OCR recognition, with manual solving as a fallback
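As a rough sketch of how the storage item could be wired up with SQLAlchemy (the full implementation later in this article writes to PostgreSQL through asyncpg instead; the SQLite URL and the reduced set of columns here are illustrative assumptions):
python
# Storage sketch with SQLAlchemy; the SQLite URL and column subset are assumptions.
from sqlalchemy import create_engine, Column, Integer, String, Float, Text, DateTime
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class ReviewRow(Base):
    __tablename__ = "restaurant_reviews"
    id = Column(Integer, primary_key=True)
    platform = Column(String(50))
    restaurant_id = Column(String(100))
    restaurant_name = Column(String(200))
    reviewer_rating = Column(Float)
    review_content = Column(Text)
    review_time = Column(DateTime)

engine = create_engine("sqlite:///restaurant_reviews.db")  # SQLite keeps the demo self-contained
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(ReviewRow(platform="dianping", restaurant_id="12345", reviewer_rating=4.5))
    session.commit()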
Hands-On Project: A Multi-Platform Restaurant Review Collection System
System Architecture
python
""" 餐厅点评数据采集系统架构 ├── 爬虫调度中心 (Scheduler) ├── 网页采集模块 (Fetcher) ├── 数据解析模块 (Parser) ├── 数据存储模块 (Storage) ├── 反爬对抗模块 (Anti-Anti-Spider) └── 监控报警模块 (Monitor) """
Full Code Implementation
python
""" 餐厅点评数据采集爬虫 - 基于Playwright的异步高效解决方案 作者:爬虫技术专家 日期:2024年 版本:2.0 """ import asyncio import json import random import time from dataclasses import dataclass from typing import List, Dict, Optional, Any from urllib.parse import urljoin, urlencode from datetime import datetime import asyncpg import pandas as pd from playwright.async_api import async_playwright, Browser, Page, Response from bs4 import BeautifulSoup from fake_useragent import UserAgent from pydantic import BaseModel, Field import aiofiles import aiohttp from tenacity import retry, stop_after_attempt, wait_exponential from rich.progress import Progress, SpinnerColumn, TextColumn from rich.console import Console from rich.table import Table import logging # 配置日志系统 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('restaurant_spider.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 数据模型定义 class RestaurantReview(BaseModel): """餐厅点评数据模型""" platform: str = Field(description="平台名称") restaurant_id: str = Field(description="餐厅ID") restaurant_name: str = Field(description="餐厅名称") average_rating: float = Field(description="平均评分") review_count: int = Field(description="点评数量") price_range: Optional[str] = Field(description="价格区间") address: Optional[str] = Field(description="地址") phone: Optional[str] = Field(description="电话") review_content: Optional[str] = Field(description="点评内容") reviewer_name: Optional[str] = Field(description="点评者名称") reviewer_rating: Optional[float] = Field(description="点评者评分") review_time: Optional[datetime] = Field(description="点评时间") useful_count: Optional[int] = Field(description="有用数") images: Optional[List[str]] = Field(description="点评图片") collected_at: datetime = Field(default_factory=datetime.now) class ReviewSpiderConfig(BaseModel): """爬虫配置模型""" headless: bool = Field(default=True, description="无头模式") proxy: Optional[str] = Field(default=None, description="代理服务器") timeout: int = Field(default=30000, description="超时时间(ms)") max_concurrent: int = Field(default=3, description="最大并发数") delay_range: tuple = Field(default=(1, 3), description="延迟范围(秒)") max_retries: int = Field(default=3, description="最大重试次数") class AsyncRestaurantSpider: """异步餐厅点评爬虫核心类""" def __init__(self, config: ReviewSpiderConfig): self.config = config self.playwright = None self.browser = None self.context = None self.semaphore = asyncio.Semaphore(config.max_concurrent) self.ua = UserAgent() self.console = Console() async def __aenter__(self): await self.init_browser() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def init_browser(self): """初始化Playwright浏览器""" self.playwright = await async_playwright().start() launch_options = { 'headless': self.config.headless, 'timeout': self.config.timeout, 'args': [ '--disable-blink-features=AutomationControlled', '--disable-dev-shm-usage', '--no-sandbox', f'--user-agent={self.ua.random}', ] } if self.config.proxy: launch_options['proxy'] = {'server': self.config.proxy} self.browser = await self.playwright.chromium.launch(**launch_options) # 设置上下文,模拟真实用户 self.context = await self.browser.new_context( viewport={'width': 1920, 'height': 1080}, locale='zh-CN', timezone_id='Asia/Shanghai', user_agent=self.ua.random ) # 添加随机鼠标移动脚本 await self.context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); window.chrome = { runtime: {}, loadTimes: function(){}, csi: function(){}, app: {} }; """) 
logger.info("浏览器初始化完成") @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10) ) async def fetch_page(self, url: str, page_num: int = None) -> Optional[Page]: """获取页面内容""" async with self.semaphore: try: # 随机延迟,模拟人类行为 await asyncio.sleep(random.uniform(*self.config.delay_range)) page = await self.context.new_page() # 随机设置视口大小 await page.set_viewport_size({ 'width': random.randint(1200, 1920), 'height': random.randint(800, 1080) }) # 监听请求和响应 page.on('request', lambda req: logger.debug(f"请求: {req.url}")) page.on('response', lambda res: logger.debug(f"响应: {res.status} {res.url}")) # 添加随机鼠标移动 await self.simulate_human_behavior(page) logger.info(f"正在访问: {url}") response = await page.goto(url, timeout=self.config.timeout, wait_until='networkidle') if not response or response.status != 200: logger.warning(f"页面加载失败: {url}, 状态码: {getattr(response, 'status', '未知')}") await page.close() return None # 检查是否被反爬 if await self.check_anti_spider(page): logger.warning("检测到反爬机制,正在尝试绕过...") await self.handle_anti_spider(page) # 滚动页面加载更多内容 await self.scroll_page(page) return page except Exception as e: logger.error(f"获取页面失败: {url}, 错误: {str(e)}") if 'page' in locals(): await page.close() raise async def simulate_human_behavior(self, page: Page): """模拟人类浏览行为""" # 随机移动鼠标 for _ in range(random.randint(3, 7)): x = random.randint(100, 1800) y = random.randint(100, 1000) await page.mouse.move(x, y) await asyncio.sleep(random.uniform(0.1, 0.5)) # 随机滚动 scroll_steps = random.randint(2, 5) for _ in range(scroll_steps): scroll_amount = random.randint(200, 800) await page.evaluate(f"window.scrollBy(0, {scroll_amount})") await asyncio.sleep(random.uniform(0.2, 1)) async def scroll_page(self, page: Page): """滚动页面以加载动态内容""" scroll_height = await page.evaluate("document.body.scrollHeight") current_position = 0 scroll_step = random.randint(300, 600) while current_position < scroll_height: await page.evaluate(f"window.scrollTo(0, {current_position})") await asyncio.sleep(random.uniform(0.5, 1.5)) current_position += scroll_step # 随机停留 if random.random() > 0.7: await asyncio.sleep(random.uniform(1, 3)) async def check_anti_spider(self, page: Page) -> bool: """检查是否触发反爬机制""" # 检查常见反爬特征 checks = [ page.locator("text=验证码"), page.locator("text=访问过于频繁"), page.locator("text=请完成验证"), page.locator("text=Security Check"), page.locator(".captcha"), page.locator("#challenge-form") ] for check in checks: if await check.count() > 0: return True # 检查页面内容是否异常 content = await page.content() if len(content) < 1000 or "robot" in content.lower(): return True return False async def handle_anti_spider(self, page: Page): """处理反爬机制""" # 尝试刷新页面 await page.reload(wait_until='networkidle') await asyncio.sleep(random.uniform(3, 7)) # 如果还有验证码,尝试简单处理 if await page.locator(".captcha").count() > 0: logger.warning("遇到验证码,尝试人工处理或更换代理") # 这里可以集成验证码识别服务 # 或者暂停爬虫等待人工干预 async def parse_dianping_restaurant(self, page: Page) -> List[RestaurantReview]: """解析大众点评餐厅页面""" reviews = [] try: # 获取餐厅基本信息 content = await page.content() soup = BeautifulSoup(content, 'html.parser') # 餐厅名称 name_elem = soup.select_one('.shop-name') restaurant_name = name_elem.get_text(strip=True) if name_elem else "未知" # 餐厅评分 rating_elem = soup.select_one('.brief-info .num') average_rating = float(rating_elem.get_text(strip=True)) if rating_elem else 0.0 # 点评数量 count_elem = soup.select_one('.review-amount .count') if count_elem: review_count = int(count_elem.get_text(strip=True).replace(',', '')) else: review_count = 0 # 解析点评列表 review_items = 
soup.select('.reviews-items .main-review') for item in review_items: review = RestaurantReview( platform="dianping", restaurant_id=self.extract_restaurant_id(page.url), restaurant_name=restaurant_name, average_rating=average_rating, review_count=review_count, review_content=item.select_one('.review-words').get_text(strip=True) if item.select_one('.review-words') else None, reviewer_name=item.select_one('.name').get_text(strip=True) if item.select_one('.name') else None, reviewer_rating=float(item.select_one('.score').get_text(strip=True)) if item.select_one('.score') else None, review_time=datetime.strptime(item.select_one('.time').get_text(strip=True), '%Y-%m-%d') if item.select_one('.time') else None, useful_count=int(item.select_one('.useful-count').get_text(strip=True)) if item.select_one('.useful-count') else None ) reviews.append(review) except Exception as e: logger.error(f"解析大众点评页面失败: {str(e)}") return reviews async def parse_meituan_restaurant(self, page: Page) -> List[RestaurantReview]: """解析美团餐厅页面""" reviews = [] try: # 美团页面通常有更多的动态加载内容 # 需要等待元素加载 await page.wait_for_selector('.review-list', timeout=10000) content = await page.content() soup = BeautifulSoup(content, 'html.parser') # 餐厅信息解析 # 这里根据实际页面结构编写解析逻辑 # 由于篇幅限制,简化处理 except Exception as e: logger.error(f"解析美团页面失败: {str(e)}") return reviews def extract_restaurant_id(self, url: str) -> str: """从URL中提取餐厅ID""" import re patterns = [ r'shop/(\d+)', r'poi/(\d+)', r'id=(\d+)', r'item/(\d+)' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) return "unknown" async def save_to_database(self, reviews: List[RestaurantReview], db_url: str): """保存数据到数据库""" conn = await asyncpg.connect(db_url) try: async with conn.transaction(): for review in reviews: await conn.execute(''' INSERT INTO restaurant_reviews (platform, restaurant_id, restaurant_name, average_rating, review_count, review_content, reviewer_name, reviewer_rating, review_time, useful_count, collected_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (platform, restaurant_id, reviewer_name, review_time) DO UPDATE SET review_content = EXCLUDED.review_content, useful_count = EXCLUDED.useful_count, collected_at = EXCLUDED.collected_at ''', review.platform, review.restaurant_id, review.restaurant_name, review.average_rating, review.review_count, review.review_content, review.reviewer_name, review.reviewer_rating, review.review_time, review.useful_count, review.collected_at) logger.info(f"成功保存 {len(reviews)} 条点评数据") except Exception as e: logger.error(f"数据库保存失败: {str(e)}") finally: await conn.close() async def save_to_csv(self, reviews: List[RestaurantReview], filename: str): """保存数据到CSV文件""" df = pd.DataFrame([review.dict() for review in reviews]) df.to_csv(filename, index=False, encoding='utf-8-sig') logger.info(f"数据已保存到 {filename}") async def crawl_restaurant_list(self, search_urls: List[str], max_pages: int = 10): """爬取餐厅列表""" all_reviews = [] with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), console=self.console ) as progress: task = progress.add_task("[cyan]爬取餐厅数据...", total=len(search_urls)) for url in search_urls: try: for page_num in range(1, max_pages + 1): paginated_url = f"{url}&page={page_num}" if "?" 
in url else f"{url}?page={page_num}" page = await self.fetch_page(paginated_url, page_num) if not page: break # 根据URL判断平台并解析 if 'dianping' in url: reviews = await self.parse_dianping_restaurant(page) elif 'meituan' in url: reviews = await self.parse_meituan_restaurant(page) else: reviews = [] all_reviews.extend(reviews) logger.info(f"第 {page_num} 页爬取完成,获取 {len(reviews)} 条点评") await page.close() # 随机延迟,避免请求过快 await asyncio.sleep(random.uniform(2, 5)) except Exception as e: logger.error(f"爬取 {url} 失败: {str(e)}") progress.update(task, advance=1) return all_reviews async def close(self): """关闭浏览器和Playwright""" if self.context: await self.context.close() if self.browser: await self.browser.close() if self.playwright: await self.playwright.stop() class RestaurantSpiderManager: """爬虫管理器""" def __init__(self, config_path: str = "config.json"): self.config = self.load_config(config_path) self.spiders = [] def load_config(self, config_path: str) -> Dict: """加载配置文件""" try: with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: return { "headless": True, "max_concurrent": 3, "delay_range": [1, 3], "database_url": "postgresql://user:password@localhost/restaurant_reviews", "platforms": [ { "name": "dianping", "search_urls": [ "https://www.dianping.com/search/keyword/1/0_餐厅", "https://www.dianping.com/search/keyword/1/0_火锅" ], "max_pages": 5 } ] } async def run(self): """运行爬虫""" console = Console() console.print("[bold green]🚀 餐厅点评数据采集系统启动[/bold green]") spider_config = ReviewSpiderConfig( headless=self.config.get('headless', True), max_concurrent=self.config.get('max_concurrent', 3), delay_range=tuple(self.config.get('delay_range', [1, 3])) ) async with AsyncRestaurantSpider(spider_config) as spider: all_reviews = [] for platform_config in self.config.get('platforms', []): platform_name = platform_config['name'] search_urls = platform_config['search_urls'] max_pages = platform_config.get('max_pages', 5) console.print(f"\n[bold cyan]开始爬取 {platform_name} 数据...[/bold cyan]") reviews = await spider.crawl_restaurant_list(search_urls, max_pages) all_reviews.extend(reviews) console.print(f"[green]✓ {platform_name} 爬取完成,共获取 {len(reviews)} 条数据[/green]") # 保存数据 if all_reviews: # 保存到数据库 db_url = self.config.get('database_url') if db_url: await spider.save_to_database(all_reviews, db_url) # 保存到CSV timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_file = f"restaurant_reviews_{timestamp}.csv" await spider.save_to_csv(all_reviews, csv_file) # 显示统计信息 self.display_statistics(all_reviews) console.print("[bold green]✨ 数据采集任务完成![/bold green]") def display_statistics(self, reviews: List[RestaurantReview]): """显示统计信息""" console = Console() if not reviews: console.print("[yellow]⚠️ 未获取到任何数据[/yellow]") return table = Table(title="数据采集统计", show_header=True, header_style="bold magenta") table.add_column("平台", style="cyan") table.add_column("餐厅数量", justify="right") table.add_column("点评数量", justify="right") table.add_column("平均评分", justify="right") from collections import defaultdict platform_stats = defaultdict(lambda: {'restaurants': set(), 'reviews': 0, 'ratings': []}) for review in reviews: stats = platform_stats[review.platform] stats['restaurants'].add(review.restaurant_id) stats['reviews'] += 1 if review.reviewer_rating: stats['ratings'].append(review.reviewer_rating) for platform, stats in platform_stats.items(): avg_rating = sum(stats['ratings'])/len(stats['ratings']) if stats['ratings'] else 0 table.add_row( platform, str(len(stats['restaurants'])), str(stats['reviews']), 
f"{avg_rating:.1f}" ) console.print(table) async def main(): """主函数""" # 创建爬虫管理器 manager = RestaurantSpiderManager() try: # 运行爬虫 await manager.run() except KeyboardInterrupt: logger.info("用户中断爬虫程序") except Exception as e: logger.error(f"爬虫运行失败: {str(e)}", exc_info=True) if __name__ == "__main__": # 创建数据库表(如果不存在) async def init_database(): conn = await asyncpg.connect('postgresql://user:password@localhost/restaurant_reviews') await conn.execute(''' CREATE TABLE IF NOT EXISTS restaurant_reviews ( id SERIAL PRIMARY KEY, platform VARCHAR(50), restaurant_id VARCHAR(100), restaurant_name VARCHAR(200), average_rating DECIMAL(3,1), review_count INTEGER, review_content TEXT, reviewer_name VARCHAR(100), reviewer_rating DECIMAL(3,1), review_time TIMESTAMP, useful_count INTEGER, collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(platform, restaurant_id, reviewer_name, review_time) ) ''') await conn.close() # 运行异步主函数 asyncio.run(main())高级功能与优化策略
1. Smart Proxy Pool Management
python
class ProxyManager:
    """Smart proxy pool manager."""

    def __init__(self):
        self.proxies = []
        self.blacklist = set()
        self.success_rate = {}

    async def get_proxy(self) -> Optional[str]:
        """Pick the proxy with the best observed success rate, skipping blacklisted ones."""
        # Failover: callers drop to the next best proxy when a request fails
        candidates = [p for p in self.proxies if p not in self.blacklist]
        return max(candidates, key=lambda p: self.success_rate.get(p, 0.0), default=None)

2. Distributed Crawler Architecture
python
class DistributedSpider:
    """Redis-backed distributed crawler."""

    def __init__(self):
        self.redis_client = None
        self.task_queue = "restaurant:urls"
        self.result_queue = "restaurant:results"

    async def distribute_tasks(self, urls: List[str]):
        """Distribute crawl tasks to worker nodes."""
        pass

    async def collect_results(self):
        """Collect crawl results from worker nodes."""
        pass
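The two methods above are left as stubs in the original; the sketch below shows one way they might be filled in using redis.asyncio (the async client bundled with redis-py). The Redis URL is an assumption, and the queue names mirror the class attributes.
python
# Hedged sketch of a Redis-backed task/result queue; the Redis URL is an assumption.
import json
from typing import List, Optional
import redis.asyncio as aioredis

class RedisTaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.client = aioredis.from_url(redis_url, decode_responses=True)
        self.task_queue = "restaurant:urls"
        self.result_queue = "restaurant:results"

    async def distribute_tasks(self, urls: List[str]) -> None:
        """Push crawl URLs onto the shared task queue for workers to consume."""
        if urls:
            await self.client.rpush(self.task_queue, *urls)

    async def next_task(self, timeout: int = 5) -> Optional[str]:
        """Block briefly for the next URL; return None on timeout."""
        item = await self.client.blpop(self.task_queue, timeout=timeout)
        return item[1] if item else None

    async def publish_result(self, review: dict) -> None:
        """Workers push parsed reviews here; a collector process drains the list."""
        await self.client.rpush(self.result_queue, json.dumps(review, ensure_ascii=False, default=str))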
3. Data Quality Monitoring
python
class DataQualityMonitor:
    """Data quality monitoring."""

    @staticmethod
    def check_review_quality(review: RestaurantReview) -> Dict:
        """Score a single review against basic quality checks."""
        checks = {
            'content_length': len(review.review_content or '') > 10,
            'rating_valid': review.reviewer_rating is not None and 0 <= review.reviewer_rating <= 5,
            'time_valid': review.review_time is not None and review.review_time < datetime.now(),
            'no_duplicate': True  # deduplication check still to be implemented
        }
        score = sum(checks.values()) / len(checks)
        return {'score': score, 'details': checks}

Legal and Ethical Considerations
Compliant Crawling Practices
Respect robots.txt: honor the site's crawling policy (see the sketch after this list)
Throttle request rates: avoid placing noticeable load on the target site
Use data responsibly: only for lawful purposes and research
Protect user privacy: anonymize personal information before storage or analysis
Respect copyright: credit the data source and do not use the data for commercial infringement
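As a minimal sketch of the robots.txt point (the user-agent string and example URL are placeholders), the standard library's urllib.robotparser can gate every fetch:
python
# Minimal robots.txt gate using only the standard library; user agent and URL are placeholders.
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser

def is_allowed(url: str, user_agent: str = "restaurant-research-bot") -> bool:
    """Return True only if robots.txt permits fetching this URL."""
    parts = urlsplit(url)
    parser = RobotFileParser(f"{parts.scheme}://{parts.netloc}/robots.txt")
    try:
        parser.read()
    except OSError:
        return False  # be conservative when robots.txt cannot be read
    return parser.can_fetch(user_agent, url)

if not is_allowed("https://www.dianping.com/shop/12345"):
    print("robots.txt disallows this URL; skipping")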
Performance Optimization Tips
Connection pooling: reuse database and HTTP connections
Memory optimization: process large result sets with generators instead of loading everything at once
Retry on errors: exponential backoff between attempts
Caching: avoid re-fetching pages that have already been collected
Async file I/O: speed up data writes (a short sketch follows)
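As a sketch of the async file I/O point (the output file name is a placeholder), aiofiles, which already appears in the main script's imports, can append collected reviews without blocking the event loop:
python
# Append reviews as JSON lines without blocking the event loop; the file name is a placeholder.
import asyncio
import json
import aiofiles

async def append_reviews_jsonl(reviews: list, path: str = "reviews.jsonl") -> None:
    async with aiofiles.open(path, "a", encoding="utf-8") as f:
        for review in reviews:
            await f.write(json.dumps(review, ensure_ascii=False, default=str) + "\n")

asyncio.run(append_reviews_jsonl([{"platform": "dianping", "reviewer_rating": 4.5}]))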