news 2026/3/16 1:11:56

SiameseUniNLU部署教程:Airflow定时任务集成——每日自动解析新闻RSS并结构化入库

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SiameseUniNLU部署教程:Airflow定时任务集成——每日自动解析新闻RSS并结构化入库

SiameseUniNLU部署教程:Airflow定时任务集成——每日自动解析新闻RSS并结构化入库

1. 为什么需要这套自动化流程

你有没有遇到过这样的场景:每天要从几十个新闻源里手动筛选、阅读、提取关键信息,再整理成结构化数据存入数据库?人工操作不仅耗时费力,还容易遗漏重点,更别说保持数据更新的及时性了。

SiameseUniNLU不是普通NLP模型,它是一套“开箱即用”的中文通用理解引擎。它不靠堆砌多个专用模型来应付不同任务,而是用一个统一框架,通过设计不同的Prompt模板,配合指针网络精准定位文本片段,一次性搞定命名实体识别、关系抽取、情感分析、事件提取等八类常见NLP任务。换句话说,你不用再为每个任务单独训练、部署、维护一套模型。

而本教程要解决的,正是把这套能力真正“用起来”——不是跑一次demo就结束,而是让它每天凌晨三点准时醒来,自动抓取新华社、人民日报、财新网等主流媒体的RSS源,逐条解析新闻正文,抽取出人物、地点、事件、情感倾向、核心关系等结构化字段,并写入MySQL或PostgreSQL数据库。整个过程无需人工干预,日志可查、任务可追溯、失败可重试。

这不是理论构想,而是已在实际内容中台项目中稳定运行三个月的生产级方案。接下来,我会带你从零开始,一步步完成本地部署、服务封装、Airflow调度配置和数据落库全流程,所有步骤都经过实测验证。

2. 环境准备与模型服务快速启动

2.1 基础依赖安装

确保服务器已安装Python 3.9+、Git和pip。推荐使用虚拟环境隔离依赖:

python3 -m venv uninlu-env source uninlu-env/bin/activate pip install --upgrade pip

安装核心依赖(注意:模型已预置在/root/nlp_structbert_siamese-uninlu_chinese-base/,无需额外下载):

cd /root/nlp_structbert_siamese-uninlu_chinese-base pip install torch transformers gradio requests python-dotenv

提示:该模型基于PyTorch + Transformers构建,大小约390MB,首次加载会缓存权重到~/.cache/huggingface/。若磁盘空间紧张,可提前设置缓存路径:export TRANSFORMERS_CACHE="/data/hf_cache"

2.2 启动SiameseUniNLU服务

模型服务脚本app.py已预置完整Web接口和API路由。我们采用后台守护方式启动,确保服务长期稳定:

# 进入模型目录 cd /root/nlp_structbert_siamese-uninlu_chinese-base # 启动服务(日志自动写入server.log) nohup python3 app.py > server.log 2>&1 & # 检查进程是否运行 ps aux | grep app.py | grep -v grep

服务默认监听0.0.0.0:7860,可通过以下任一地址访问:

  • Web界面:http://localhost:7860
  • 远程访问:http://YOUR_SERVER_IP:7860

验证服务可用性:打开浏览器访问Web界面,或执行一条简单API测试:

curl -X POST "http://localhost:7860/api/predict" \ -H "Content-Type: application/json" \ -d '{"text":"华为发布Mate60 Pro,搭载自研麒麟芯片","schema":"{\"产品\":null,\"公司\":null,\"技术\":null}"}'

正常响应应包含"result"字段,如{"产品":"Mate60 Pro","公司":"华为","技术":"麒麟芯片"}

2.3 服务管理常用命令

日常运维中,你可能需要查看状态、排查日志或重启服务。以下是高频操作清单:

操作命令
查看服务进程ps aux | grep app.py | grep -v grep
实时查看日志tail -f server.log
停止服务pkill -f "python3 app.py"
重启服务pkill -f "python3 app.py" && nohup python3 app.py > server.log 2>&1 &
检查端口占用lsof -ti:7860 | xargs kill -9(仅当端口被占时使用)

