AI工作流编排:用Airflow调度cv_unet批量任务实战
1. 为什么需要把WebUI任务搬进Airflow?
你可能已经用过科哥开发的cv_unet_image-matting WebUI——那个紫蓝渐变界面、三秒出图、支持单图和批量抠图的AI工具。它很友好,但有个现实问题:当你要每天凌晨自动处理300张电商商品图、每周定时清洗用户上传的人像素材、或者在数据管道中把抠图作为固定环节时,点点鼠标就不管用了。
这时候,你需要的不是另一个图形界面,而是一套能写进代码、能设定时、能监控状态、能重试失败、能串联上下游的生产级调度系统。Apache Airflow 就是为此而生的。
它不替代WebUI,而是“指挥”WebUI——就像一个经验丰富的班组长,不亲手拧螺丝,但清楚每道工序何时启动、谁来干、干完怎么交接、出错了找谁。
本文不讲Airflow原理,只聚焦一件事:如何让Airflow真正跑起来,调用cv_unet完成批量抠图任务,并稳定交付结果。所有步骤都经过实测,命令可复制、配置可复用、问题有解法。
2. 环境准备:让Airflow认识你的cv_unet服务
Airflow本身不直接执行图像处理,它需要通过某种方式“触达”正在运行的cv_unet服务。我们采用最轻量、最可控的方式:HTTP API调用 + 文件系统协同。
2.1 确认cv_unet已暴露本地API(关键前提)
科哥的WebUI默认监听http://localhost:7860,但它只提供前端界面。我们需要让它同时响应后端请求。好消息是:Gradio应用天然支持API端点,无需额外开发。
打开你的cv_unet项目目录,检查app.py或启动脚本中是否包含类似以下代码:
# 确保启动时启用API demo.launch( server_name="0.0.0.0", server_port=7860, share=False, enable_queue=True # 必须开启队列,否则并发请求会阻塞 )验证API是否就绪:
在浏览器访问http://localhost:7860/docs—— 如果看到Swagger文档页面,说明API已激活。
(若无此页面,请在启动命令后加--enable-api参数,例如:gradio app.py --enable-api)
2.2 构建Airflow运行环境(精简版)
我们不装全套Hadoop生态,只用最核心组件:
# 创建独立Python环境(推荐Python 3.9+) python -m venv airflow_env source airflow_env/bin/activate # Linux/Mac # airflow_env\Scripts\activate # Windows # 安装Airflow(2.8+,兼容性好) pip install "apache-airflow[postgres,celery]==2.8.3" # 初始化数据库(使用SQLite,适合中小规模任务) airflow db init # 创建管理员用户(用于Web界面登录) airflow users create \ --username admin \ --password admin123 \ --firstname Admin \ --lastname User \ --role Admin \ --email admin@example.com注意:生产环境建议换PostgreSQL + Celery,但本文所有操作在SQLite单机模式下完全可行,避免复杂依赖干扰主线。
2.3 目录结构约定(Airflow与cv_unet协同基础)
Airflow需要知道“往哪放图、从哪取结果”。我们统一约定路径(可按需修改,但需全局一致):
/home/user/cv_unet/ # cv_unet项目根目录 ├── inputs/ # Airflow放入待处理图片的文件夹 ├── outputs/ # cv_unet输出结果的文件夹(WebUI默认已有) └── run.sh # 启动脚本(如你描述中所示)Airflow DAG将负责:
- 把新图片拷贝到
inputs/ - 调用API触发批量处理
- 等待
outputs/出现结果文件 - 归档或推送结果
3. 核心DAG编写:四步完成一次可靠调度
下面是一个完整、可运行的Airflow DAG文件(保存为dags/cv_unet_batch_dag.py):
# dags/cv_unet_batch_dag.py from datetime import datetime, timedelta import os import time import requests import logging from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor from airflow.models import Variable # --- 配置区(按实际路径修改)--- CV_UNET_API_URL = "http://localhost:7860/api/predict/" # Gradio API默认路径 INPUT_DIR = "/home/user/cv_unet/inputs" OUTPUT_DIR = "/home/user/cv_unet/outputs" BATCH_TIMEOUT = 300 # 批量处理最长等待时间(秒) # 日志配置 logger = logging.getLogger("cv_unet_dag") default_args = { 'owner': 'data-engineer', 'depends_on_past': False, 'start_date': datetime(2024, 1, 1), 'email_on_failure': False, 'retries': 2, 'retry_delay': timedelta(minutes=1), } dag = DAG( 'cv_unet_batch_processing', default_args=default_args, description='Use Airflow to schedule cv_unet batch matting tasks', schedule_interval='0 2 * * *', # 每天凌晨2点执行 catchup=False, tags=['cv', 'matting', 'airflow'], ) # --- 步骤1:检查输入目录是否有新图片 --- def check_input_files(**context): files = [f for f in os.listdir(INPUT_DIR) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp'))] if not files: raise ValueError(f"No image files found in {INPUT_DIR}. Task skipped.") logger.info(f"Found {len(files)} images to process: {files}") # 将文件列表存入XCom,供下游使用 context['task_instance'].xcom_push(key='input_files', value=files) return len(files) check_inputs = PythonOperator( task_id='check_input_files', python_callable=check_input_files, dag=dag, ) # --- 步骤2:调用cv_unet API启动批量处理 --- def trigger_batch_api(**context): input_files = context['task_instance'].xcom_pull(key='input_files') # 构造Gradio API请求体(适配cv_unet批量接口) # 注意:此处假设cv_unet WebUI的批量处理组件位于第2个输入框(index=1),请根据实际Gradio界面顺序调整 payload = { "data": [ None, # 第一个输入(单图上传,留空) input_files, # 第二个输入(批量文件名列表) "#ffffff", # 背景色 "PNG", # 输出格式 ], "event_data": None, "fn_index": 2 # 批量处理函数索引,需实测确认(通常为2或3) } try: logger.info("Calling cv_unet batch API...") response = requests.post(CV_UNET_API_URL, json=payload, timeout=30) response.raise_for_status() result = response.json() logger.info(f"API triggered successfully. Response: {result.get('data', 'no data')[:100]}") except Exception as e: logger.error(f"Failed to call cv_unet API: {e}") raise trigger_api = PythonOperator( task_id='trigger_cv_unet_batch', python_callable=trigger_batch_api, dag=dag, ) # --- 步骤3:等待输出完成(轮询检测outputs/下生成zip包)--- def wait_for_output(**context): start_time = time.time() zip_path = os.path.join(OUTPUT_DIR, "batch_results.zip") while time.time() - start_time < BATCH_TIMEOUT: if os.path.exists(zip_path) and os.path.getsize(zip_path) > 1024: # 大于1KB视为有效 logger.info(f"Output zip ready: {zip_path}") return True time.sleep(5) raise TimeoutError(f"Batch output not generated within {BATCH_TIMEOUT}s. Check cv_unet logs.") wait_output = PythonOperator( task_id='wait_for_batch_output', python_callable=wait_for_output, dag=dag, ) # --- 步骤4:归档并清理(示例:移动zip到日期文件夹)--- def archive_and_clean(**context): from datetime import datetime import shutil now = datetime.now().strftime("%Y%m%d_%H%M%S") archive_dir = os.path.join(OUTPUT_DIR, "archive", now) os.makedirs(archive_dir, exist_ok=True) zip_path = os.path.join(OUTPUT_DIR, "batch_results.zip") if os.path.exists(zip_path): shutil.move(zip_path, os.path.join(archive_dir, f"batch_{now}.zip")) logger.info(f"Archived to {archive_dir}") # 清空inputs/(谨慎!确保上游已确认处理完成) for f in os.listdir(INPUT_DIR): if f.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')): os.remove(os.path.join(INPUT_DIR, f)) logger.info("Cleaned input directory.") archive_task = PythonOperator( task_id='archive_and_clean', python_callable=archive_and_clean, dag=dag, ) # --- 任务依赖关系 --- check_inputs >> trigger_api >> wait_output >> archive_task3.1 关键配置说明(避坑指南)
| 配置项 | 说明 | 如何确认 |
|---|---|---|
fn_index | Gradio函数在API中的序号 | 启动WebUI后访问/docs→ 查看/api/predict/接口 → 找到批量处理函数的fn_index值(通常为2或3) |
INPUT_DIR/OUTPUT_DIR | 必须与cv_unet实际路径完全一致 | 运行ls -l /home/user/cv_unet/inputs确认权限可读写 |
BATCH_TIMEOUT | 预估最大处理时间 | 单图3秒 × 图片数 + 网络开销,建议预留1.5倍缓冲 |
实测提示:首次运行前,手动向
inputs/放2张测试图,然后在Airflow UI中触发一次DAG,观察日志定位fn_index和路径问题。
4. 效果验证:从调度到交付的全链路
启动Airflow Web服务:
airflow webserver # 新终端中启动调度器 airflow scheduler访问http://localhost:8080,用账号admin/admin123登录。
4.1 DAG状态可视化
- 在「DAGs」列表中找到
cv_unet_batch_processing - 点击进入,看到四个任务节点呈水平排列
- 点击右上角「Trigger DAG」手动执行一次
成功标志:
check_input_files显示绿色 ✔,日志中打印发现的文件名trigger_cv_unet_batch显示绿色 ✔,日志含"API triggered successfully"wait_for_batch_output显示绿色 ✔,日志含"Output zip ready"archive_and_clean显示绿色 ✔,outputs/archive/下出现带时间戳的zip包
4.2 输出结果验证
解压batch_20240515_020000.zip,你会看到:
batch_results/ ├── batch_1_20240515_020001.png # 第一张原图抠图结果 ├── batch_2_20240515_020002.png # 第二张 └── metadata.json # 记录处理时间、参数等对比原图,边缘自然、透明通道完整、无白边残留——这正是cv_unet的成熟表现。
5. 生产增强:让调度更健壮、更智能
上述DAG已可用,但真实场景还需三点加固:
5.1 失败自动告警(邮件/钉钉/企业微信)
Airflow原生支持邮件通知,只需在airflow.cfg中配置SMTP:
[smtp] smtp_host = smtp.qq.com smtp_starttls = True smtp_ssl = False smtp_user = your_email@qq.com smtp_password = your_app_password smtp_port = 587 smtp_mail_from = your_email@qq.com然后在DAG中添加:
default_args = { # ... 其他配置 'email_on_failure': True, 'email': ['ops@yourcompany.com'], }5.2 动态参数传递(不同业务用不同背景色)
把“背景色”从硬编码改为变量:
# 在Airflow UI → Admin → Variables 中创建 # Key: cv_unet_bg_color, Value: "#000000" bg_color = Variable.get("cv_unet_bg_color", default_var="#ffffff") # 然后在payload中替换:bg_color5.3 结果质量校验(防“假成功”)
在wait_for_output后增加一步:用OpenCV快速检查zip内首张图是否含透明通道:
def validate_output(**context): import cv2 import zipfile zip_path = os.path.join(OUTPUT_DIR, "batch_results.zip") with zipfile.ZipFile(zip_path) as z: first_img = [n for n in z.namelist() if n.endswith('.png')][0] with z.open(first_img) as f: img = cv2.imdecode(np.frombuffer(f.read(), np.uint8), cv2.IMREAD_UNCHANGED) if img.shape[2] != 4: # 不是RGBA,说明透明通道丢失 raise ValueError("Output image missing alpha channel!")6. 总结:你真正掌握的不是Airflow,而是AI工程化能力
回看整个过程,你做的远不止“写了个DAG”:
- 你打通了AI模型与工程系统的最后一公里:不再把模型当玩具,而是当作可编排、可监控、可集成的生产组件;
- 你建立了人机协作的新范式:科哥的WebUI负责“把事做对”,Airflow负责“把事做准时、做可靠、做可追溯”;
- 你获得了可复用的方法论:这套模式可平移至Stable Diffusion批量生图、Whisper语音转写、LLM批量摘要等任何Gradio封装的AI服务。
真正的AI落地,从来不是比谁的模型参数多,而是比谁能把模型稳稳地放进业务流水线里——这一次,你做到了。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。