别让async变成装饰品:在异步代码里混入阻塞 I/O 会发生什么?
在很多 FastAPI 项目里,我们经常看到这样的代码:
@app.get("/users/{user_id}")asyncdefget_user(user_id:int):time.sleep(2)user=db.query(User).filter(User.id==user_id).first()returnuser它看起来是异步接口,因为函数前面写了async def。
但它真的异步吗?
很遗憾,不一定。甚至很多时候,它比普通同步代码更危险,因为它给了团队一种“我们已经异步化了”的错觉。
这篇文章专门聊一个 Python 后端开发中非常常见、也非常隐蔽的问题:
在异步代码里混入阻塞 I/O 会发生什么?
如何识别一个项目只是“写了 async”,但本质上仍然是“伪异步”?
一、先说结论:阻塞 I/O 会卡住事件循环
Python 的异步编程依赖事件循环。
你可以把事件循环想象成一个非常忙碌但很聪明的调度员。它负责在多个任务之间快速切换:
请求 A 等数据库返回 → 先挂起 请求 B 等 HTTP 响应 → 先挂起 请求 C 可以继续执行 → 先处理 C这就是异步 I/O 的优势:
当一个任务在“等待”时,CPU 不必傻等,可以去处理别的任务。
但是,如果你在异步函数里写了阻塞代码,比如:
time.sleep(3)或者调用同步数据库驱动:
result=sync_db.query("SELECT * FROM users")事件循环就没法切换了。
它会变成:
请求 A 调用了 time.sleep(3) 事件循环被卡住 3 秒 请求 B、C、D 全部等待这就像一个餐厅只有一个服务员。
本来服务员可以在 A 桌等菜的时候去服务 B 桌。
但现在 A 桌拉着服务员聊了 3 分钟,所有桌子都没人管。
这就是异步代码里混入阻塞 I/O 的核心问题:
阻塞代码会占用事件循环,让所有并发请求一起变慢。
二、一个最小复现:async不等于异步
先看一个 FastAPI 例子。
错误示范:在async def里使用time.sleep
fromfastapiimportFastAPIimporttime app=FastAPI()@app.get("/bad")asyncdefbad_api():time.sleep(3)return{"message":"done"}很多初学者会以为:
“我已经用了async def,所以这个接口是异步的。”
但实际上,time.sleep(3)是同步阻塞函数。它会让当前线程暂停 3 秒。对于基于事件循环的异步服务来说,这意味着整个事件循环都被按下了暂停键。
正确写法:使用asyncio.sleep
fromfastapiimportFastAPIimportasyncio app=FastAPI()@app.get("/good")asyncdefgood_api():awaitasyncio.sleep(3)return{"message":"done"}asyncio.sleep(3)不会阻塞事件循环。
它的含义是:
当前任务休眠 3 秒,但事件循环可以继续处理别的任务。
两个接口表面上都“等 3 秒”,但并发能力完全不同。
三、阻塞 I/O 到底有哪些常见来源?
在真实项目里,阻塞 I/O 不只来自time.sleep()。更常见的是这些:
1. 同步数据库驱动
例如:
importpymysql@app.get("/users")asyncdeflist_users():conn=pymysql.connect(...)cursor=conn.cursor()cursor.execute("SELECT * FROM users")returncursor.fetchall()pymysql是同步驱动。
即使它被放进async def,它仍然会阻塞事件循环。
如果使用 PostgreSQL,同样要注意:
importpsycopg2psycopg2传统用法也是同步的。异步项目中更推荐使用:
asyncpg或者 SQLAlchemy 的异步接口。
2. 同步 HTTP 请求
错误示范:
importrequests@app.get("/proxy")asyncdefproxy():response=requests.get("https://example.com/api")returnresponse.json()requests.get()是同步阻塞调用。
如果目标接口慢,整个事件循环都会跟着慢。
更合适的写法是使用异步 HTTP 客户端:
importhttpx@app.get("/proxy")asyncdefproxy():asyncwithhttpx.AsyncClient()asclient:response=awaitclient.get("https://example.com/api")returnresponse.json()3. 文件读写
普通文件操作也是阻塞的:
@app.get("/file")asyncdefread_file():withopen("large.log","r")asf:content=f.read()return{"length":len(content)}对于小文件问题不大,但如果文件很大,或者接口请求频繁,就可能造成明显阻塞。
可以考虑:
importaiofiles@app.get("/file")asyncdefread_file():asyncwithaiofiles.open("large.log","r")asf:content=awaitf.read()return{"length":len(content)}4. CPU 密集型任务
比如压缩、加密、图片处理、大量循环计算:
@app.get("/calculate")asyncdefcalculate():total=0foriinrange(10**8):total+=ireturn{"total":total}这类任务不是 I/O 等待,而是持续占用 CPU。
异步并不能让 CPU 密集型代码自动变快。
如果必须在接口中处理 CPU 密集型任务,通常应该使用:
fromconcurrent.futuresimportProcessPoolExecutor或者把任务交给 Celery、RQ、Dramatiq 等后台任务系统。
四、FastAPI 里的一个关键区别:def和async def
FastAPI 支持两种路由函数:
@app.get("/sync")defsync_api():...以及:
@app.get("/async")asyncdefasync_api():...它们的执行模型不同。
def路由
普通def路由通常会被 FastAPI 放到线程池中运行。
也就是说,如果你在里面调用同步数据库驱动,虽然它仍然会阻塞一个线程,但不会直接阻塞事件循环。
@app.get("/users")defget_users():users=sync_db.query(...)returnusers这种写法在同步技术栈里是可以接受的。
async def路由
async def路由会运行在事件循环中。
如果你在里面调用阻塞代码,就会直接卡住事件循环。
@app.get("/users")asyncdefget_users():users=sync_db.query(...)returnusers这就很危险。
所以一个重要原则是:
如果函数内部主要调用同步阻塞库,优先使用普通
def,不要盲目写async def。
这句话对很多 FastAPI 项目非常重要。
不是所有接口都应该写成async def。
异步不是仪式感,而是一整套配套设计。
五、怎么改造阻塞代码?
方案一:替换为真正的异步库
这是最推荐的方式。
同步 HTTP
importrequestsdeffetch_user():response=requests.get("https://api.example.com/user")returnresponse.json()异步 HTTP
importhttpxasyncdeffetch_user():asyncwithhttpx.AsyncClient()asclient:response=awaitclient.get("https://api.example.com/user")returnresponse.json()同步数据库
importpymysqldefget_user(user_id):cursor.execute("SELECT * FROM users WHERE id=%s",(user_id,))returncursor.fetchone()异步数据库
importasyncpgasyncdefget_user(user_id:int):conn=awaitasyncpg.connect(user="postgres",password="secret",database="app",host="127.0.0.1",)row=awaitconn.fetchrow("SELECT * FROM users WHERE id=$1",user_id,)awaitconn.close()returndict(row)ifrowelseNone生产项目里不要每次请求都新建连接,应该使用连接池。
@app.on_event("startup")asyncdefstartup():app.state.pool=awaitasyncpg.create_pool(...)@app.on_event("shutdown")asyncdefshutdown():awaitapp.state.pool.close()@app.get("/users/{user_id}")asyncdefget_user(user_id:int):asyncwithapp.state.pool.acquire()asconn:row=awaitconn.fetchrow("SELECT * FROM users WHERE id=$1",user_id,)returndict(row)ifrowelseNone方案二:把阻塞任务丢进线程池
有些老系统无法立刻替换同步库。
这时可以用线程池隔离阻塞操作。
importasynciodefblocking_query(user_id:int):time.sleep(2)return{"id":user_id,"name":"Alice"}@app.get("/users/{user_id}")asyncdefget_user(user_id:int):result=awaitasyncio.to_thread(blocking_query,user_id)returnresultasyncio.to_thread()会把同步函数放到单独线程里执行,避免直接阻塞事件循环。
不过这不是银弹。线程池也有容量限制。
如果大量请求都进入线程池,线程数、数据库连接数、内存都会成为瓶颈。
适合场景:
短期兼容同步 SDK 调用无法异步化的第三方库 迁移期过渡方案 少量低频阻塞任务不适合场景:
高并发核心链路 大量长时间阻塞任务 数据库访问主路径 CPU 密集型计算方案三:CPU 密集任务交给进程池或任务队列
线程池适合 I/O 阻塞,不适合重 CPU 计算。
因为 Python 里存在 GIL,纯 Python 的 CPU 密集计算通常不能通过多线程获得理想加速。
可以使用进程池:
importasynciofromconcurrent.futuresimportProcessPoolExecutor executor=ProcessPoolExecutor()defheavy_compute(n:int):returnsum(i*iforiinrange(n))@app.get("/compute")asyncdefcompute():loop=asyncio.get_running_loop()result=awaitloop.run_in_executor(executor,heavy_compute,10_000_000,)return{"result":result}对于耗时更长的任务,更推荐任务队列:
FastAPI 接收请求 ↓ 写入任务队列 ↓ Worker 异步处理 ↓ 前端轮询或 WebSocket 获取结果这种架构能让接口保持轻量,避免请求一直占用服务资源。
六、怎么识别一个“伪异步”项目?
“伪异步”项目通常有一个共同特征:
代码表面写满了
async和await,但核心 I/O 仍然是同步阻塞的。
下面是一些非常实用的排查方法。
1. 搜索async def里的危险调用
可以全局搜索:
time.sleep( requests. pymysql psycopg2 sqlite3 subprocess.run open( pandas.read_重点看这些调用是否出现在async def中。
例如:
asyncdefhandler():response=requests.get(url)returnresponse.json()这是典型伪异步。
2. 检查依赖库是否真的支持异步
不是所有库都因为名字现代就支持异步。
可以问几个问题:
这个库的方法需要 await 吗? 它的客户端是否叫 AsyncClient? 数据库驱动是否提供 async connection / pool? 文档里是否明确支持 asyncio?例如:
client.get(url)和:
awaitclient.get(url)是完全不同的执行模型。
异步项目里,核心 I/O 调用最好长这样:
awaitdb.fetch(...)awaitclient.get(...)awaitfile.read(...)awaitqueue.put(...)如果你的服务入口是async def,但核心调用全都没有await,就要警惕了。
3. 压测时观察并发退化
可以写两个接口:
@app.get("/bad")asyncdefbad():time.sleep(2)return{"ok":True}@app.get("/good")asyncdefgood():awaitasyncio.sleep(2)return{"ok":True}然后用压测工具测试:
wrk-t4-c100-d10shttp://127.0.0.1:8000/bad wrk-t4-c100-d10shttp://127.0.0.1:8000/good你会发现:
/bad 并发越高,排队越严重 /good 可以同时挂起大量请求这不是因为asyncio.sleep更快,而是因为它不会霸占事件循环。
4. 监控事件循环延迟
在生产环境里,可以监控事件循环是否经常被卡住。
一个简单的检测思路:
importasyncioimporttimeimportlogging logger=logging.getLogger(__name__)asyncdefmonitor_loop_lag(interval=0.5,threshold=0.1):whileTrue:start=time.perf_counter()awaitasyncio.sleep(interval)delay=time.perf_counter()-start-intervalifdelay>threshold:logger.warning("Event loop lag detected: %.4fs",delay)在应用启动时创建后台任务:
@app.on_event("startup")asyncdefstartup():asyncio.create_task(monitor_loop_lag())如果日志里频繁出现 event loop lag,说明某些代码正在卡住事件循环。
5. 看接口耗时分布是否“整体抖动”
阻塞事件循环最明显的现象是:
某一个慢操作发生时,其他本来很快的接口也一起变慢比如:
/health 原本 5ms /report 触发阻塞操作后 /health 突然变成 1s、2s、3s这通常说明问题不是单个接口慢,而是事件循环或线程池被拖垮。
七、一个实战案例:同步数据库驱动拖垮 FastAPI
假设我们有一个用户查询接口:
@app.get("/users/{user_id}")asyncdefget_user(user_id:int):user=sync_session.query(User).filter(User.id==user_id).first()return{"id":user.id,"name":user.name,}上线后,低流量下没问题。
但流量稍微一上来,接口开始出现:
P95 延迟升高 P99 延迟飙升 偶发超时 健康检查变慢 CPU 不高但服务响应慢这就是典型的阻塞 I/O 问题。
改法一:如果继续使用同步 ORM,改成def
@app.get("/users/{user_id}")defget_user(user_id:int):user=sync_session.query(User).filter(User.id==user_id).first()return{"id":user.id,"name":user.name,}这样 FastAPI 会把它放到线程池中执行,至少不会直接卡住事件循环。
改法二:迁移到异步数据库访问
fromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemyimportselect@app.get("/users/{user_id}")asyncdefget_user(user_id:int,session:AsyncSession):result=awaitsession.execute(select(User).where(User.id==user_id))user=result.scalar_one_or_none()ifuserisNone:return{"error":"user not found"}return{"id":user.id,"name":user.name,}这才是真正与异步框架匹配的写法。
八、异步代码的最佳实践清单
下面这份清单,可以直接用于代码评审。
1. 不要为了 async 而 async
如果接口内部全是同步逻辑:
@app.get("/sync-style")asyncdefsync_style():result=sync_library.call()returnresult不要盲目写成async def。
更合理:
@app.get("/sync-style")defsync_style():result=sync_library.call()returnresult2. 异步函数内尽量只调用异步 I/O
推荐:
asyncdefhandler():data=awaitdb.fetch(...)response=awaitclient.get(...)awaitcache.set(...)警惕:
asyncdefhandler():data=db.fetch(...)response=requests.get(...)cache.set(...)3. 不要在异步代码里使用time.sleep
错误:
time.sleep(1)正确:
awaitasyncio.sleep(1)4. 第三方 SDK 不支持异步时,要显式隔离
result=awaitasyncio.to_thread(sync_sdk.call,payload)这样至少能让团队一眼看出:
这里是同步阻塞调用,只是被放进线程池隔离了。
5. 限制并发,避免把下游打爆
异步很容易制造大量并发请求。
如果没有限制,可能把数据库、Redis、第三方 API 打爆。
可以使用信号量:
importasyncioimporthttpx semaphore=asyncio.Semaphore(10)asyncdeffetch(url:str):asyncwithsemaphore:asyncwithhttpx.AsyncClient()asclient:response=awaitclient.get(url)returnresponse.text6. 设置超时
不要让任何 I/O 无限等待。
timeout=httpx.Timeout(5.0)asyncwithhttpx.AsyncClient(timeout=timeout)asclient:response=awaitclient.get(url)数据库、缓存、消息队列也都应该设置超时。
7. 使用连接池
错误示范:
asyncdefquery():conn=awaitasyncpg.connect(...)...awaitconn.close()更合理:
pool=awaitasyncpg.create_pool(...)asyncwithpool.acquire()asconn:rows=awaitconn.fetch(...)连接池不仅提升性能,也能限制资源使用,避免连接数失控。
8. 为阻塞风险写测试
可以写一个简单的并发测试:
importasyncioimporttimeimporthttpxasyncdefrequest_once(client):start=time.perf_counter()response=awaitclient.get("http://127.0.0.1:8000/health")cost=time.perf_counter()-startreturncost,response.status_codeasyncdefmain():asyncwithhttpx.AsyncClient()asclient:tasks=[request_once(client)for_inrange(100)]results=awaitasyncio.gather(*tasks)costs=[costforcost,_inresults]print("max:",max(costs))print("avg:",sum(costs)/len(costs))asyncio.run(main())如果一个轻量接口在并发下明显排队,就要排查事件循环是否被阻塞。
九、一个判断口诀
看到 FastAPI 项目时,可以用这个口诀快速判断异步质量:
入口 async 不稀奇, 核心 await 才算数。 I/O 等待要释放, CPU 重活别硬扛。 同步老库进线程, 高频主链要异步。 压测观察 P99, 循环延迟露真相。真正的异步项目,不是把所有函数都改成async def,而是从入口、依赖库、数据库驱动、HTTP 客户端、连接池、超时控制、并发限制到监控体系,都围绕“不要阻塞事件循环”来设计。
十、总结:异步不是魔法,而是一种契约
Python 的异步编程很优雅,但它也很诚实。
你愿意在等待 I/O 时交出控制权,它就能帮你提升并发能力。
你在事件循环里偷偷塞进阻塞调用,它就会把问题原样还给你,甚至放大给整个服务。
在 FastAPI 项目里,async def不是性能保证书。
真正重要的是:
你的 I/O 是否真的可 await? 你的阻塞任务是否被隔离? 你的数据库、HTTP、文件、队列客户端是否匹配异步模型? 你的压测和监控是否能发现事件循环阻塞?对初学者来说,先记住一句话:
不要在
async def里调用阻塞 I/O。
对资深开发者来说,更值得追问的是:
我们的项目是真的异步,还是只是穿了一件 async 的外衣?
当你开始用这个视角审视代码,你会发现很多性能问题并不神秘。它们往往藏在一个不起眼的time.sleep()、一次同步数据库查询、一个requests.get(),或者一个看似无害的文件读取里。
写出高质量 Python 代码,从来不只是掌握语法。
它更像是一种工程直觉:知道什么时候该等待,什么时候该释放,什么时候该隔离,什么时候该重构。
愿你的每一个await,都不是装饰,而是真正释放系统潜能的承诺。