news 2026/6/10 1:48:11

基于异步协程与机器学习去重的智能招聘信息聚合python爬虫实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于异步协程与机器学习去重的智能招聘信息聚合python爬虫实战

引言:招聘信息聚合的挑战与机遇

在当今数字化招聘时代,求职者常常需要在多个招聘平台间切换搜索,而招聘网站反爬机制日益复杂,传统爬虫技术已难以应对。本文将介绍一个基于Python异步协程、智能代理池和机器学习去重技术的现代化招聘信息聚合爬虫系统,实现高效、稳定、智能的数据采集。

技术栈概览

  • 异步框架: asyncio + aiohttp + aiomysql

  • 反反爬技术: 动态代理池 + 请求指纹模拟 + 浏览器行为模拟

  • 智能解析: Playwright自动化 + XPath/CSS选择器 + 正则表达式

  • 数据存储: MySQL 8.0 + Redis + 异步数据库操作

  • 去重技术: SimHash算法 + 布隆过滤器 + 文本相似度计算

  • 监控部署: Prometheus + Grafana + Docker容器化

系统架构设计

python

""" 智能招聘信息聚合爬虫系统架构 """ import asyncio import aiohttp import aiomysql from typing import Dict, List, Optional from dataclasses import dataclass from datetime import datetime import hashlib import json @dataclass class JobPosition: """职位数据模型""" id: str title: str company: str salary: str location: str experience: str education: str tags: List[str] description: str source: str url: str publish_time: datetime crawl_time: datetime hash_value: str = None def __post_init__(self): """生成内容哈希值用于去重""" content = f"{self.title}{self.company}{self.description}" self.hash_value = self.generate_simhash(content) @staticmethod def generate_simhash(content: str, bits: int = 64) -> str: """SimHash算法生成文档指纹""" import numpy as np # 分词和哈希(简化版) words = content.split() vector = np.zeros(bits) for word in words: # 生成每个词的哈希 word_hash = bin(int(hashlib.md5(word.encode()).hexdigest(), 16))[2:].zfill(bits) # 加权累加 for i, bit in enumerate(word_hash): vector[i] += 1 if bit == '1' else -1 # 生成SimHash simhash = ''.join(['1' if v > 0 else '0' for v in vector]) return simhash

核心爬虫实现

1. 异步爬虫引擎

python

import asyncio import aiohttp from aiohttp import ClientTimeout, TCPConnector from contextlib import asynccontextmanager import random import logging from urllib.parse import urlparse, urljoin import backoff logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class AsyncSpiderEngine: """异步爬虫引擎""" def __init__(self, max_concurrent: int = 10, request_timeout: int = 30): self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) self.timeout = ClientTimeout(total=request_timeout) self.session = None self.proxy_pool = ProxyPool() self.user_agents = [ 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36' ] @asynccontextmanager async def create_session(self): """创建aiohttp会话""" connector = TCPConnector(limit=self.max_concurrent, ssl=False) async with aiohttp.ClientSession( connector=connector, timeout=self.timeout, headers=self._get_headers() ) as session: self.session = session yield session def _get_headers(self) -> Dict: """生成随机请求头""" return { 'User-Agent': random.choice(self.user_agents), 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Cache-Control': 'max-age=0', } @backoff.on_exception( backoff.expo, (aiohttp.ClientError, asyncio.TimeoutError), max_tries=3, max_time=30 ) async def fetch(self, url: str, use_proxy: bool = True) -> Optional[str]: """异步获取页面内容""" async with self.semaphore: try: proxy = await self.proxy_pool.get_proxy() if use_proxy else None async with self.session.get( url, proxy=proxy, headers=self._get_headers(), cookies=self._get_cookies() ) as response: if response.status == 200: content = await response.text() # 更新代理评分 if proxy: await self.proxy_pool.update_score(proxy, True) return content else: logger.warning(f"请求失败: {url}, 状态码: {response.status}") if proxy: await self.proxy_pool.update_score(proxy, False) return None except Exception as e: logger.error(f"请求异常 {url}: {e}") return None def _get_cookies(self) -> Dict: """生成模拟cookies""" return { 'session_id': hashlib.md5(str(random.random()).encode()).hexdigest(), 'user_token': hashlib.md5(str(random.random()).encode()).hexdigest()[:16] }

2. 智能代理池实现