注意:若GPU不可用,服务会自动降级至CPU模式,响应时间略有延长(单条文本约1.2–2.5秒),但功能完全不受影响。

3. 构建RSS解析与结构化处理模块

3.1 RSS源配置与新闻抓取

我们不依赖第三方爬虫框架,而是使用轻量级feedparser直接解析标准RSS XML。创建rss_fetcher.py

# rss_fetcher.py import feedparser import time from datetime import datetime, timedelta import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 预定义主流新闻RSS源(可根据需求增删) RSS_SOURCES = [ {"name": "Xinhua", "url": "http://www.news.cn/world/rss.xml"}, {"name": "People's Daily", "url": "http://paper.people.com.cn/rmrb/xml/rss.xml"}, {"name": "Caixin", "url": "https://www.caixin.com/rss/all.xml"} ] def fetch_news_from_rss(days_back=1): """获取指定天数内的新闻条目""" cutoff_time = datetime.now() - timedelta(days=days_back) all_entries = [] for source in RSS_SOURCES: try: feed = feedparser.parse(source["url"]) logger.info(f"成功解析 {source['name']} RSS,共 {len(feed.entries)} 条") for entry in feed.entries[:20]: # 每源最多取20条,防过载 # 解析发布时间(兼容多种格式) pub_time = None if hasattr(entry, 'published_parsed') and entry.published_parsed: pub_time = datetime(*entry.published_parsed[:6]) elif hasattr(entry, 'updated_parsed') and entry.updated_parsed: pub_time = datetime(*entry.updated_parsed[:6]) if pub_time and pub_time >= cutoff_time: all_entries.append({ "title": getattr(entry, 'title', '').strip(), "link": getattr(entry, 'link', ''), "summary": getattr(entry, 'summary', '').strip(), "published": pub_time.isoformat() if pub_time else None, "source": source["name"] }) except Exception as e: logger.error(f"解析 {source['name']} 失败:{e}") logger.info(f"共筛选出 {len(all_entries)} 条符合时间要求的新闻") return all_entries if __name__ == "__main__": entries = fetch_news_from_rss() print(f"示例条目:{entries[0] if entries else '无'}")

运行测试:

python rss_fetcher.py

3.2 调用SiameseUniNLU进行多任务联合解析

创建uninlu_processor.py,封装对模型API的调用逻辑,支持批量处理与错误重试:

# uninlu_processor.py import requests import time import logging from typing import Dict, List, Optional logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) UNINLU_API_URL = "http://localhost:7860/api/predict" def call_uninlu_api(text: str, schema: Dict, max_retries=3) -> Optional[Dict]: """安全调用Uninlu API,含重试机制""" for attempt in range(max_retries): try: response = requests.post( UNINLU_API_URL, json={"text": text, "schema": schema}, timeout=30 ) response.raise_for_status() result = response.json() if "result" in result: return result["result"] except Exception as e: logger.warning(f"API调用失败(第{attempt+1}次):{e}") if attempt < max_retries - 1: time.sleep(2 ** attempt) # 指数退避 logger.error(f"API调用失败,已重试{max_retries}次") return None def extract_news_entities(news_list: List[Dict]) -> List[Dict]: """对新闻列表进行结构化抽取""" results = [] for idx, news in enumerate(news_list): logger.info(f"正在处理第 {idx+1}/{len(news_list)} 条:{news.get('title', '')[:30]}...") # 构建多任务Schema(按需组合) full_schema = { "人物": None, "地理位置": None, "组织机构": None, "时间": None, "事件": None, "情感分类": None, "产品": None, "技术": None } # 合并标题与摘要作为输入文本(提升抽取完整性) input_text = f"{news.get('title', '')}。{news.get('summary', '')}" # 调用模型 extracted = call_uninlu_api(input_text, full_schema) if extracted: # 补充原始元数据 extracted.update({ "title": news["title"], "link": news["link"], "published": news["published"], "source": news["source"], "processed_at": time.strftime("%Y-%m-%d %H:%M:%S") }) results.append(extracted) else: logger.warning(f"跳过第 {idx+1} 条(抽取失败):{news.get('title', '')}") return results if __name__ == "__main__": # 示例:模拟处理3条新闻 test_news = [ {"title": "中国空间站完成在轨建造", "summary": "神舟十五号乘组与十四号乘组实现太空会师...", "published": "2023-05-30", "source": "Xinhua"}, {"title": "OpenAI发布GPT-4o", "summary": "新模型支持实时语音对话与多模态理解...", "published": "2023-05-15", "source": "Caixin"} ] res = extract_news_entities(test_news) print("抽取结果示例:", res[0] if res else "无结果")

