批量处理大量文本?异步任务队列整合部署教程
1. 引言:当语义填空遇上高并发需求
你有没有遇到过这样的场景:手头有一批文案需要自动补全关键词,比如广告语中的空白、试题里的成语填空,甚至是一段段待修复的错别字文本?如果只是单条输入,BERT 的智能填空服务已经足够快——毫秒级响应,体验丝滑。但一旦面对成百上千条文本,问题就来了:逐条提交太慢,界面卡顿,系统压力陡增。
这时候,光靠一个轻量模型可不够看了。我们需要的是——异步任务队列。
本文将带你完成一次完整的进阶部署:在已有的 BERT 中文掩码语言模型基础上,集成Celery + Redis构建异步任务系统,实现对大批量文本的后台批量处理。整个过程无需重写核心模型逻辑,只需添加少量调度代码,就能让原本只能“一对一”交互的服务,升级为支持“一对多”的生产级应用。
无论你是想做教育题库自动化填充、内容平台语义纠错,还是企业级文本预处理流水线,这套方案都能直接复用。
本教程你能学到什么?
- 如何为已有 WebAI 服务接入异步任务机制
- Celery 与 FastAPI 的协同工作模式
- 使用 Redis 作为消息中间件的实际配置方法
- 提交批量任务、查询进度、获取结果的完整流程
🧰前置知识提醒:建议你已熟悉 Python 基础、REST API 概念,并能运行基础的 Web 服务(如 Flask/FastAPI)。若尚未部署原始镜像,请先完成初始环境搭建再继续本教程。
2. 系统架构设计:从同步到异步的跃迁
2.1 原有系统的局限性
当前的 BERT 填空服务基于 FastAPI 构建,采用典型的同步请求-响应模式:
用户输入 → Web服务器接收 → 调用模型推理 → 返回结果这在单次请求下表现优异,但在高负载时会出现以下问题:
- 长时间推理阻塞主线程,导致其他请求排队等待
- 浏览器可能因超时中断连接
- 无法追踪任务执行状态(进行中/已完成)
- 不支持断点续查或结果缓存
换句话说,它适合“人机互动”,但不适合“机器批处理”。
2.2 引入异步任务队列后的架构升级
我们引入Celery作为任务调度引擎,Redis作为任务队列和结果存储,形成如下新架构:
[Web UI] ↓ 提交任务 [FastAPI Server] ↓ 发布任务 [Celery Worker ←→ Redis] ↓ 执行推理 [BERT Model] ↓ 回写结果 [Redis Result Backend] ↑ 查询结果 [FastAPI / 用户]这个结构带来了几个关键优势:
- 解耦请求与执行:用户提交后立即返回任务ID,无需等待
- 支持并发处理:多个 Worker 可并行消费任务
- 容错性强:任务失败可重试,结果持久化保存
- 易于扩展:后续可加入优先级队列、定时任务等高级功能
3. 环境准备与依赖安装
3.1 确认基础服务运行正常
确保你的原始 BERT 填空服务正在运行,通常可通过以下命令启动:
uvicorn app.main:app --host 0.0.0.0 --port 8000访问http://localhost:8000应能看到 WebUI 界面,输入[MASK]示例可正常返回预测结果。
3.2 安装异步任务相关依赖
在项目根目录创建或更新requirements.txt,加入以下内容:
celery==5.4.0 redis==5.0.1 fastapi[all] uvicorn transformers torch然后执行安装:
pip install -r requirements.txt注意:请确保 Redis 服务已安装并运行。Linux 用户可用
sudo apt install redis-server,Mac 用户推荐brew install redis,Windows 用户可使用 WSL 或 Docker 启动 Redis 容器。
启动 Redis 服务:
redis-server --daemonize yes验证是否运行成功:
redis-cli ping # 返回 PONG 即表示正常4. 核心模块开发:构建异步任务系统
4.1 创建任务配置文件
新建celery_config.py,定义 Celery 实例:
from celery import Celery # 配置 Redis 为 broker 和 backend app = Celery( 'bert_fill_task', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1' ) # 可选:设置任务序列化方式 app.conf.accept_content = ['json'] app.conf.task_serializer = 'json'4.2 编写异步任务函数
新建tasks.py,封装模型调用逻辑:
import time from celery_config import app from app.inference import predict_mask # 假设原推理函数在此处 @app.task(bind=True, max_retries=3) def fill_mask_async(self, text_list): """ 异步批量处理 MASK 填空任务 :param text_list: 包含 [MASK] 的中文句子列表 :return: 每条文本的 top5 预测结果 """ try: results = [] for idx, text in enumerate(text_list): result = predict_mask(text) # 调用原始模型接口 results.append({ "index": idx, "text": text, "predictions": result }) return {"status": "success", "data": results} except Exception as exc: raise self.retry(exc=exc, countdown=10) # 失败自动重试说明:
bind=True允许任务绑定自身实例,用于重试机制max_retries=3设置最大重试次数countdown=10表示每次重试间隔 10 秒
4.3 扩展 FastAPI 接口支持任务提交
修改app/main.py,新增两个路由:
from fastapi import FastAPI from celery.result import AsyncResult from tasks import fill_mask_async app = FastAPI() @app.post("/submit_batch") async def submit_batch_task(texts: list[str]): task = fill_mask_async.delay(texts) return {"task_id": task.id, "message": "任务已提交,可通过 ID 查询进度"} @app.get("/task_status/{task_id}") async def get_task_status(task_id: str): task_result = AsyncResult(task_id, app=celery_config.app) if task_result.state == 'PENDING': response = {'state': task_result.state, 'status': '任务排队中'} elif task_result.state == 'PROGRESS': response = {'state': task_result.state, 'status': '任务执行中'} else: response = { 'state': task_result.state, 'result': task_result.result } return response这样我们就有了两个关键接口:
POST /submit_batch:提交一批带[MASK]的文本GET /task_status/{id}:轮询任务状态和结果
5. 使用示例:发起一次批量填空任务
5.1 准备测试数据
假设我们要批量处理以下五句话:
[ "床前明月光,疑是地[MASK]霜。", "人生自古谁无死,留取丹心照汗[MASK]。", "山重水复疑无路,柳暗花明又一[MASK]。", "春风又[MACK]江南岸,明月何时照我还。", "海阔凭鱼跃,天高任鸟[MASK]。" ]5.2 提交任务(使用 curl 或 Postman)
curl -X POST http://localhost:8000/submit_batch \ -H "Content-Type: application/json" \ -d '["床前明月光,疑是地[MASK]霜。", "人生自古谁无死,留取丹心照汗[MASK]。"]'返回示例:
{ "task_id": "c7a3f8b2-1e5d-4b9a-8f3a-1d2c3b4e5f6a", "message": "任务已提交,可通过 ID 查询进度" }5.3 查询任务状态
curl http://localhost:8000/task_status/c7a3f8b2-1e5d-4b9a-8f3a-1d2c3b4e5f6a初期返回:
{"state": "PENDING", "status": "任务排队中"}完成后返回:
{ "state": "SUCCESS", "result": { "status": "success", "data": [ { "index": 0, "text": "床前明月光,疑是地[MASK]霜。", "predictions": ["上 (98%)", "下 (1%)", ...] }, ... ] } }6. 运行 Celery Worker 并监控任务
6.1 启动 Worker 进程
打开新终端,运行:
celery -A tasks worker --loglevel=info --concurrency=2参数说明:
-A tasks:指定任务模块--concurrency=2:启用两个工作线程(根据 CPU 核数调整)--loglevel=info:输出详细日志便于调试
你会看到类似日志:
INFO/MainProcess] Received task: tasks.fill_mask_async[c7a3f8b2-1e5d-4b9a-8f3a-1d2c3b4e5f6a] INFO/MainProcess] Task tasks.fill_mask_async[c7a3f8b2-...] succeeded in 1.23s6.2 (可选)启用结果清理策略
由于 Redis 不会自动清除已完成任务的结果,建议定期清理:
# 在任务完成后手动删除 result = AsyncResult(task_id) if result.ready(): result.forget() # 清除结果,释放内存或设置全局过期时间(在 Celery 配置中):
app.conf.result_expires = 3600 # 结果保留1小时7. 总结:打造可落地的批量语义处理系统
通过本次教程,我们成功将一个面向交互式使用的 BERT 填空服务,升级为支持批量异步处理的生产级系统。核心成果包括:
- 实现了非阻塞任务提交:用户不再需要等待长时间推理,提交即走。
- 构建了稳定的消息通道:Redis 保障任务不丢失,Celery 实现可靠执行。
- 保留了原有模型能力:无需改动模型代码,仅通过接口扩展即可赋能。
- 提供了清晰的状态反馈:前端可通过任务 ID 实时查询进度与结果。
这套架构不仅适用于 BERT 填空,还可轻松迁移至其他 NLP 任务,如:
- 批量情感分析
- 文本分类打标
- 错别字自动修正
- 多文档摘要生成
未来你可以进一步优化:
- 加入任务优先级队列
- 使用数据库替代 Redis 存储长期结果
- 搭配前端页面实现可视化任务管理面板
技术的价值在于解决真实问题。当你面对“一千个[MASK]等着填”的时候,希望这篇文章能成为你手中的第一把工具。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。