一、为什么 RAG 是当前 AI 落地的最佳路径?
1.1 大模型的三大痛点
| 问题 | 说明 | RAG 如何解决 |
|---|---|---|
| 幻觉(Hallucination) | 模型编造不存在的事实 | 仅基于检索到的真实文档回答 |
| 知识滞后 | 训练数据截止 2024 年 | 可注入最新企业文档(如 2025 年财报) |
| 私有数据无法使用 | 不能上传敏感数据到公有云 | 本地向量库 + 私有部署模型 |
✅RAG = Retrieval(检索) + Augmentation(增强) + Generation(生成)
1.2 典型企业场景
- HR 部门:员工问“年假怎么休?” → 自动从《员工手册》找答案
- IT 支持:问“如何重置密码?” → 返回内部 Wiki 步骤
- 销售团队:问“竞品 A 的定价策略?” → 检索市场分析报告
💡核心价值:把沉睡的企业文档变成可对话的知识资产
二、整体架构设计
🔑关键设计:
- 多租户:每个部门(tenant_id)有独立 collection
- 流式响应:避免长时间等待
- 模块解耦:文档处理、检索、生成分离
三、环境准备
3.1 安装依赖
# 创建虚拟环境 python -m venv rag-env source rag-env/bin/activate # Linux/Mac # rag-env\Scripts\activate # Windows # 安装核心包 pip install langchain==0.2.0 \ langchain-community==0.2.0 \ langchain-core==0.2.0 \ chromadb==0.4.24 \ unstructured[local-inference]==0.14.9 \ # PDF/Word 解析 python-dotenv \ fastapi \ uvicorn \ qwen-agent \ # 阿里官方 SDK httpx \ sse-starlette⚠️注意:
unstructured需要额外安装libmagic(Mac:brew install libmagic)- 中文分块推荐使用
ChineseRecursiveTextSplitter(后文详解)
3.2 获取 Qwen API Key
- 访问 阿里云百炼平台
- 创建应用,选择qwen-max模型
- 获取
API_KEY,写入.env文件:
# .env QWEN_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxx QWEN_MODEL=qwen-max四、文档加载与向量化(核心!)
4.1 支持多种格式(PDF/Word/TXT)
# document_loader.py from langchain_community.document_loaders import ( PyPDFLoader, UnstructuredWordDocumentLoader, TextLoader ) from pathlib import Path def load_documents(file_path: str) -> list: """根据文件后缀加载文档""" path = Path(file_path) if path.suffix.lower() == ".pdf": loader = PyPDFLoader(str(path)) elif path.suffix.lower() in [".docx", ".doc"]: loader = UnstructuredWordDocumentLoader(str(path)) elif path.suffix.lower() == ".txt": loader = TextLoader(str(path), encoding="utf-8") else: raise ValueError(f"Unsupported file type: {path.suffix}") return loader.load()✅
Unstructured库能较好保留 Word/PDF 的段落结构
4.2 中文文本分块(避免语义断裂)
通用RecursiveCharacterTextSplitter对中文效果差!改用:
# text_splitter.py from langchain_text_splitters import RecursiveCharacterTextSplitter import re class ChineseRecursiveTextSplitter(RecursiveCharacterTextSplitter): def __init__(self, *args, **kwargs): separators = [ "\n\n", "\n", "。|!|?", "…", "”", "’", "】", ")", "]", "}", " ", "" ] super().__init__(*args, separators=separators, **kwargs) # 使用示例 splitter = ChineseRecursiveTextSplitter( chunk_size=500, # 每块约 500 字符 chunk_overlap=50, # 重叠 50 字符,避免断句 length_function=len ) chunks = splitter.split_documents(documents)💡经验:
- 技术文档:chunk_size=300
- 合同/法律:chunk_size=800(需完整条款)
- 重叠部分能显著提升检索召回率!
4.3 向量化并存入 Chroma(带租户隔离)
# vector_store.py import chromadb from chromadb.utils import embedding_functions from langchain_community.vectorstores import Chroma from langchain_community.embeddings import HuggingFaceEmbeddings class TenantAwareChroma: def __init__(self, persist_directory: str = "./chroma_db"): self.client = chromadb.PersistentClient(path=persist_directory) # 使用中文 Embedding 模型(无需 API Key) self.embedding_func = HuggingFaceEmbeddings( model_name="GanymedeNil/text2vec-large-chinese" ) def add_documents(self, tenant_id: str, documents: list): """为指定租户添加文档""" collection_name = f"kb_{tenant_id}" # 创建或获取 collection collection = self.client.get_or_create_collection( name=collection_name, embedding_function=self._get_embedding_func() ) # LangChain 封装(便于后续检索) vectorstore = Chroma( client=self.client, collection_name=collection_name, embedding_function=self.embedding_func ) vectorstore.add_documents(documents) return vectorstore def get_retriever(self, tenant_id: str, top_k: int = 4): """获取指定租户的检索器""" collection_name = f"kb_{tenant_id}" vectorstore = Chroma( client=self.client, collection_name=collection_name, embedding_function=self.embedding_func ) return vectorstore.as_retriever(search_kwargs={"k": top_k})✅优势:
- 使用开源中文 Embedding 模型,无调用成本;
- 每个租户独立 collection,天然隔离;
PersistentClient支持持久化到磁盘。
五、构建 LangChain RAG Pipeline
5.1 核心链:检索 + 生成
# rag_chain.py from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnablePassthrough from langchain_core.output_parsers import StrOutputParser from qwen_agent.llm import get_chat_model def create_rag_chain(retriever, model_name: str = "qwen-max"): # 1. 定义 Prompt(中文优化!) prompt = ChatPromptTemplate.from_template( """你是一个专业的企业知识助手,请根据以下上下文回答问题。 如果不知道答案,请说“根据现有资料无法回答”,不要编造。 上下文: {context} 问题:{question} """ ) # 2. 初始化 Qwen 模型(通过阿里百炼) llm = get_chat_model({ "model": model_name, "api_key": os.getenv("QWEN_API_KEY"), "model_server": "dashscope" # 阿里 DashScope }) # 3. 构建链 rag_chain = ( {"context": retriever, "question": RunnablePassthrough()} | prompt | llm | StrOutputParser() ) return rag_chain💡Prompt 设计要点:
- 明确禁止幻觉(“不要编造”);
- 强调“根据上下文”,约束模型行为;
- 中文指令更符合 Qwen 训练偏好。
5.2 支持流式响应(SSE)
# stream_rag.py from sse_starlette.sse import EventSourceResponse import asyncio async def stream_rag_answer(rag_chain, question: str): """异步流式生成答案""" loop = asyncio.get_event_loop() # 使用迭代器逐步获取 token async def event_generator(): full_response = "" async for chunk in rag_chain.astream(question): full_response += chunk yield {"data": chunk} # 最后发送结束标记 yield {"data": "[DONE]"} return EventSourceResponse(event_generator())✅ 前端可通过
EventSource接收流式数据,实现“打字机”效果。
六、FastAPI 后端服务
6.1 核心接口
# main.py from fastapi import FastAPI, HTTPException, Depends from pydantic import BaseModel from typing import Optional app = FastAPI(title="企业知识库问答系统") class QueryRequest(BaseModel): tenant_id: str # 部门ID,如 "hr", "it" question: str stream: Optional[bool] = False @app.post("/query") async def query_knowledge_base(request: QueryRequest): try: # 1. 获取租户专属检索器 retriever = vector_store.get_retriever(request.tenant_id) # 2. 构建 RAG 链 rag_chain = create_rag_chain(retriever) # 3. 流式 or 非流式 if request.stream: return await stream_rag_answer(rag_chain, request.question) else: answer = await rag_chain.ainvoke(request.question) return {"answer": answer} except Exception as e: raise HTTPException(status_code=500, detail=str(e))6.2 文档上传接口(支持多文件)
from fastapi import File, UploadFile import tempfile import os @app.post("/upload/{tenant_id}") async def upload_documents(tenant_id: str, files: list[UploadFile] = File(...)): with tempfile.TemporaryDirectory() as tmpdir: documents = [] for file in files: # 保存临时文件 file_path = os.path.join(tmpdir, file.filename) with open(file_path, "wb") as f: f.write(await file.read()) # 加载并分块 docs = load_documents(file_path) chunks = chinese_splitter.split_documents(docs) documents.extend(chunks) # 向量化存储 vector_store.add_documents(tenant_id, documents) return {"message": f"成功上传 {len(files)} 个文件到 {tenant_id} 知识库"}七、前端简易界面(Vue3 + SSE)
<!-- App.vue --> <template> <div> <h1>企业知识库问答</h1> <select v-model="tenantId"> <option value="hr">人力资源部</option> <option value="it">IT支持</option> </select> <input v-model="question" @keyup.enter="ask" placeholder="输入问题..." /> <button @click="ask">提问</button> <div class="answer">{{ answer }}</div> </div> </template> <script setup> import { ref } from 'vue' const tenantId = ref('hr') const question = ref('') const answer = ref('') const ask = () => { answer.value = '' const eventSource = new EventSource( `/query?tenant_id=${tenantId.value}&question=${encodeURIComponent(question.value)}&stream=true` ) eventSource.onmessage = (event) => { if (event.data === '[DONE]') { eventSource.close() } else { answer.value += event.data } } } </script>✅ 实现实时流式输出,用户体验接近 ChatGPT。
八、企业级增强功能
8.1 权限控制(JWT 验证)
# auth.py from fastapi.security import HTTPBearer from jose import jwt security = HTTPBearer() def verify_token(token: str = Depends(security)): try: payload = jwt.decode(token.credentials, SECRET_KEY, algorithms=["HS256"]) return payload["tenant_id"] # 从 token 提取租户ID except: raise HTTPException(status_code=401, detail="Invalid token")前端登录后获取 JWT,确保用户只能访问授权租户。
8.2 性能优化:重排序(Rerank)
初始检索可能召回不相关片段,用bge-reranker二次排序:
from FlagEmbedding import FlagReranker reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=True) def rerank_docs(query: str, docs: list, top_k: int = 3): scores = reranker.compute_score([[query, doc.page_content] for doc in docs]) sorted_docs = [doc for _, doc in sorted(zip(scores, docs), reverse=True)] return sorted_docs[:top_k]📊 实测:Rerank 可将问答准确率提升15–30%!
8.3 缓存高频问题
from functools import lru_cache @lru_cache(maxsize=1000) def cached_rag_answer(tenant_id: str, question: str): retriever = vector_store.get_retriever(tenant_id) rag_chain = create_rag_chain(retriever) return rag_chain.invoke(question)对“年假政策”等高频问题,响应速度从 2s → 50ms。
九、Docker 一键部署
# Dockerfile FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . EXPOSE 8000 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]# docker-compose.yml version: '3' services: rag-app: build: . ports: - "8000:8000" environment: - QWEN_API_KEY=your_key_here volumes: - ./chroma_db:/app/chroma_db # 持久化向量库执行
docker-compose up -d即可启动完整服务!
十、避坑指南(血泪经验)
| 问题 | 解决方案 |
|---|---|
| PDF 表格识别乱码 | 改用pdfplumber替代PyPDFLoader |
| Qwen 返回 JSON 格式错误 | 在 Prompt 中明确要求“纯文本回答” |
| Chroma 内存占用高 | 使用PersistentClient+ 定期清理旧 collection |
| 中文分块丢失标题 | 在ChineseRecursiveTextSplitter中加入\n#分隔符 |
| 流式响应卡顿 | 增加asyncio.sleep(0.01)避免事件循环阻塞 |
十一、效果演示
用户提问:
“实习生转正需要满足哪些条件?”
系统返回(来自《HR 手册.pdf》):
根据《员工转正管理办法》第3.2条,实习生需同时满足:
- 试用期满3个月;
- 月度考核平均分≥80;
- 通过部门转正答辩。
详细流程请参考 HR 系统 > 入职管理 > 转正申请。
✅全程无需人工干预,答案可追溯到原文位置
AI 工程化的本质,不是取代人,而是把人从重复劳动中解放出来。
你不需要成为算法科学家,只需掌握LangChain + 向量数据库 + 大模型 API这一组合拳,就能在 2025 年的技术浪潮中占据先机。👍 如果你觉得有帮助,欢迎点赞、收藏!
💬 你打算用 RAG 解决什么业务问题?欢迎评论区交流!