爬虫被网站封 IP 是家常便饭。每次被封就去手动换 IP 效率太低,正确的做法是搭建一个代理 IP 池——自动采集、自动验证、自动切换。
一、代理池的整体架构
代理来源(采集/购买) ↓ 存入代理池(Redis/数据库) ↓ 定时验证(检测可用性、延迟、匿名度) ↓ API 接口(爬虫按需获取可用代理) ↓ 爬虫使用(被封后自动标记并换下一个)二、代理来源
1. 采集免费代理
importrequestsfrombs4importBeautifulSoupimporttimedeffetch_89ip():"""从 89IP 采集免费代理"""proxies=[]url="https://www.89ip.com/"try:resp=requests.get(url,timeout=10)soup=BeautifulSoup(resp.text,"html.parser")rows=soup.select("table tr")[1:]# 跳过表头forrowinrows:cells=row.select("td")iflen(cells)>=2:ip=cells[0].text.strip()port=cells[1].text.strip()proxies.append(f"{ip}:{port}")exceptExceptionase:print(f"采集失败:{e}")returnproxies免费代理的特点:
- 90% 不可用(需要验证)
- 速度慢(平均 3-5 秒)
- 匿名度低
- 适合学习和小规模爬虫
2. 购买付费代理
classProxyProvider:"""代理服务商 API 提取"""def__init__(self,api_url):self.api_url=api_urldeffetch(self):"""从 API 提取代理"""try:resp=requests.get(self.api_url,timeout=10)ifresp.status_code==200:data=resp.json()return[f"{item['ip']}:{item['port']}"foritemindata.get("data",[])]exceptExceptionase:print(f"提取失败:{e}")return[]三、代理池核心实现
1. 存储层(Redis)
importredisimportjsonclassProxyRedis:"""代理池 Redis 存储"""def__init__(self,host="localhost",port=6379,db=0):self.client=redis.StrictRedis(host=host,port=port,db=db)self.key="proxy_pool"defadd(self,proxy,score=10):"""添加代理(分数越高越优先使用)"""self.client.zadd(self.key,{proxy:score})defget(self):"""获取一个可用代理(分数最高的)"""result=self.client.zrevrange(self.key,0,0)returnresult[0].decode()ifresultelseNonedefget_all(self):"""获取所有代理"""return[p.decode()forpinself.client.zrange(self.key,0,-1)]defremove(self,proxy):"""移除不可用的代理"""self.client.zrem(self.key,proxy)defdecrease(self,proxy):"""降低代理分数(连续失败后移除)"""score=self.client.zscore(self.key,proxy)ifscoreandscore>1:self.client.zincrby(self.key,-1,proxy)else:self.remove(proxy)defcount(self):"""代理数量"""returnself.client.zcard(self.key)2. 验证层
importthreadingimporttimeimportrequestsclassProxyValidator:"""代理验证器"""# 测试用 URL(建议用稳定的网站)TEST_URLS=["http://httpbin.org/ip","https://www.baidu.com",]def__init__(self,proxy_redis,timeout=5):self.proxy_redis=proxy_redis self.timeout=timeoutdefvalidate(self,proxy):"""验证单个代理是否可用"""fortest_urlinself.TEST_URLS:try:resp=requests.get(test_url,proxies={"http":proxy,"https":proxy},timeout=self.timeout,)ifresp.status_code==200:returnTrueexcept:passreturnFalsedefvalidate_all(self):"""验证所有代理(低分优先验证,高分的减少验证频率)"""proxies=self.proxy_redis.get_all()forproxyinproxies:ifself.validate(proxy):print(f"✅{proxy}可用")else:self.proxy_redis.decrease(proxy)print(f"❌{proxy}不可用,已扣分")3. 调度层
importscheduleimportthreadingclassProxyPoolScheduler:"""代理池调度器"""def__init__(self,proxy_redis,validator,providers=None):self.proxy_redis=proxy_redis self.validator=validator self.providers=providersor[]defcollect(self):"""定时采集"""print("开始采集代理...")count=0forproviderinself.providers:proxies=provider.fetch()forproxyinproxies:self.proxy_redis.add(proxy)count+=1print(f"采集完成,新增{count}个代理")defcheck(self):"""定时验证"""print(f"开始验证(当前代理数:{self.proxy_redis.count()})...")self.validator.validate_all()print(f"验证完成(剩余代理:{self.proxy_redis.count()})")defstart(self):"""启动定时任务"""# 每 30 分钟采集一次schedule.every(30).minutes.do(self.collect)# 每 10 分钟验证一次schedule.every(10).minutes.do(self.check)# 先执行一次self.collect()self.check()whileTrue:schedule.run_pending()time.sleep(30)defstart_async(self):"""异步启动(不阻塞主线程)"""thread=threading.Thread(target=self.start,daemon=True)thread.start()print("代理池调度器已启动(后台运行)")四、API 接口
fromflaskimportFlask,jsonifyimportredis app=Flask(__name__)proxy_redis=ProxyRedis()@app.route("/proxy")defget_proxy():"""获取一个代理"""proxy=proxy_redis.get()ifproxy:returnjsonify({"code":200,"proxy":proxy})returnjsonify({"code":404,"message":"代理池为空"})@app.route("/proxy/list")deflist_proxies():"""获取所有代理"""proxies=proxy_redis.get_all()returnjsonify({"code":200,"count":len(proxies),"data":proxies})@app.route("/proxy/count")defproxy_count():"""代理数量"""returnjsonify({"code":200,"count":proxy_redis.count()})if__name__=="__main__":app.run(port=5010)五、在爬虫中使用
importrequestsimporttimeclassProxyCrawler:"""使用代理池的爬虫"""def__init__(self,proxy_api="http://localhost:5010/proxy"):self.proxy_api=proxy_apidefget_proxy(self):"""从代理池获取代理"""try:resp=requests.get(self.proxy_api,timeout=3)data=resp.json()ifdata["code"]==200:returndata["proxy"]except:passreturnNonedefrequest(self,url,max_retries=5):"""带代理重试的请求"""foriinrange(max_retries):proxy=self.get_proxy()ifnotproxy:print("代理池为空,等待...")time.sleep(5)continuetry:resp=requests.get(url,proxies={"http":proxy,"https":proxy},timeout=10,headers={"User-Agent":"Mozilla/5.0"},)ifresp.status_code==200:returnrespexcept:print(f"代理{proxy}不可用,换下一个...")returnNonedefcrawl(self,urls):"""批量爬取"""forurlinurls:resp=self.request(url)ifresp:print(f"✅{url}爬取成功")else:print(f"❌{url}爬取失败")time.sleep(1)# 使用crawler=ProxyCrawler()crawler.crawl(["https://example.com/page/1","https://example.com/page/2"])六、完整搭建流程
# 一键启动代理池if__name__=="__main__":# 1. 初始化 Redis 存储redis_store=ProxyRedis()# 2. 初始化验证器validator=ProxyValidator(redis_store)# 3. 配置代理来源providers=[ProxyProvider("https://api.example.com/get_proxy"),]# 4. 启动调度器(后台运行)scheduler=ProxyPoolScheduler(redis_store,validator,providers)scheduler.start_async()# 5. 启动 API 服务app.run(port=5010)七、付费代理说明
免费代理只适合学习和测试,正式项目建议购买付费代理API。一个月几十块钱,省去采集验证的麻烦,可用率和速度都有保障。选择时关注以下几点:
| 指标 | 说明 |
|---|---|
| 可用率 | 最好 85% 以上 |
| 速度 | 平均响应时间 3 秒以内 |
| 匿名度 | 高匿名(不暴露真实 IP) |
| 协议 | 支持 HTTP/HTTPS |
| 提取方式 | API 接口提取,按量或按时付费 |
💡 觉得有用的话,点赞 + 关注【张老师技术栈】吧!每周更新 Java/Python/爬虫 实战干货,不让你白来。