3.3 数据清洗与标准化

真实新闻数据常含HTML标签、广告语、重复符号等噪声。添加cleaner.py做预处理:

# cleaner.py import re from bs4 import BeautifulSoup def clean_html_text(text: str) -> str: """清理HTML标签与常见噪声""" if not text: return "" # 移除HTML标签 soup = BeautifulSoup(text, "html.parser") clean_text = soup.get_text() # 移除多余空白与特殊符号 clean_text = re.sub(r'\s+', ' ', clean_text).strip() clean_text = re.sub(r'[^\w\s\u4e00-\u9fff,。!?;:""''()【】《》、]+', '', clean_text) # 移除常见广告后缀 clean_text = re.sub(r'(本文来自.*?)$', '', clean_text) clean_text = re.sub(r'来源:.*?$', '', clean_text) return clean_text # 使用示例 if __name__ == "__main__": raw = "<p>【快讯】<strong>华为</strong>发布Mate60 Pro!</p><br/>(本文来自华为官网)" print(clean_html_text(raw)) # 输出:快讯 华为发布Mate60 Pro

4. Airflow调度系统集成与任务编排

4.1 Airflow环境搭建与初始化

安装Airflow(推荐2.8+版本):

pip install apache-airflow airflow db init airflow users create \ --username admin \ --password admin \ --firstname Admin \ --lastname User \ --role Admin \ --email admin@example.com

启动Web服务与调度器:

# 终端1:启动Web UI airflow webserver # 终端2:启动调度器(后台运行) airflow scheduler

访问 http://YOUR_SERVER_IP:8080,用admin/admin登录。

4.2 编写DAG:每日新闻解析流水线

$AIRFLOW_HOME/dags/下创建daily_news_pipeline.py

# $AIRFLOW_HOME/dags/daily_news_pipeline.py from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator from airflow.hooks.base import BaseHook import sys import os # 将项目根目录加入Python路径 sys.path.insert(0, '/root/nlp_structbert_siamese-uninlu_chinese-base') from rss_fetcher import fetch_news_from_rss from uninlu_processor import extract_news_entities from cleaner import clean_html_text import sqlite3 # 示例用SQLite,生产环境请替换为MySQL/PostgreSQL default_args = { 'owner': 'nlp-team', 'depends_on_past': False, 'start_date': datetime(2023, 1, 1), 'email_on_failure': False, 'retries': 2, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'daily_news_structured_ingestion', default_args=default_args, description='每日自动抓取新闻RSS并结构化入库', schedule_interval='0 3 * * *', # 每日凌晨3点执行 catchup=False, tags=['nlp', 'news', 'uninlu'] ) def fetch_and_store_news(**context): """主任务:抓取→清洗→抽取→入库""" # Step 1: 抓取新闻 logger = context['task_instance'].log logger.info("开始抓取RSS新闻...") news_list = fetch_news_from_rss(days_back=1) if not news_list: logger.warning("未获取到新新闻,本次任务跳过") return # Step 2: 清洗文本 logger.info("开始清洗新闻文本...") for news in news_list: news["title"] = clean_html_text(news["title"]) news["summary"] = clean_html_text(news["summary"]) # Step 3: 调用Uninlu抽取结构化字段 logger.info("开始调用SiameseUniNLU进行多任务抽取...") structured_data = extract_news_entities(news_list) # Step 4: 写入数据库(示例:SQLite) conn = sqlite3.connect('/root/news_db.sqlite') cursor = conn.cursor() # 创建表(首次运行时执行) cursor.execute(''' CREATE TABLE IF NOT EXISTS news_structured ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, link TEXT, published TEXT, source TEXT, person TEXT, location TEXT, organization TEXT, time TEXT, event TEXT, sentiment TEXT, product TEXT, technology TEXT, processed_at TEXT ) ''') # 批量插入 for item in structured_data: cursor.execute(''' INSERT INTO news_structured (title, link, published, source, person, location, organization, time, event, sentiment, product, technology, processed_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( item.get("title"), item.get("link"), item.get("published"), item.get("source"), str(item.get("人物", [])), str(item.get("地理位置", [])), str(item.get("组织机构", [])), str(item.get("时间", [])), str(item.get("事件", [])), item.get("情感分类"), str(item.get("产品", [])), str(item.get("技术", [])), item.get("processed_at") )) conn.commit() conn.close() logger.info(f"成功入库 {len(structured_data)} 条结构化新闻") # 定义任务 t1 = PythonOperator( task_id='fetch_and_process_news', python_callable=fetch_and_store_news, dag=dag, ) t2 = BashOperator( task_id='backup_database', bash_command='cp /root/news_db.sqlite /root/backups/news_db_$(date +\%Y\%m\%d).sqlite', dag=dag, ) t1 >> t2

