摘要
本文详细介绍了如何构建一个高效、智能的优惠券信息聚合系统,利用最新的Python异步爬虫技术、机器学习分类算法以及现代化数据存储方案,实现跨平台折扣信息的自动抓取、智能分类与实时推送。
1. 项目概述与技术栈
1.1 项目目标
开发一个能够自动抓取主流电商平台(亚马逊、淘宝、京东等)和优惠券网站(Coupons.com、RetailMeNot等)的折扣信息,通过智能算法过滤重复内容、识别优惠力度,并以RESTful API形式提供数据服务的聚合系统。
1.2 技术栈选择
爬虫框架: httpx + asyncio(异步HTTP客户端)
解析工具: BeautifulSoup4 + parsel(双解析引擎备用)
数据存储: MongoDB + Redis(主存储+缓存)
消息队列: RabbitMQ/Celery(分布式任务调度)
机器学习: scikit-learn(优惠券分类与去重)
反爬策略: Playwright(处理JavaScript渲染)
部署监控: Docker + Prometheus + Grafana
2. 系统架构设计
python
""" 系统架构示意图: [数据源] → [爬虫调度器] → [异步爬虫集群] → [数据清洗管道] ↓ [智能分类器] → [去重检测器] → [数据库存储] ↓ [REST API] → [前端展示] / [推送服务] """
3. 异步爬虫核心实现
3.1 基础异步爬虫类
python
import asyncio import aiohttp import httpx from typing import List, Dict, Optional, Any from dataclasses import dataclass from datetime import datetime import json from urllib.parse import urljoin, urlparse import hashlib import logging from playwright.async_api import async_playwright from bs4 import BeautifulSoup import parsel # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass class Coupon: """优惠券数据类""" id: str title: str description: str discount: str code: Optional[str] = None url: str = "" platform: str = "" category: str = "" expiry_date: Optional[datetime] = None popularity: int = 0 verified: bool = False created_at: datetime = datetime.now() source_url: str = "" image_url: Optional[str] = None terms: List[str] = None def __post_init__(self): if self.terms is None: self.terms = [] # 生成唯一ID if not self.id: content = f"{self.title}{self.platform}{self.discount}" self.id = hashlib.md5(content.encode()).hexdigest() class AsyncCouponSpider: """异步优惠券爬虫基类""" def __init__(self, proxy: Optional[str] = None, timeout: int = 30, max_concurrent: int = 10): self.proxy = proxy self.timeout = timeout self.semaphore = asyncio.Semaphore(max_concurrent) self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', } async def fetch(self, url: str, use_playwright: bool = False) -> Optional[str]: """异步获取页面内容""" async with self.semaphore: try: if use_playwright: return await self.fetch_with_playwright(url) else: async with httpx.AsyncClient( timeout=self.timeout, headers=self.headers, proxies=self.proxy, follow_redirects=True ) as client: response = await client.get(url) response.raise_for_status() return 
response.text except Exception as e: logger.error(f"Error fetching {url}: {str(e)}") return None async def fetch_with_playwright(self, url: str) -> Optional[str]: """使用Playwright获取动态渲染页面""" async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context( user_agent=self.headers['User-Agent'], viewport={'width': 1920, 'height': 1080} ) page = await context.new_page() try: await page.goto(url, wait_until='networkidle') await page.wait_for_timeout(2000) # 等待额外2秒 content = await page.content() await browser.close() return content except Exception as e: logger.error(f"Playwright error: {str(e)}") await browser.close() return None def parse_html(self, html: str, selector_type: str = 'css') -> Any: """解析HTML,支持多种选择器""" if selector_type == 'css': return parsel.Selector(html) elif selector_type == 'bs4': return BeautifulSoup(html, 'lxml') else: raise ValueError(f"Unsupported selector type: {selector_type}") async def crawl(self, urls: List[str]) -> List[Coupon]: """并发爬取多个URL""" tasks = [self.process_url(url) for url in urls] results = await asyncio.gather(*tasks, return_exceptions=True) coupons = [] for result in results: if isinstance(result, Exception): logger.error(f"Task failed: {result}") elif result: coupons.extend(result) return coupons async def process_url(self, url: str) -> List[Coupon]: """处理单个URL(子类需要重写)""" raise NotImplementedError3.2 电商平台爬虫实现示例
python
class AmazonCouponSpider(AsyncCouponSpider): """亚马逊优惠券爬虫""" def __init__(self, **kwargs): super().__init__(**kwargs) self.base_url = "https://www.amazon.com" self.coupon_url = "https://www.amazon.com/coupons" async def process_url(self, url: str) -> List[Coupon]: html = await self.fetch(url, use_playwright=True) if not html: return [] # 使用BeautifulSoup解析 soup = self.parse_html(html, 'bs4') coupons = [] # 查找优惠券元素(示例选择器,实际需要根据页面调整) coupon_cards = soup.select('div.coupon-item, div[data-coupon]') for card in coupon_cards: try: coupon = self._parse_coupon_card(card) if coupon: coupons.append(coupon) except Exception as e: logger.error(f"Error parsing coupon card: {e}") continue return coupons def _parse_coupon_card(self, card) -> Optional[Coupon]: """解析单个优惠券卡片""" # 提取标题 title_elem = card.select_one('h3, .coupon-title, [data-title]') title = title_elem.get_text(strip=True) if title_elem else "No Title" # 提取折扣信息 discount_elem = card.select_one('.discount, .coupon-discount, [data-discount]') discount = discount_elem.get_text(strip=True) if discount_elem else "Unknown" # 提取描述 desc_elem = card.select_one('.description, .coupon-desc, p') description = desc_elem.get_text(strip=True) if desc_elem else "" # 提取优惠码 code_elem = card.select_one('.code, .coupon-code, [data-code]') code = code_elem.get_text(strip=True) if code_elem else None # 提取过期时间 expiry_elem = card.select_one('.expiry, .expires, [data-expiry]') expiry_text = expiry_elem.get_text(strip=True) if expiry_elem else "" # 构建优惠券对象 coupon = Coupon( id="", title=title[:200], # 限制长度 description=description[:500], discount=discount, code=code, platform="Amazon", category=self._categorize_coupon(title, description), expiry_date=self._parse_expiry_date(expiry_text), source_url=self.coupon_url ) return coupon def _categorize_coupon(self, title: str, description: str) -> str: """简单分类逻辑""" categories = { 'electronics': ['电视', '手机', '电脑', '耳机', 'Electronics'], 'fashion': ['服装', '鞋子', '时尚', 'Fashion', 'Clothing'], 'home': ['家居', 
'厨房', '家具', 'Home', 'Kitchen'], 'grocery': ['食品', '杂货', 'Grocery', 'Food'] } text = (title + description).lower() for category, keywords in categories.items(): if any(keyword.lower() in text for keyword in keywords): return category return 'other' def _parse_expiry_date(self, expiry_text: str) -> Optional[datetime]: """解析过期时间""" if not expiry_text: return None try: # 多种日期格式解析 for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%d %B %Y', 'expires %b %d, %Y']: try: return datetime.strptime(expiry_text, fmt) except ValueError: continue except Exception: pass return None class TaobaoCouponSpider(AsyncCouponSpider): """淘宝优惠券爬虫""" def __init__(self, **kwargs): super().__init__(**kwargs) self.base_url = "https://taobao.com" self.api_endpoint = "https://api.taobao.com/router/rest" async def process_url(self, url: str) -> List[Coupon]: """淘宝API接口调用(示例)""" # 实际开发中需要使用淘宝开放平台API # 这里使用模拟数据 return [ Coupon( id="", title="双十一大促满300减40", description="全平台通用优惠券", discount="满300减40", code="TB20231111", platform="淘宝", category="fashion", expiry_date=datetime(2023, 11, 11, 23, 59, 59) ) ]3.3 分布式爬虫调度器
python
import redis import pickle from celery import Celery from datetime import timedelta class DistributedSpiderScheduler: """分布式爬虫调度器""" def __init__(self, redis_url: str = 'redis://localhost:6379/0'): self.redis_client = redis.from_url(redis_url) self.celery_app = Celery( 'coupon_spider', broker=redis_url, backend=redis_url ) # 配置Celery self.celery_app.conf.update( task_serializer='pickle', accept_content=['pickle', 'json'], result_serializer='pickle', timezone='Asia/Shanghai', enable_utc=True, beat_schedule={ 'crawl-amazon-every-hour': { 'task': 'tasks.crawl_amazon', 'schedule': timedelta(hours=1), }, 'crawl-taobao-every-30min': { 'task': 'tasks.crawl_taobao', 'schedule': timedelta(minutes=30), }, } ) def schedule_spider(self, spider_class, urls: List[str], task_name: str = None): """调度爬虫任务""" task_name = task_name or spider_class.__name__ @self.celery_app.task(name=f'tasks.{task_name.lower()}') def crawl_task(): # 创建爬虫实例并执行 spider = spider_class() return asyncio.run(spider.crawl(urls)) return crawl_task.delay() def get_spider_status(self, spider_name: str) -> Dict: """获取爬虫状态""" key = f"spider:{spider_name}:status" status = self.redis_client.get(key) return pickle.loads(status) if status else {}4. 智能分类与去重
python
import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.cluster import DBSCAN from sklearn.decomposition import TruncatedSVD import jieba import jieba.analyse from sentence_transformers import SentenceTransformer class CouponDeduplicator: """优惠券去重器""" def __init__(self): self.vectorizer = TfidfVectorizer( max_features=5000, stop_words='english', ngram_range=(1, 2) ) self.model = SentenceTransformer('paraphrase-MiniLM-L6-v2') self.coupon_embeddings = {} def calculate_similarity(self, coupon1: Coupon, coupon2: Coupon) -> float: """计算两个优惠券的相似度""" # 组合特征 text1 = f"{coupon1.title} {coupon1.description} {coupon1.discount}" text2 = f"{coupon2.title} {coupon2.description} {coupon2.discount}" # 使用Sentence Transformers计算语义相似度 embeddings = self.model.encode([text1, text2]) similarity = np.dot(embeddings[0], embeddings[1]) / ( np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1]) ) return float(similarity) def find_duplicates(self, coupons: List[Coupon], threshold: float = 0.85) -> List[List[Coupon]]: """查找重复优惠券""" if len(coupons) < 2: return [] # 计算相似度矩阵 similarity_matrix = np.zeros((len(coupons), len(coupons))) for i in range(len(coupons)): for j in range(i+1, len(coupons)): similarity = self.calculate_similarity(coupons[i], coupons[j]) similarity_matrix[i, j] = similarity similarity_matrix[j, i] = similarity # 使用DBSCAN聚类找出重复项 clustering = DBSCAN( eps=threshold, min_samples=1, metric='precomputed' ).fit(1 - similarity_matrix) # 距离矩阵 # 分组重复项 groups = {} for idx, label in enumerate(clustering.labels_): if label not in groups: groups[label] = [] groups[label].append(coupons[idx]) # 返回包含重复项的组(组长度>1) return [group for group in groups.values() if len(group) > 1] class CouponClassifier: """优惠券分类器""" def __init__(self): self.categories = [ 'electronics', 'fashion', 'home', 'grocery', 'beauty', 'books', 'sports', 'automotive' ] self.vectorizer = TfidfVectorizer() self.model = None # 可以加载预训练模型 def extract_keywords(self, text: str, top_k: int 
= 10) -> List[str]: """提取关键词(支持中英文)""" try: # 英文关键词提取 if all(ord(c) < 128 for c in text): return jieba.analyse.extract_tags(text, topK=top_k) else: # 中文关键词提取 return jieba.analyse.extract_tags(text, topK=top_k) except: return [] def predict_category(self, coupon: Coupon) -> str: """预测优惠券类别""" features = f"{coupon.title} {coupon.description}" # 这里可以使用训练好的分类模型 # 简化版本:基于关键词匹配 features_lower = features.lower() category_scores = {} for category in self.categories: # 加载类别的关键词 keywords = self._get_category_keywords(category) score = sum(1 for kw in keywords if kw in features_lower) category_scores[category] = score # 返回得分最高的类别 if category_scores: return max(category_scores.items(), key=lambda x: x[1])[0] return 'other' def _get_category_keywords(self, category: str) -> List[str]: """获取类别关键词""" keyword_map = { 'electronics': ['电视', '手机', '电脑', '耳机', 'laptop', 'smartphone'], 'fashion': ['衣服', '鞋子', '时尚', 'shirt', 'dress', 'shoes'], 'home': ['家居', '厨房', '家具', 'bed', 'sofa', 'table'], 'grocery': ['食品', '水果', '蔬菜', 'food', 'fruit', 'vegetable'] } return keyword_map.get(category, [])5. 数据存储与API服务
python
from pymongo import MongoClient, ASCENDING, DESCENDING from pymongo.errors import DuplicateKeyError from fastapi import FastAPI, HTTPException, Query, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from typing import List, Optional from pydantic import BaseModel, Field from datetime import datetime class MongoDBStorage: """MongoDB存储管理器""" def __init__(self, connection_string: str = "mongodb://localhost:27017/"): self.client = MongoClient(connection_string) self.db = self.client['coupon_aggregator'] self.coupons_collection = self.db['coupons'] # 创建索引 self.coupons_collection.create_index([('id', ASCENDING)], unique=True) self.coupons_collection.create_index([('platform', ASCENDING)]) self.coupons_collection.create_index([('category', ASCENDING)]) self.coupons_collection.create_index([('expiry_date', ASCENDING)]) self.coupons_collection.create_index([ ('title', 'text'), ('description', 'text') ]) async def insert_coupon(self, coupon: Coupon) -> bool: """插入优惠券""" try: coupon_dict = self._coupon_to_dict(coupon) result = self.coupons_collection.insert_one(coupon_dict) return result.inserted_id is not None except DuplicateKeyError: logger.warning(f"Duplicate coupon: {coupon.id}") return False async def bulk_insert(self, coupons: List[Coupon]) -> int: """批量插入""" successful = 0 for coupon in coupons: if await self.insert_coupon(coupon): successful += 1 return successful async def search_coupons(self, query: Optional[str] = None, platform: Optional[str] = None, category: Optional[str] = None, min_discount: Optional[str] = None, limit: int = 50, skip: int = 0) -> List[Coupon]: """搜索优惠券""" filter_dict = {} if query: filter_dict['$text'] = {'$search': query} if platform: filter_dict['platform'] = platform if category: filter_dict['category'] = category # 复杂的折扣解析逻辑可以在这里实现 # 简化版本:只做基本过滤 cursor = self.coupons_collection.find( filter_dict ).sort('created_at', DESCENDING).skip(skip).limit(limit) coupons = [] for doc in cursor: 
coupons.append(self._dict_to_coupon(doc)) return coupons def _coupon_to_dict(self, coupon: Coupon) -> dict: """Coupon对象转字典""" return { 'id': coupon.id, 'title': coupon.title, 'description': coupon.description, 'discount': coupon.discount, 'code': coupon.code, 'url': coupon.url, 'platform': coupon.platform, 'category': coupon.category, 'expiry_date': coupon.expiry_date, 'popularity': coupon.popularity, 'verified': coupon.verified, 'created_at': coupon.created_at, 'source_url': coupon.source_url, 'image_url': coupon.image_url, 'terms': coupon.terms } def _dict_to_coupon(self, data: dict) -> Coupon: """字典转Coupon对象""" return Coupon(**data) # FastAPI应用 app = FastAPI(title="优惠券聚合API", version="1.0.0") # CORS配置 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 依赖注入 def get_db(): return MongoDBStorage() # 数据模型 class CouponResponse(BaseModel): id: str title: str description: str discount: str code: Optional[str] = None platform: str category: str expiry_date: Optional[datetime] = None verified: bool = False created_at: datetime class SearchRequest(BaseModel): query: Optional[str] = None platform: Optional[str] = None category: Optional[str] = None page: int = 1 per_page: int = 20 @app.get("/") async def root(): return {"message": "优惠券聚合API服务"} @app.get("/coupons", response_model=List[CouponResponse]) async def get_coupons( query: Optional[str] = Query(None, description="搜索关键词"), platform: Optional[str] = Query(None, description="平台筛选"), category: Optional[str] = Query(None, description="分类筛选"), page: int = Query(1, ge=1), per_page: int = Query(20, ge=1, le=100), db: MongoDBStorage = Depends(get_db) ): """获取优惠券列表""" skip = (page - 1) * per_page coupons = await db.search_coupons( query=query, platform=platform, category=category, limit=per_page, skip=skip ) return coupons @app.get("/coupons/{coupon_id}", response_model=CouponResponse) async def get_coupon(coupon_id: str, db: MongoDBStorage = 
Depends(get_db)): """获取单个优惠券详情""" coupon = await db.get_coupon_by_id(coupon_id) if not coupon: raise HTTPException(status_code=404, detail="优惠券不存在") return coupon @app.get("/platforms") async def get_platforms(db: MongoDBStorage = Depends(get_db)): """获取所有平台列表""" platforms = db.coupons_collection.distinct("platform") return {"platforms": platforms} @app.get("/categories") async def get_categories(db: MongoDBStorage = Depends(get_db)): """获取所有分类列表""" categories = db.coupons_collection.distinct("category") return {"categories": categories} @app.get("/stats") async def get_statistics(db: MongoDBStorage = Depends(get_db)): """获取统计信息""" total = db.coupons_collection.count_documents({}) platforms = db.coupons_collection.aggregate([ {"$group": {"_id": "$platform", "count": {"$sum": 1}}} ]) platform_stats = {p["_id"]: p["count"] for p in platforms} return { "total_coupons": total, "platforms": platform_stats, "updated_at": datetime.now() }6. 部署与监控
Docker部署配置
dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Toolchain for building wheels; browser shared libraries are handled by
# "playwright install --with-deps" below.
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    wget \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies first so this layer caches across code edits.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code.
COPY . .