python

import asyncio import aiohttp from typing import List, Dict import random class ProxyPool: """智能代理池管理""" def __init__(self): self.proxies = [] self.proxy_scores = {} self.lock = asyncio.Lock() self.proxy_sources = [ 'http://www.proxy-list.org/', 'https://free-proxy-list.net/', 'http://www.gatherproxy.com/' ] async def initialize(self): """初始化代理池""" await self.refresh_proxies() # 启动定时刷新任务 asyncio.create_task(self._scheduled_refresh()) async def refresh_proxies(self): """刷新代理列表""" async with self.lock: new_proxies = [] for source in self.proxy_sources: proxies = await self._fetch_proxies_from_source(source) new_proxies.extend(proxies) # 验证代理可用性 valid_proxies = await self._validate_proxies(new_proxies) self.proxies = valid_proxies # 初始化分数 for proxy in valid_proxies: self.proxy_scores[proxy] = 100 async def get_proxy(self) -> Optional[str]: """获取高质量代理""" if not self.proxies: await self.refresh_proxies() # 根据分数选择代理(加权随机) weighted_proxies = [] for proxy in self.proxies: weight = self.proxy_scores.get(proxy, 50) weighted_proxies.extend([proxy] * weight) return random.choice(weighted_proxies) if weighted_proxies else None async def update_score(self, proxy: str, success: bool): """更新代理评分""" current_score = self.proxy_scores.get(proxy, 50) if success: new_score = min(current_score + 10, 200) else: new_score = max(current_score - 30, 0) if new_score <= 0: self.proxies.remove(proxy) self.proxy_scores.pop(proxy, None) return self.proxy_scores[proxy] = new_score async def _scheduled_refresh(self): """定时刷新代理""" while True: await asyncio.sleep(3600) # 每小时刷新一次 await self.refresh_proxies()

3. Playwright动态渲染支持

python

