
Python Web Scraping in Practice: Building an Intelligent Coupon Aggregation System with Async Techniques and Machine Learning


张小明

Front-end Development Engineer


Abstract

This article walks through building an efficient, intelligent coupon aggregation system. It combines Python async crawling, machine-learning classification, and a modern storage stack to automatically scrape discount information across platforms, classify it, and push it out in real time.

1. Project Overview and Technology Stack

1.1 Project Goals

Build an aggregation system that automatically scrapes discount information from major e-commerce platforms (Amazon, Taobao, JD.com, etc.) and coupon sites (Coupons.com, RetailMeNot, etc.), uses intelligent algorithms to filter duplicate content and gauge discount strength, and serves the data through a RESTful API.
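Before any machine learning is involved, the "filter duplicate content" step can already drop exact duplicates with a content hash. A minimal sketch of that idea follows; the `dedupe_offers` helper and its field names are illustrative, not part of the system above:

```python
import hashlib

def offer_key(title: str, platform: str, discount: str) -> str:
    """Stable fingerprint of an offer's identifying fields."""
    return hashlib.md5(f"{title}|{platform}|{discount}".encode()).hexdigest()

def dedupe_offers(offers: list) -> list:
    """Keep only the first offer seen for each fingerprint."""
    seen, unique = set(), []
    for offer in offers:
        key = offer_key(offer["title"], offer["platform"], offer["discount"])
        if key not in seen:
            seen.add(key)
            unique.append(offer)
    return unique

offers = [
    {"title": "20% off headphones", "platform": "Amazon", "discount": "20%"},
    {"title": "20% off headphones", "platform": "Amazon", "discount": "20%"},
    {"title": "$5 off groceries", "platform": "JD", "discount": "$5"},
]
print(len(dedupe_offers(offers)))  # → 2
```

Semantic near-duplicates ("20% off" vs "save 20%") slip past a hash, which is why Section 4 adds an embedding-based deduplicator on top.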

1.2 Technology Stack

  • Crawling framework: httpx + asyncio (async HTTP client)

  • Parsing tools: BeautifulSoup4 + parsel (two parser engines, each a fallback for the other)

  • Data storage: MongoDB + Redis (primary store + cache)

  • Message queue: RabbitMQ/Celery (distributed task scheduling)

  • Machine learning: scikit-learn (coupon classification and deduplication)

  • Anti-bot handling: Playwright (for JavaScript-rendered pages)

  • Deployment and monitoring: Docker + Prometheus + Grafana
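The stack above maps to a dependency list along these lines (package names match the libraries imported later in the article; versions are deliberately left unpinned here):

```text
# requirements.txt (illustrative)
httpx
beautifulsoup4
parsel
lxml
playwright
pymongo
redis
celery
scikit-learn
sentence-transformers
jieba
fastapi
uvicorn
prometheus-client
```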

2. System Architecture

python

"""
System architecture:

[Data sources] → [Spider scheduler] → [Async spider cluster] → [Cleaning pipeline]
                                                                      ↓
                   [Classifier] → [Duplicate detector] → [Database storage]
                                                                      ↓
                              [REST API] → [Front end] / [Push service]
"""

3. Core Async Crawler Implementation

3.1 Base Async Spider Class

python

import asyncio
import hashlib
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, List, Optional

import httpx
import parsel
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class Coupon:
    """Coupon data class."""
    id: str
    title: str
    description: str
    discount: str
    code: Optional[str] = None
    url: str = ""
    platform: str = ""
    category: str = ""
    expiry_date: Optional[datetime] = None
    popularity: int = 0
    verified: bool = False
    # default_factory so each instance gets its own timestamp
    # (a plain datetime.now() default is evaluated only once, at class creation)
    created_at: datetime = field(default_factory=datetime.now)
    source_url: str = ""
    image_url: Optional[str] = None
    terms: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Derive a stable ID from the coupon's content if none was given
        if not self.id:
            content = f"{self.title}{self.platform}{self.discount}"
            self.id = hashlib.md5(content.encode()).hexdigest()


class AsyncCouponSpider:
    """Base class for async coupon spiders."""

    def __init__(self, proxy: Optional[str] = None,
                 timeout: int = 30, max_concurrent: int = 10):
        self.proxy = proxy
        self.timeout = timeout
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

    async def fetch(self, url: str, use_playwright: bool = False) -> Optional[str]:
        """Fetch a page asynchronously."""
        async with self.semaphore:
            try:
                if use_playwright:
                    return await self.fetch_with_playwright(url)
                async with httpx.AsyncClient(
                    timeout=self.timeout,
                    headers=self.headers,
                    proxies=self.proxy,
                    follow_redirects=True
                ) as client:
                    response = await client.get(url)
                    response.raise_for_status()
                    return response.text
            except Exception as e:
                logger.error(f"Error fetching {url}: {str(e)}")
                return None

    async def fetch_with_playwright(self, url: str) -> Optional[str]:
        """Fetch a JavaScript-rendered page with Playwright."""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context(
                user_agent=self.headers['User-Agent'],
                viewport={'width': 1920, 'height': 1080}
            )
            page = await context.new_page()
            try:
                await page.goto(url, wait_until='networkidle')
                await page.wait_for_timeout(2000)  # wait an extra 2 seconds
                return await page.content()
            except Exception as e:
                logger.error(f"Playwright error: {str(e)}")
                return None
            finally:
                await browser.close()

    def parse_html(self, html: str, selector_type: str = 'css') -> Any:
        """Parse HTML with either of the two selector engines."""
        if selector_type == 'css':
            return parsel.Selector(html)
        elif selector_type == 'bs4':
            return BeautifulSoup(html, 'lxml')
        raise ValueError(f"Unsupported selector type: {selector_type}")

    async def crawl(self, urls: List[str]) -> List[Coupon]:
        """Crawl multiple URLs concurrently."""
        tasks = [self.process_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        coupons = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Task failed: {result}")
            elif result:
                coupons.extend(result)
        return coupons

    async def process_url(self, url: str) -> List[Coupon]:
        """Process a single URL (subclasses must override)."""
        raise NotImplementedError

3.2 E-commerce Platform Spider Examples

python

class AmazonCouponSpider(AsyncCouponSpider):
    """Amazon coupon spider."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.base_url = "https://www.amazon.com"
        self.coupon_url = "https://www.amazon.com/coupons"

    async def process_url(self, url: str) -> List[Coupon]:
        html = await self.fetch(url, use_playwright=True)
        if not html:
            return []
        # Parse with BeautifulSoup
        soup = self.parse_html(html, 'bs4')
        coupons = []
        # Example selectors only -- adjust them to the live page structure
        coupon_cards = soup.select('div.coupon-item, div[data-coupon]')
        for card in coupon_cards:
            try:
                coupon = self._parse_coupon_card(card)
                if coupon:
                    coupons.append(coupon)
            except Exception as e:
                logger.error(f"Error parsing coupon card: {e}")
                continue
        return coupons

    def _parse_coupon_card(self, card) -> Optional[Coupon]:
        """Parse a single coupon card."""
        # Title
        title_elem = card.select_one('h3, .coupon-title, [data-title]')
        title = title_elem.get_text(strip=True) if title_elem else "No Title"

        # Discount
        discount_elem = card.select_one('.discount, .coupon-discount, [data-discount]')
        discount = discount_elem.get_text(strip=True) if discount_elem else "Unknown"

        # Description
        desc_elem = card.select_one('.description, .coupon-desc, p')
        description = desc_elem.get_text(strip=True) if desc_elem else ""

        # Coupon code
        code_elem = card.select_one('.code, .coupon-code, [data-code]')
        code = code_elem.get_text(strip=True) if code_elem else None

        # Expiry date
        expiry_elem = card.select_one('.expiry, .expires, [data-expiry]')
        expiry_text = expiry_elem.get_text(strip=True) if expiry_elem else ""

        return Coupon(
            id="",
            title=title[:200],              # cap the length
            description=description[:500],
            discount=discount,
            code=code,
            platform="Amazon",
            category=self._categorize_coupon(title, description),
            expiry_date=self._parse_expiry_date(expiry_text),
            source_url=self.coupon_url
        )

    def _categorize_coupon(self, title: str, description: str) -> str:
        """Naive keyword-based categorization."""
        categories = {
            'electronics': ['电视', '手机', '电脑', '耳机', 'Electronics'],
            'fashion': ['服装', '鞋子', '时尚', 'Fashion', 'Clothing'],
            'home': ['家居', '厨房', '家具', 'Home', 'Kitchen'],
            'grocery': ['食品', '杂货', 'Grocery', 'Food']
        }
        text = (title + description).lower()
        for category, keywords in categories.items():
            if any(keyword.lower() in text for keyword in keywords):
                return category
        return 'other'

    def _parse_expiry_date(self, expiry_text: str) -> Optional[datetime]:
        """Parse the expiry date, trying several common formats."""
        if not expiry_text:
            return None
        for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%d %B %Y', 'expires %b %d, %Y']:
            try:
                return datetime.strptime(expiry_text, fmt)
            except ValueError:
                continue
        return None


class TaobaoCouponSpider(AsyncCouponSpider):
    """Taobao coupon spider."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.base_url = "https://taobao.com"
        self.api_endpoint = "https://api.taobao.com/router/rest"

    async def process_url(self, url: str) -> List[Coupon]:
        """Taobao API call (illustrative)."""
        # A real implementation would go through the Taobao Open Platform API;
        # mock data is returned here.
        return [
            Coupon(
                id="",
                title="Singles' Day sale: ¥40 off orders over ¥300",
                description="Platform-wide coupon",
                discount="¥40 off over ¥300",
                code="TB20231111",
                platform="Taobao",
                category="fashion",
                expiry_date=datetime(2023, 11, 11, 23, 59, 59)
            )
        ]

3.3 Distributed Spider Scheduler

python

import asyncio
import pickle
from datetime import timedelta
from typing import Dict, List

import redis
from celery import Celery


class DistributedSpiderScheduler:
    """Distributed spider scheduler."""

    def __init__(self, redis_url: str = 'redis://localhost:6379/0'):
        self.redis_client = redis.from_url(redis_url)
        self.celery_app = Celery(
            'coupon_spider',
            broker=redis_url,
            backend=redis_url
        )
        # Celery configuration
        self.celery_app.conf.update(
            task_serializer='pickle',
            accept_content=['pickle', 'json'],
            result_serializer='pickle',
            timezone='Asia/Shanghai',
            enable_utc=True,
            beat_schedule={
                'crawl-amazon-every-hour': {
                    'task': 'tasks.crawl_amazon',
                    'schedule': timedelta(hours=1),
                },
                'crawl-taobao-every-30min': {
                    'task': 'tasks.crawl_taobao',
                    'schedule': timedelta(minutes=30),
                },
            }
        )

    def schedule_spider(self, spider_class, urls: List[str],
                        task_name: str = None):
        """Register and dispatch a spider task."""
        task_name = task_name or spider_class.__name__

        @self.celery_app.task(name=f'tasks.{task_name.lower()}')
        def crawl_task():
            # Build the spider instance and run its crawl
            spider = spider_class()
            return asyncio.run(spider.crawl(urls))

        return crawl_task.delay()

    def get_spider_status(self, spider_name: str) -> Dict:
        """Fetch a spider's status from Redis."""
        key = f"spider:{spider_name}:status"
        status = self.redis_client.get(key)
        return pickle.loads(status) if status else {}

4. Intelligent Classification and Deduplication

python

from typing import List

import jieba.analyse
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer


class CouponDeduplicator:
    """Coupon deduplicator."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            ngram_range=(1, 2)
        )
        self.model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
        self.coupon_embeddings = {}

    def calculate_similarity(self, coupon1: Coupon, coupon2: Coupon) -> float:
        """Cosine similarity between two coupons."""
        # Combine the descriptive fields
        text1 = f"{coupon1.title} {coupon1.description} {coupon1.discount}"
        text2 = f"{coupon2.title} {coupon2.description} {coupon2.discount}"
        # Semantic similarity via Sentence Transformers embeddings
        embeddings = self.model.encode([text1, text2])
        similarity = np.dot(embeddings[0], embeddings[1]) / (
            np.linalg.norm(embeddings[0]) * np.linalg.norm(embeddings[1])
        )
        return float(similarity)

    def find_duplicates(self, coupons: List[Coupon],
                        threshold: float = 0.85) -> List[List[Coupon]]:
        """Group near-duplicate coupons."""
        if len(coupons) < 2:
            return []
        # Pairwise similarity matrix
        similarity_matrix = np.zeros((len(coupons), len(coupons)))
        for i in range(len(coupons)):
            for j in range(i + 1, len(coupons)):
                similarity = self.calculate_similarity(coupons[i], coupons[j])
                similarity_matrix[i, j] = similarity
                similarity_matrix[j, i] = similarity
        # Cluster with DBSCAN on the distance matrix (1 - similarity).
        # eps must be a distance, so pairs more similar than the threshold
        # (distance below 1 - threshold) land in the same cluster.
        clustering = DBSCAN(
            eps=1 - threshold,
            min_samples=1,
            metric='precomputed'
        ).fit(1 - similarity_matrix)
        # Group coupons by cluster label
        groups = {}
        for idx, label in enumerate(clustering.labels_):
            groups.setdefault(label, []).append(coupons[idx])
        # Only clusters with more than one member contain duplicates
        return [group for group in groups.values() if len(group) > 1]


class CouponClassifier:
    """Coupon classifier."""

    def __init__(self):
        self.categories = [
            'electronics', 'fashion', 'home', 'grocery',
            'beauty', 'books', 'sports', 'automotive'
        ]
        self.vectorizer = TfidfVectorizer()
        self.model = None  # a pre-trained model can be loaded here

    def extract_keywords(self, text: str, top_k: int = 10) -> List[str]:
        """Extract keywords (jieba handles both Chinese and English text)."""
        try:
            return jieba.analyse.extract_tags(text, topK=top_k)
        except Exception:
            return []

    def predict_category(self, coupon: Coupon) -> str:
        """Predict a coupon's category."""
        features = f"{coupon.title} {coupon.description}"
        # A trained classifier could be plugged in here; this simplified
        # version scores each category by keyword matches.
        features_lower = features.lower()
        category_scores = {}
        for category in self.categories:
            keywords = self._get_category_keywords(category)
            score = sum(1 for kw in keywords if kw in features_lower)
            category_scores[category] = score
        # Return the highest-scoring category
        if category_scores:
            return max(category_scores.items(), key=lambda x: x[1])[0]
        return 'other'

    def _get_category_keywords(self, category: str) -> List[str]:
        """Keyword list per category."""
        keyword_map = {
            'electronics': ['电视', '手机', '电脑', '耳机', 'laptop', 'smartphone'],
            'fashion': ['衣服', '鞋子', '时尚', 'shirt', 'dress', 'shoes'],
            'home': ['家居', '厨房', '家具', 'bed', 'sofa', 'table'],
            'grocery': ['食品', '水果', '蔬菜', 'food', 'fruit', 'vegetable']
        }
        return keyword_map.get(category, [])

5. Data Storage and the API Service

python

import logging
from datetime import datetime
from typing import List, Optional

from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pymongo import ASCENDING, DESCENDING, MongoClient
from pymongo.errors import DuplicateKeyError

logger = logging.getLogger(__name__)


class MongoDBStorage:
    """MongoDB storage manager."""

    def __init__(self, connection_string: str = "mongodb://localhost:27017/"):
        self.client = MongoClient(connection_string)
        self.db = self.client['coupon_aggregator']
        self.coupons_collection = self.db['coupons']
        # Indexes
        self.coupons_collection.create_index([('id', ASCENDING)], unique=True)
        self.coupons_collection.create_index([('platform', ASCENDING)])
        self.coupons_collection.create_index([('category', ASCENDING)])
        self.coupons_collection.create_index([('expiry_date', ASCENDING)])
        self.coupons_collection.create_index([
            ('title', 'text'), ('description', 'text')
        ])

    async def insert_coupon(self, coupon: Coupon) -> bool:
        """Insert a coupon."""
        try:
            coupon_dict = self._coupon_to_dict(coupon)
            result = self.coupons_collection.insert_one(coupon_dict)
            return result.inserted_id is not None
        except DuplicateKeyError:
            logger.warning(f"Duplicate coupon: {coupon.id}")
            return False

    async def bulk_insert(self, coupons: List[Coupon]) -> int:
        """Insert many coupons, returning the success count."""
        successful = 0
        for coupon in coupons:
            if await self.insert_coupon(coupon):
                successful += 1
        return successful

    async def get_coupon_by_id(self, coupon_id: str) -> Optional[Coupon]:
        """Look up a single coupon by its ID."""
        doc = self.coupons_collection.find_one({'id': coupon_id})
        return self._dict_to_coupon(doc) if doc else None

    async def search_coupons(self,
                             query: Optional[str] = None,
                             platform: Optional[str] = None,
                             category: Optional[str] = None,
                             min_discount: Optional[str] = None,
                             limit: int = 50,
                             skip: int = 0) -> List[Coupon]:
        """Search coupons."""
        filter_dict = {}
        if query:
            filter_dict['$text'] = {'$search': query}
        if platform:
            filter_dict['platform'] = platform
        if category:
            filter_dict['category'] = category
        # More elaborate discount parsing could go here; this simplified
        # version only applies the basic filters.
        cursor = self.coupons_collection.find(
            filter_dict
        ).sort('created_at', DESCENDING).skip(skip).limit(limit)
        return [self._dict_to_coupon(doc) for doc in cursor]

    def _coupon_to_dict(self, coupon: Coupon) -> dict:
        """Convert a Coupon into a plain dict."""
        return {
            'id': coupon.id,
            'title': coupon.title,
            'description': coupon.description,
            'discount': coupon.discount,
            'code': coupon.code,
            'url': coupon.url,
            'platform': coupon.platform,
            'category': coupon.category,
            'expiry_date': coupon.expiry_date,
            'popularity': coupon.popularity,
            'verified': coupon.verified,
            'created_at': coupon.created_at,
            'source_url': coupon.source_url,
            'image_url': coupon.image_url,
            'terms': coupon.terms
        }

    def _dict_to_coupon(self, data: dict) -> Coupon:
        """Convert a MongoDB document back into a Coupon."""
        data = dict(data)
        data.pop('_id', None)  # drop Mongo's internal ID field
        return Coupon(**data)


# FastAPI application
app = FastAPI(title="Coupon Aggregator API", version="1.0.0")

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Dependency injection
def get_db():
    return MongoDBStorage()


# Data models
class CouponResponse(BaseModel):
    id: str
    title: str
    description: str
    discount: str
    code: Optional[str] = None
    platform: str
    category: str
    expiry_date: Optional[datetime] = None
    verified: bool = False
    created_at: datetime


class SearchRequest(BaseModel):
    query: Optional[str] = None
    platform: Optional[str] = None
    category: Optional[str] = None
    page: int = 1
    per_page: int = 20


@app.get("/")
async def root():
    return {"message": "Coupon Aggregator API"}


@app.get("/coupons", response_model=List[CouponResponse])
async def get_coupons(
    query: Optional[str] = Query(None, description="Search keywords"),
    platform: Optional[str] = Query(None, description="Filter by platform"),
    category: Optional[str] = Query(None, description="Filter by category"),
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    db: MongoDBStorage = Depends(get_db)
):
    """List coupons."""
    skip = (page - 1) * per_page
    return await db.search_coupons(
        query=query, platform=platform, category=category,
        limit=per_page, skip=skip
    )


@app.get("/coupons/{coupon_id}", response_model=CouponResponse)
async def get_coupon(coupon_id: str, db: MongoDBStorage = Depends(get_db)):
    """Fetch a single coupon."""
    coupon = await db.get_coupon_by_id(coupon_id)
    if not coupon:
        raise HTTPException(status_code=404, detail="Coupon not found")
    return coupon


@app.get("/platforms")
async def get_platforms(db: MongoDBStorage = Depends(get_db)):
    """List all platforms."""
    platforms = db.coupons_collection.distinct("platform")
    return {"platforms": platforms}


@app.get("/categories")
async def get_categories(db: MongoDBStorage = Depends(get_db)):
    """List all categories."""
    categories = db.coupons_collection.distinct("category")
    return {"categories": categories}


@app.get("/stats")
async def get_statistics(db: MongoDBStorage = Depends(get_db)):
    """Aggregate statistics."""
    total = db.coupons_collection.count_documents({})
    platforms = db.coupons_collection.aggregate([
        {"$group": {"_id": "$platform", "count": {"$sum": 1}}}
    ])
    platform_stats = {p["_id"]: p["count"] for p in platforms}
    return {
        "total_coupons": total,
        "platforms": platform_stats,
        "updated_at": datetime.now()
    }

6. Deployment and Monitoring

Docker Deployment Configuration

dockerfile

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System build dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    wget \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY . .

# Install the Playwright-managed Chromium plus its system dependencies
# (replaces a manual Chrome install; the apt-key approach is deprecated)
RUN playwright install --with-deps chromium

# Entry point
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

yaml

# docker-compose.yml
version: '3.8'

services:
  mongodb:
    image: mongo:latest
    container_name: coupon-mongodb
    restart: always
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password

  redis:
    image: redis:alpine
    container_name: coupon-redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  api:
    build: .
    container_name: coupon-api
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - mongodb
      - redis
    environment:
      MONGO_URI: "mongodb://admin:password@mongodb:27017/"
      REDIS_URL: "redis://redis:6379/0"
    volumes:
      - ./logs:/app/logs

  celery-worker:
    build: .
    container_name: coupon-celery-worker
    restart: always
    command: celery -A tasks worker --loglevel=info
    depends_on:
      - redis
      - mongodb
    environment:
      MONGO_URI: "mongodb://admin:password@mongodb:27017/"
      REDIS_URL: "redis://redis:6379/0"

  celery-beat:
    build: .
    container_name: coupon-celery-beat
    restart: always
    command: celery -A tasks beat --loglevel=info
    depends_on:
      - redis
    environment:
      REDIS_URL: "redis://redis:6379/0"

  prometheus:
    image: prom/prometheus:latest
    container_name: coupon-prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus

  grafana:
    image: grafana/grafana:latest
    container_name: coupon-grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin

volumes:
  mongodb_data:
  redis_data:
  prometheus_data:
  grafana_data:

Monitoring Configuration

python

# monitoring.py
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Metric definitions
COUPONS_COLLECTED = Counter(
    'coupons_collected_total',
    'Total number of coupons collected',
    ['platform', 'status']
)
COUPONS_STORED = Counter(
    'coupons_stored_total',
    'Total number of coupons stored in database'
)
HTTP_REQUESTS = Counter(
    'http_requests_total',
    'Total number of HTTP requests served',
    ['path']
)
SPIDER_DURATION = Histogram(
    'spider_duration_seconds',
    'Time spent crawling',
    ['spider_name']
)
ACTIVE_SPIDERS = Gauge(
    'active_spiders',
    'Number of active spiders'
)


class MonitoringMiddleware:
    """ASGI middleware that records request counts and latency."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Pass non-HTTP events (lifespan, websocket) straight through
        if scope['type'] != 'http':
            await self.app(scope, receive, send)
            return
        start_time = time.time()
        path = scope.get('path', 'unknown')
        # Count the request
        HTTP_REQUESTS.labels(path=path).inc()
        await self.app(scope, receive, send)
        duration = time.time() - start_time
        SPIDER_DURATION.labels(spider_name=path).observe(duration)

7. Complete Implementation

For reasons of space, a simplified end-to-end example is given here:

python

# main.py
import asyncio
import logging
import sys
from typing import List

# Add the project root to the path
sys.path.append('.')

from spiders.amazon_spider import AmazonCouponSpider
from spiders.taobao_spider import TaobaoCouponSpider
from storage.mongodb_storage import MongoDBStorage
from deduplication.deduplicator import CouponDeduplicator
from classification.classifier import CouponClassifier

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CouponAggregator:
    """Top-level coupon aggregator."""

    def __init__(self):
        self.spiders = {
            'amazon': AmazonCouponSpider(),
            'taobao': TaobaoCouponSpider()
        }
        self.storage = MongoDBStorage()
        self.deduplicator = CouponDeduplicator()
        self.classifier = CouponClassifier()

    async def run(self):
        """Run one aggregation pass."""
        logger.info("Starting coupon aggregation")
        all_coupons = []

        # Run all spiders concurrently
        tasks = [self._run_spider(name, spider)
                 for name, spider in self.spiders.items()]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Collect all coupons
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Spider task failed: {result}")
            elif result:
                all_coupons.extend(result)

        logger.info(f"Collected {len(all_coupons)} coupons in total")

        if all_coupons:
            # Deduplicate
            duplicates = self.deduplicator.find_duplicates(all_coupons)
            if duplicates:
                logger.info(f"Found {len(duplicates)} groups of duplicate coupons")
                # Keep the first coupon in each duplicate group
                unique_coupons = self._remove_duplicates(all_coupons, duplicates)
                logger.info(f"{len(unique_coupons)} coupons left after dedup")
            else:
                unique_coupons = all_coupons

            # Classify
            for coupon in unique_coupons:
                if not coupon.category or coupon.category == 'other':
                    coupon.category = self.classifier.predict_category(coupon)

            # Store
            saved = await self.storage.bulk_insert(unique_coupons)
            logger.info(f"Stored {saved} coupons in the database")

        logger.info("Aggregation run finished")

    async def _run_spider(self, spider_name: str, spider):
        """Run a single spider."""
        logger.info(f"Running the {spider_name} spider")
        # Per-spider URL lists; these could come from configuration
        urls = {
            'amazon': ['https://www.amazon.com/coupons'],
            'taobao': ['https://taobao.com']
        }
        coupons = await spider.crawl(urls.get(spider_name, []))
        logger.info(f"{spider_name} spider collected {len(coupons)} coupons")
        return coupons

    def _remove_duplicates(self, coupons: List, duplicate_groups: List) -> List:
        """Drop all but the first coupon of each duplicate group."""
        duplicate_ids = set()
        for group in duplicate_groups:
            # Keep the first entry, mark the rest as duplicates
            for i, coupon in enumerate(group):
                if i > 0:
                    duplicate_ids.add(coupon.id)
        # Filter out the duplicates
        return [c for c in coupons if c.id not in duplicate_ids]


async def main():
    """Entry point."""
    aggregator = CouponAggregator()
    await aggregator.run()


if __name__ == "__main__":
    asyncio.run(main())

8. Future Directions

8.1 Machine Learning Enhancements

  • Use pre-trained models such as BERT for more accurate text classification

  • Personalized recommendations based on user behavior

  • A model for scoring how valuable a coupon is
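As a baseline for the classification upgrade, a TF-IDF plus logistic-regression model already exposes the same fit/predict interface a fine-tuned BERT model would slot into later. This is a sketch on toy data; the training sentences and labels below are invented for illustration:

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline

# Toy training data -- in practice these would be labelled coupon titles
train_texts = [
    "50% off wireless headphones", "smartphone flash sale",
    "buy one dress get one free", "running shoes clearance",
    "sofa and table bundle deal", "kitchen cookware discount",
]
train_labels = [
    "electronics", "electronics",
    "fashion", "fashion",
    "home", "home",
]

# A BERT-based model would replace this pipeline without changing the interface
clf = make_pipeline(TfidfVectorizer(ngram_range=(1, 2)), LogisticRegression())
clf.fit(train_texts, train_labels)

print(clf.predict(["smartphone headphones sale"])[0])  # → electronics
```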

8.2 Real-Time Push System

  • Integrate WebSocket for real-time coupon push

  • Mobile app push notifications

  • Smart push based on user preferences

8.3 Upgraded Anti-Bot Countermeasures

  • A dynamic IP proxy pool

  • Machine-learning-based CAPTCHA recognition

  • Simulating realistic user behavior patterns
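The proxy-pool idea reduces to rotating across healthy endpoints and retiring ones that keep failing. A minimal in-memory sketch (the `ProxyPool` class and its proxy URLs are hypothetical; a production pool would also re-check retired proxies and share state via Redis):

```python
import random
from collections import defaultdict

class ProxyPool:
    """Rotating proxy pool that retires proxies after repeated failures."""

    def __init__(self, proxies, max_failures=3):
        self.proxies = list(proxies)
        self.max_failures = max_failures
        self.failures = defaultdict(int)

    def get(self):
        """Pick a random proxy that is still considered healthy."""
        healthy = [p for p in self.proxies
                   if self.failures[p] < self.max_failures]
        if not healthy:
            raise RuntimeError("No healthy proxies left")
        return random.choice(healthy)

    def report_failure(self, proxy):
        """Record a failed request through this proxy."""
        self.failures[proxy] += 1

    def report_success(self, proxy):
        """Reset the failure counter on success."""
        self.failures[proxy] = 0

pool = ProxyPool(["http://p1:8080", "http://p2:8080", "http://p3:8080"])
for _ in range(3):
    pool.report_failure("http://p1:8080")   # p1 is now retired
assert pool.get() != "http://p1:8080"
```

The spider's `fetch` would call `pool.get()` per request and report the outcome back, so bad exits drain out of rotation automatically.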

8.4 Data Visualization

  • A coupon analytics dashboard

  • Price trend prediction

  • Merchant competition analysis tools
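In its simplest form, the trend-prediction item is a least-squares slope over a product's price history: a negative slope suggests a falling price and a good moment to surface the coupon. A self-contained sketch (the price series is invented for illustration):

```python
def trend_slope(prices):
    """Least-squares slope of price versus time index."""
    n = len(prices)
    xs = range(n)
    mean_x = sum(xs) / n
    mean_y = sum(prices) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, prices))
    var = sum((x - mean_x) ** 2 for x in xs)
    return cov / var

# A week of (hypothetical) daily prices for one product
prices = [99.0, 97.5, 96.0, 96.5, 94.0, 93.0, 92.5]
slope = trend_slope(prices)
print(f"trend: {'falling' if slope < 0 else 'rising'} ({slope:.2f}/day)")
# → trend: falling (-1.09/day)
```

A real forecaster would move to seasonal or ML models, but this slope is already enough to rank "price dropping" offers on a dashboard.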

8.5 Ecosystem Extensions

  • A browser extension

  • An open API platform

  • An affiliate marketing system

Conclusion

This article covered the full pipeline of a coupon aggregation system, from scraping and cleaning through storage and the API layer. Async crawling, machine-learning components, and a microservice architecture together yield a system that is efficient, stable, and easy to scale.

Key technical takeaways:

  1. Async concurrency: high-throughput crawling with asyncio and httpx

  2. Intelligent processing: machine learning for automatic classification and deduplication

  3. Elastic architecture: microservices and message queues decouple the components

  4. Monitoring: a thorough metrics and logging setup keeps the system stable and observable