# FIX: the original also installed google-chrome-stable via the deprecated
# apt-key mechanism. Playwright drives its own bundled Chromium, so the
# separate Chrome install was redundant; --with-deps pulls the shared
# libraries Chromium needs on slim images.
RUN playwright install --with-deps chromium

# Serve the FastAPI app.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
yaml
# docker-compose.yml — full stack: Mongo, Redis, API, Celery worker/beat,
# Prometheus + Grafana monitoring.
# NOTE(review): credentials are hard-coded for local development only —
# move them to an .env file before any shared deployment.
version: '3.8'

services:
  mongodb:
    image: mongo:latest
    container_name: coupon-mongodb
    restart: always
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password

  redis:
    image: redis:alpine
    container_name: coupon-redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  api:
    build: .
    container_name: coupon-api
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - mongodb
      - redis
    environment:
      MONGO_URI: "mongodb://admin:password@mongodb:27017/"
      REDIS_URL: "redis://redis:6379/0"
    volumes:
      - ./logs:/app/logs

  celery-worker:
    build: .
    container_name: coupon-celery-worker
    restart: always
    command: celery -A tasks worker --loglevel=info
    depends_on:
      - redis
      - mongodb
    environment:
      MONGO_URI: "mongodb://admin:password@mongodb:27017/"
      REDIS_URL: "redis://redis:6379/0"

  celery-beat:
    build: .
    container_name: coupon-celery-beat
    restart: always
    command: celery -A tasks beat --loglevel=info
    depends_on:
      - redis
    environment:
      REDIS_URL: "redis://redis:6379/0"

  prometheus:
    image: prom/prometheus:latest
    container_name: coupon-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus

  grafana:
    image: grafana/grafana:latest
    container_name: coupon-grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin

volumes:
  mongodb_data:
  redis_data:
  prometheus_data:
  grafana_data:
监控配置
python
# monitoring.py from prometheus_client import start_http_server, Counter, Gauge, Histogram import time # 定义监控指标 COUPONS_COLLECTED = Counter( 'coupons_collected_total', 'Total number of coupons collected', ['platform', 'status'] ) COUPONS_STORED = Counter( 'coupons_stored_total', 'Total number of coupons stored in database' ) SPIDER_DURATION = Histogram( 'spider_duration_seconds', 'Time spent crawling', ['spider_name'] ) ACTIVE_SPIDERS = Gauge( 'active_spiders', 'Number of active spiders' ) class MonitoringMiddleware: """监控中间件""" def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope['type'] == 'http': start_time = time.time() # 记录请求 COUPONS_COLLECTED.labels( platform=scope.get('path', 'unknown'), status='request' ).inc() await self.app(scope, receive, send) duration = time.time() - start_time SPIDER_DURATION.labels( spider_name=scope.get('path', 'unknown') ).observe(duration)7. 完整代码实现
由于篇幅限制,这里提供一个简化的完整示例:
python
# main.py import asyncio import logging from typing import List from datetime import datetime import sys # 添加项目根目录到路径 sys.path.append('.') from spiders.amazon_spider import AmazonCouponSpider from spiders.taobao_spider import TaobaoCouponSpider from storage.mongodb_storage import MongoDBStorage from deduplication.deduplicator import CouponDeduplicator from classification.classifier import CouponClassifier logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class CouponAggregator: """优惠券聚合器主类""" def __init__(self): self.spiders = { 'amazon': AmazonCouponSpider(), 'taobao': TaobaoCouponSpider() } self.storage = MongoDBStorage() self.deduplicator = CouponDeduplicator() self.classifier = CouponClassifier() async def run(self): """运行聚合器""" logger.info("开始优惠券聚合任务") all_coupons = [] # 并发运行所有爬虫 tasks = [] for spider_name, spider in self.spiders.items(): task = self._run_spider(spider_name, spider) tasks.append(task) results = await asyncio.gather(*tasks, return_exceptions=True) # 收集所有优惠券 for result in results: if isinstance(result, Exception): logger.error(f"爬虫任务失败: {result}") elif result: all_coupons.extend(result) # 去重 logger.info(f"共收集到 {len(all_coupons)} 个优惠券") if all_coupons: duplicates = self.deduplicator.find_duplicates(all_coupons) if duplicates: logger.info(f"发现 {len(duplicates)} 组重复优惠券") # 去除重复(保留第一个) unique_coupons = self._remove_duplicates(all_coupons, duplicates) logger.info(f"去重后剩余 {len(unique_coupons)} 个优惠券") else: unique_coupons = all_coupons # 分类 for coupon in unique_coupons: if not coupon.category or coupon.category == 'other': coupon.category = self.classifier.predict_category(coupon) # 存储 saved = await self.storage.bulk_insert(unique_coupons) logger.info(f"成功存储 {saved} 个优惠券到数据库") logger.info("优惠券聚合任务完成") async def _run_spider(self, spider_name: str, spider): """运行单个爬虫""" logger.info(f"开始运行 {spider_name} 爬虫") # 这里可以配置不同爬虫的URL列表 urls = { 'amazon': ['https://www.amazon.com/coupons'], 'taobao': ['https://taobao.com'] } coupons = 
await spider.crawl(urls.get(spider_name, [])) logger.info(f"{spider_name} 爬虫收集到 {len(coupons)} 个优惠券") return coupons def _remove_duplicates(self, coupons: List, duplicate_groups: List) -> List: """去除重复优惠券""" # 展重复组 duplicate_ids = set() for group in duplicate_groups: # 保留第一个,标记其他为重复 for i, coupon in enumerate(group): if i > 0: duplicate_ids.add(coupon.id) # 过滤掉重复的 return [c for c in coupons if c.id not in duplicate_ids] async def main(): """主函数""" aggregator = CouponAggregator() await aggregator.run() if __name__ == "__main__": # 运行主程序 asyncio.run(main())8. 未来扩展方向
8.1 机器学习增强
使用BERT等预训练模型进行更准确的文本分类
实现基于用户行为的个性化推荐
构建优惠券价值评估模型
8.2 实时推送系统
集成WebSocket实现实时优惠券推送
开发移动端App推送通知
实现基于用户偏好的智能推送
8.3 反爬虫策略升级
实现动态IP代理池
使用机器学习识别验证码
模拟真实用户行为模式
8.4 数据可视化
构建优惠券数据分析仪表板
实现价格趋势预测
开发商家竞争分析工具
8.5 生态系统扩展
开发浏览器插件
提供API开放平台
构建联盟营销系统
结论
本文详细介绍了如何构建一个完整的优惠券信息聚合系统,涵盖了从数据抓取、清洗、存储到API服务的全流程。通过采用最新的异步爬虫技术、机器学习算法和微服务架构,我们能够构建一个高效、稳定且可扩展的系统。
关键技术要点:
异步并发:使用asyncio和httpx实现高性能并发爬取
智能处理:集成机器学习进行自动分类和去重
弹性架构:采用微服务和消息队列实现系统解耦
监控保障:完善的监控和日志体系确保系统稳定性