import asyncio from playwright.async_api import async_playwright from bs4 import BeautifulSoup class DynamicPageRenderer: """处理JavaScript动态渲染的页面""" def __init__(self): self.browser = None self.context = None async def __aenter__(self): self.playwright = await async_playwright().start() self.browser = await self.playwright.chromium.launch( headless=True, args=['--disable-blink-features=AutomationControlled'] ) self.context = await self.browser.new_context( user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', viewport={'width': 1920, 'height': 1080} ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.context.close() await self.browser.close() await self.playwright.stop() async def render_page(self, url: str, wait_for_selector: str = None) -> str: """渲染动态页面""" page = await self.context.new_page() # 模拟人类行为 await self._simulate_human_behavior(page) try: await page.goto(url, wait_until='networkidle') if wait_for_selector: await page.wait_for_selector(wait_for_selector, timeout=10000) # 随机滚动 await self._random_scroll(page) # 获取渲染后的内容 content = await page.content() return content finally: await page.close() async def _simulate_human_behavior(self, page): """模拟人类浏览行为""" # 随机延迟 await asyncio.sleep(random.uniform(1, 3)) # 随机移动鼠标 await page.mouse.move( random.randint(100, 500), random.randint(100, 500) ) async def _random_scroll(self, page): """随机滚动页面""" for _ in range(random.randint(2, 5)): scroll_amount = random.randint(300, 800) await page.evaluate(f"window.scrollBy(0, {scroll_amount})") await asyncio.sleep(random.uniform(0.5, 1.5))

4. 招聘网站解析器

python

class JobSiteParser: """招聘网站解析基类""" def __init__(self): self.spider = AsyncSpiderEngine() self.renderer = DynamicPageRenderer() async def parse_job_list(self, url: str) -> List[Dict]: """解析职位列表页""" raise NotImplementedError async def parse_job_detail(self, url: str) -> Optional[JobPosition]: """解析职位详情页""" raise NotImplementedError def extract_salary_range(self, salary_text: str) -> Dict: """提取薪资范围""" import re pattern = r'(\d+\.?\d*)[kK]?-?(\d+\.?\d*)?[kK]?' match = re.search(pattern, salary_text) if match: min_salary = float(match.group(1)) * 1000 max_salary = float(match.group(2)) * 1000 if match.group(2) else min_salary return { 'min': min_salary, 'max': max_salary, 'text': salary_text } return {'min': 0, 'max': 0, 'text': salary_text} class BossZhiPinParser(JobSiteParser): """Boss直聘解析器""" async def parse_job_list(self, url: str) -> List[Dict]: """解析Boss直聘列表页""" content = await self.spider.fetch(url) if not content: # 尝试使用动态渲染 async with self.renderer: content = await self.renderer.render_page( url, wait_for_selector='.job-list-box' ) soup = BeautifulSoup(content, 'html.parser') jobs = [] for item in soup.select('.job-card-wrapper'): try: job = { 'title': item.select_one('.job-title').text.strip(), 'company': item.select_one('.company-name').text.strip(), 'salary': item.select_one('.salary').text.strip(), 'location': item.select_one('.job-area').text.strip(), 'experience': item.select_one('.tag-list').text.strip(), 'link': item.select_one('a')['href'], 'source': 'boss_zhipin' } jobs.append(job) except Exception as e: logger.error(f"解析职位项失败: {e}") return jobs async def parse_job_detail(self, url: str) -> Optional[JobPosition]: """解析Boss直聘详情页""" full_url = f"https://www.zhipin.com{url}" if not url.startswith('http') else url async with self.renderer: content = await self.renderer.render_page( full_url, wait_for_selector='.job-detail' ) soup = BeautifulSoup(content, 'html.parser') try: title = soup.select_one('.job-title').text.strip() company = soup.select_one('.company-name').text.strip() salary = soup.select_one('.salary').text.strip() # 提取其他信息 info_items = soup.select('.job-detail-section-item') location = info_items[0].text.strip() if len(info_items) > 0 else '' experience = info_items[1].text.strip() if len(info_items) > 1 else '' description = soup.select_one('.job-sec-text').text.strip() return JobPosition( id=hashlib.md5(full_url.encode()).hexdigest(), title=title, company=company, salary=salary, location=location, experience=experience, education='', # 可根据需要提取 tags=[], # 可根据需要提取 description=description, source='boss_zhipin', url=full_url, publish_time=datetime.now(), crawl_time=datetime.now() ) except Exception as e: logger.error(f"解析详情页失败 {full_url}: {e}") return None class LagouParser(JobSiteParser): """拉勾网解析器""" # 实现类似Boss直聘的解析逻辑 pass class ZhilianParser(JobSiteParser): """智联招聘解析器""" # 实现类似Boss直聘的解析逻辑 pass

5. 异步数据存储

python

import aiomysql from motor.motor_asyncio import AsyncIOMotorClient from redis import asyncio as aioredis import json class AsyncDataStorage: """异步数据存储管理器""" def __init__(self, mysql_config: Dict, redis_config: Dict, mongo_config: Dict = None): self.mysql_config = mysql_config self.redis_config = redis_config self.mongo_config = mongo_config self.pool = None self.redis = None self.mongo = None async def initialize(self): """初始化数据库连接""" # 初始化MySQL连接池 self.pool = await aiomysql.create_pool(**self.mysql_config) # 初始化Redis连接 self.redis = await aioredis.from_url( f"redis://{self.redis_config['host']}:{self.redis_config['port']}", password=self.redis_config.get('password'), db=self.redis_config.get('db', 0) ) # 初始化MongoDB连接(可选) if self.mongo_config: self.mongo = AsyncIOMotorClient(self.mongo_config['uri']) async def save_job(self, job: JobPosition): """保存职位信息""" # 1. 使用布隆过滤器去重 if await self.is_duplicate(job.hash_value): logger.info(f"检测到重复职位: {job.title}") return False async with self.pool.acquire() as conn: async with conn.cursor() as cursor: sql = """ INSERT INTO jobs ( id, title, company, salary, location, experience, education, description, source, url, publish_time, crawl_time, hash_value ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE crawl_time = VALUES(crawl_time) """ await cursor.execute(sql, ( job.id, job.title, job.company, job.salary, job.location, job.experience, job.education, job.description, job.source, job.url, job.publish_time, job.crawl_time, job.hash_value )) await conn.commit() # 2. 添加到布隆过滤器 await self.redis.setbit('job_bloom_filter', int(job.hash_value[:8], 16) % (2**20), 1) # 3. 缓存到Redis cache_key = f"job:{job.id}" await self.redis.setex( cache_key, 3600 * 24, # 缓存24小时 json.dumps(job.__dict__, default=str) ) return True async def is_duplicate(self, hash_value: str) -> bool: """使用布隆过滤器检查是否重复""" # 简单的布隆过滤器实现 position = int(hash_value[:8], 16) % (2**20) result = await self.redis.getbit('job_bloom_filter', position) return bool(result) async def close(self): """关闭数据库连接""" if self.pool: self.pool.close() await self.pool.wait_closed() if self.redis: await self.redis.close()

6. 主调度程序

python

class JobSpiderScheduler: """爬虫调度器""" def __init__(self): self.parsers = { 'boss_zhipin': BossZhiPinParser(), 'lagou': LagouParser(), 'zhilian': ZhilianParser() } self.storage = AsyncDataStorage( mysql_config={ 'host': 'localhost', 'port': 3306, 'user': 'root', 'password': 'password', 'db': 'job_spider', 'minsize': 1, 'maxsize': 10 }, redis_config={ 'host': 'localhost', 'port': 6379, 'password': '', 'db': 0 } ) self.task_queue = asyncio.Queue() self.results = [] async def run(self): """运行爬虫""" logger.info("开始运行招聘信息聚合爬虫...") # 初始化存储 await self.storage.initialize() # 定义爬取任务 tasks = [ ('boss_zhipin', 'https://www.zhipin.com/web/geek/job?query=python&city=101010100'), ('lagou', 'https://www.lagou.com/jobs/list_python?city=北京'), ('zhilian', 'https://sou.zhaopin.com/?jl=北京&kw=python') ] # 启动消费者任务 consumer_tasks = [ asyncio.create_task(self._consumer()) for _ in range(5) # 5个并发消费者 ] # 添加任务到队列 for task in tasks: await self.task_queue.put(task) # 等待所有任务完成 await self.task_queue.join() # 取消消费者任务 for consumer in consumer_tasks: consumer.cancel() # 关闭存储连接 await self.storage.close() logger.info(f"爬虫完成,共采集 {len(self.results)} 个职位") async def _consumer(self): """消费者:处理爬取任务""" while True: try: source, url = await self.task_queue.get() parser = self.parsers.get(source) if parser: # 解析列表页 jobs = await parser.parse_job_list(url) for job_info in jobs[:10]: # 限制每个站点爬取数量 # 解析详情页 job = await parser.parse_job_detail(job_info['link']) if job: # 保存到数据库 success = await self.storage.save_job(job) if success: self.results.append(job) logger.info(f"成功保存职位: {job.title}") # 礼貌延迟 await asyncio.sleep(random.uniform(1, 3)) self.task_queue.task_done() except asyncio.CancelledError: break except Exception as e: logger.error(f"处理任务失败: {e}") self.task_queue.task_done() async def main(): """主函数""" scheduler = JobSpiderScheduler() try: await scheduler.run() except KeyboardInterrupt: logger.info("收到中断信号,优雅退出...") except Exception as e: logger.error(f"爬虫运行异常: {e}") finally: logger.info("爬虫结束") if __name__ == "__main__": # 创建数据库表(一次性执行) create_table_sql = """ CREATE TABLE IF NOT EXISTS jobs ( id VARCHAR(64) PRIMARY KEY, title VARCHAR(255) NOT NULL, company VARCHAR(255) NOT NULL, salary VARCHAR(100), location VARCHAR(100), experience VARCHAR(100), education VARCHAR(100), description TEXT, source VARCHAR(50), url VARCHAR(500), publish_time DATETIME, crawl_time DATETIME, hash_value VARCHAR(128), INDEX idx_title (title(100)), INDEX idx_company (company(100)), INDEX idx_source (source), INDEX idx_crawl_time (crawl_time) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; """ # 运行爬虫 asyncio.run(main())

高级功能扩展

1. 机器学习去重优化

python

from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity import numpy as np class SmartDeduplicator: """智能去重器""" def __init__(self): self.vectorizer = TfidfVectorizer(max_features=5000) self.job_vectors = [] self.job_ids = [] def calculate_similarity(self, text1: str, text2: str) -> float: """计算文本相似度""" vectors = self.vectorizer.fit_transform([text1, text2]) similarity = cosine_similarity(vectors[0:1], vectors[1:2])[0][0] return similarity def is_similar_job(self, new_job: JobPosition, threshold: float = 0.8) -> bool: """判断是否为相似职位""" for job_id, vector in zip(self.job_ids, self.job_vectors): similarity = cosine_similarity( self.vectorizer.transform([new_job.description]), vector )[0][0] if similarity > threshold: return True return False

2. 反爬策略监控

python

class AntiAntiSpiderMonitor: """反反爬监控器""" def __init__(self): self.request_count = 0 self.blocked_count = 0 self.success_count = 0 async def monitor_request(self, url: str, success: bool): """监控请求状态""" self.request_count += 1 if success: self.success_count += 1 else: self.blocked_count += 1 # 计算成功率 success_rate = self.success_count / max(self.request_count, 1) # 如果成功率过低,触发警报 if success_rate < 0.3: logger.warning(f"爬虫被频繁拦截,成功率: {success_rate:.2%}") await self.adjust_strategy() async def adjust_strategy(self): """调整爬取策略""" logger.info("调整爬取策略:增加延迟、更换代理、更换User-Agent") # 实现策略调整逻辑

部署与监控

Docker部署配置

dockerfile

# Dockerfile FROM python:3.9-slim WORKDIR /app # 安装依赖 COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt # 安装Playwright浏览器 RUN playwright install chromium # 复制代码 COPY . . # 运行爬虫 CMD ["python", "main.py"]

Prometheus监控配置

yaml

# prometheus.yml scrape_configs: - job_name: 'job_spider' static_configs: - targets: ['localhost:9091']
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 19:48:55

懒人专属:用预装MGeo的云端镜像实现中文地址智能去重

懒人专属&#xff1a;用预装MGeo的云端镜像实现中文地址智能去重 在日常政务系统维护中&#xff0c;经常会遇到地址数据混乱的问题。比如同一个小区可能被记录为"XX小区3期"和"XX小区三期"&#xff0c;传统正则匹配难以准确识别这类语义相似的地址。本文将…

作者头像 李华
网站建设 2026/6/9 22:31:13

考古新助手:MGeo在历史地名对齐中的应用

考古新助手&#xff1a;MGeo在历史地名对齐中的应用 历史地理学研究中&#xff0c;经常需要将古代文献中的地名与现代GIS系统中的地理坐标对齐。传统方法依赖人工比对或简单字符串匹配&#xff0c;难以处理古今地名语义变化、行政区划调整等复杂情况。MGeo作为多模态地理语言模…

作者头像 李华
网站建设 2026/6/9 19:44:20

模型微调入门:基于预置镜像的MGeo定制化训练

模型微调入门&#xff1a;基于预置镜像的MGeo定制化训练 如果你正在处理地理地址相关的AI任务&#xff0c;比如针对特定地区的地址特点进行模型微调&#xff0c;但苦于本地显卡显存不足&#xff0c;这篇文章就是为你准备的。MGeo是由达摩院与高德联合开发的多模态地理文本预训练…

作者头像 李华
网站建设 2026/6/10 1:07:32

模型解释性:理解MGeo地址匹配决策的可视化方法

模型解释性&#xff1a;理解MGeo地址匹配决策的可视化方法 在金融机构的风控业务中&#xff0c;客户地址匹配是一个关键环节。MGeo作为多模态地理语言模型&#xff0c;能够高效完成地址标准化和匹配任务&#xff0c;但仅调用API获取结果往往无法满足监管对模型可解释性的要求。…

作者头像 李华
网站建设 2026/6/9 21:29:29

成本减半:用按需GPU优化MGeo地址服务的运营支出

成本减半&#xff1a;用按需GPU优化MGeo地址服务的运营支出 引言 在物流快递行业中&#xff0c;地址识别与标准化是核心业务环节之一。某快递公司的技术团队发现自建MGeo推理服务器利用率波动大&#xff0c;固定成本居高不下。本文将分享如何通过按需GPU资源优化MGeo地址服务的…

作者头像 李华
网站建设 2026/6/9 20:58:17

AIGC内容去重全攻略:精选工具测评与核心概念深度解析

核心工具对比速览 工具名称 核心功能 适用场景 处理速度 特色优势 aibiye 降AIGC率查重 学术论文优化 20分钟 适配知网/格子达/维普规则 aicheck AIGC检测 风险区域识别 实时 可视化热力图报告 askpaper 学术内容优化 论文降重 20分钟 保留专业术语 秒篇 …

作者头像 李华