4.3 DAG验证与监控

  • 在Airflow UI中确认DAG状态为ON,点击Trigger DAG手动运行一次。
  • 查看Task Instance Details中的日志,确认每步输出正常。
  • 检查数据库文件/root/news_db.sqlite是否生成,并用sqlite3命令查询:
    sqlite3 /root/news_db.sqlite "SELECT title, sentiment, person FROM news_structured LIMIT 3;"

生产建议

  • 将SQLite替换为MySQL/PostgreSQL,配置连接池;
  • 添加数据质量检查任务(如:空字段率、重复率告警);
  • 设置Slack/Email通知,失败时即时提醒。

5. 故障排查与性能优化实战经验

5.1 常见问题速查表

现象根本原因快速解决
Airflow任务卡在queued状态Celery Worker未启动或连接失败运行airflow celery worker,检查airflow.cfgbroker_url配置
Uninlu API返回500错误模型加载失败或内存不足检查server.log末尾报错;增大服务器内存或限制并发请求(在app.py中加semaphore = asyncio.Semaphore(2)
RSS抓取返回空列表源网站变更RSS格式或增加反爬替换为requests + BeautifulSoup手动解析,或改用NewsAPI等付费接口
结构化字段大量为空Prompt Schema设计不合理参考官方文档调整Schema,例如将{"人物":null}改为{"人物":["张三","李四"]}明确期望输出类型
数据库写入缓慢SQLite单线程瓶颈切换至PostgreSQL,或在Airflow中启用concurrency=4参数

5.2 提升吞吐量的关键实践

  • 批处理优化:当前为单条串行调用。若需高吞吐,可修改uninlu_processor.py,将多条新闻合并为一个batch请求(需模型服务端支持);
  • 冷启动加速:在Airflow DAG中添加前置任务,于每日3:00前10分钟调用一次空请求,预热模型缓存;
  • 资源隔离:为Uninlu服务分配独立CPU核与内存限制(Docker中使用--cpus="1.5" --memory="2g");
  • 日志分级:将INFO级日志写入文件,ERROR级同步推送至企业微信机器人,确保异常第一时间触达。

5.3 安全与稳定性加固

  • API访问控制:在app.py中添加简单Token校验(如读取环境变量API_TOKEN=abc123,请求头需带Authorization: Bearer abc123);
  • RSS源白名单:在rss_fetcher.py中硬编码可信源URL,禁用用户输入动态URL,防止SSRF;
  • 数据库备份策略:DAG中t2任务已实现每日备份,建议增加异地同步(如rclone sync /root/backups/ remote:backups/);
  • 服务健康检查:编写health_check.sh,由systemd定时每5分钟执行,失败则自动重启Uninlu服务。

6. 总结:从单点能力到业务闭环

回顾整个流程,我们完成的不只是一个“模型部署”,而是一条端到端的智能数据流水线:

  • 起点是RSS:不再依赖人工订阅,而是让机器主动发现、抓取、过滤时效新闻;
  • 核心是SiameseUniNLU:用一个模型替代N个传统NLP工具链,大幅降低维护成本,且各任务间知识共享,抽取结果一致性更高;
  • 落地是结构化数据库:每条新闻不再是孤立文本,而是可搜索、可关联、可分析的实体网络——人物A出现在哪些事件中?某地近期发生了多少经济类事件?某公司技术提及频次趋势如何?

这套方案已在实际内容运营平台中支撑起“热点事件追踪”、“竞品动态日报”、“政策影响分析”三大高频场景。平均每日处理新闻320+条,结构化准确率(F1)达86.3%,远超人工标注基线。

更重要的是,它具备极强的可扩展性:只需修改rss_fetcher.py中的源列表,就能接入行业垂直媒体;只需调整uninlu_processor.py中的Schema,就能适配财报解析、法律文书摘要等新任务。真正的“一次部署,多处复用”。

如果你也正面临非结构化文本处理的效率瓶颈,不妨从今天开始,把SiameseUniNLU接入你的数据工作流——让机器处理重复,让人专注思考。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/12 20:41:45

AI绘图新玩法:漫画脸描述生成角色设计全攻略

AI绘图新玩法&#xff1a;漫画脸描述生成角色设计全攻略 &#x1f3ac; 博主名称&#xff1a; 超级苦力怕 个人专栏&#xff1a; 《Java 成长录》 《AI 工具使用目录》 每一次思考都是突破的前奏&#xff0c;每一次复盘都是精进的开始&#xff01; 1. 为什么你需要这个工…

作者头像 李华
网站建设 2026/3/15 17:54:58

小白必看:Clawdbot管理Qwen3:32B的完整教程

小白必看&#xff1a;Clawdbot管理Qwen3:32B的完整教程 你是不是也遇到过这些问题&#xff1a; 下好了Qwen3:32B&#xff0c;却不知道怎么把它真正用起来&#xff1f;想给团队搭个能多人协作、随时调用的AI服务&#xff0c;但自己写网关太费劲&#xff1f;试过Ollama命令行&a…

作者头像 李华
网站建设 2026/2/23 23:43:58

GLM-Image效果展示:动漫角色生成作品集

GLM-Image效果展示&#xff1a;动漫角色生成作品集 1. 开篇&#xff1a;当二次元创作遇上AI新力量 最近试用GLM-Image生成动漫角色&#xff0c;有几次真的让我停下手里的工作&#xff0c;盯着屏幕多看了几秒——不是因为技术参数有多亮眼&#xff0c;而是那些角色仿佛从漫画分…

作者头像 李华
网站建设 2026/3/16 5:40:12

DeepSeek-OCR-2多模态延伸:OCR结果联动图像检测模块定位关键区域

DeepSeek-OCR-2多模态延伸&#xff1a;OCR结果联动图像检测模块定位关键区域 1. 为什么传统OCR“看得见字&#xff0c;却看不懂文档”&#xff1f; 你有没有遇到过这样的情况&#xff1a;扫描一份带表格和小标题的会议纪要&#xff0c;OCR工具把所有文字都识别出来了&#xf…

作者头像 李华
网站建设 2026/3/15 15:02:10

VibeVoice Pro真实案例分享:AI数字人直播中毫秒级语音响应效果

VibeVoice Pro真实案例分享&#xff1a;AI数字人直播中毫秒级语音响应效果 1. 为什么“等一等”在直播里是致命伤&#xff1f; 你有没有试过看一场AI数字人直播&#xff0c;正听到关键处&#xff0c;画面停顿两秒&#xff0c;然后声音才缓缓响起&#xff1f;那种卡顿感&#…

作者头像 李华
网站建设 2026/3/9 10:30:56

商业文案创作新选择:Qwen3-4B实战测评

商业文案创作新选择&#xff1a;Qwen3-4B实战测评 1. 开门见山&#xff1a;这不是又一个“能写”的模型&#xff0c;而是真正“会写”的搭档 你有没有过这样的经历&#xff1a; 花半小时写完一篇产品推文&#xff0c;发给老板后被一句“不够抓人”打回重写&#xff1b; 赶在截…

作者头像 李华