news 2026/5/11 8:05:03

Python异步编程深入:从协程到高性能并发

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python异步编程深入:从协程到高性能并发

Python异步编程深入:从协程到高性能并发

引言

异步编程是提高Python应用性能的关键技术之一。通过事件循环和协程,我们可以在单线程中实现高并发处理。

本文将深入探讨Python异步编程的核心概念,包括协程、事件循环、任务管理和最佳实践。

一、协程基础

1.1 协程定义与创建

import asyncio # 基本协程定义 async def hello(): print("Hello") await asyncio.sleep(1) print("World") # 运行协程 asyncio.run(hello()) # 协程作为函数 async def fetch_data(url): print(f"Fetching {url}") await asyncio.sleep(2) return {"url": url, "data": "sample"} async def main(): result = await fetch_data("https://api.example.com") print(result) asyncio.run(main())

1.2 协程的状态

import asyncio async def my_coroutine(): await asyncio.sleep(1) return "done" # 获取协程对象 coro = my_coroutine() print(f"状态: {coro.send(None)}") # 启动协程 # 协程状态转换 # PENDING -> RUNNING -> DONE # -> CANCELLED

二、事件循环

2.1 事件循环的作用

import asyncio def main(): # 获取或创建事件循环 loop = asyncio.get_event_loop() # 运行协程直到完成 loop.run_until_complete(hello()) # 关闭事件循环 loop.close() if __name__ == "__main__": main()

2.2 自定义事件循环

import asyncio import uvloop # 使用uvloop替代默认事件循环 uvloop.install() async def benchmark(): tasks = [asyncio.sleep(0.1) for _ in range(1000)] await asyncio.gather(*tasks) # uvloop提供更好的性能 asyncio.run(benchmark())

三、任务管理

3.1 创建任务

import asyncio async def task_function(name, delay): print(f"Task {name} started") await asyncio.sleep(delay) print(f"Task {name} completed") return f"Result from {name}" async def main(): # 创建任务 task1 = asyncio.create_task(task_function("A", 2)) task2 = asyncio.create_task(task_function("B", 1)) # 等待任务完成 result1 = await task1 result2 = await task2 print(f"Results: {result1}, {result2}") asyncio.run(main())

3.2 并发任务

import asyncio async def fetch_url(url): print(f"Fetching {url}") await asyncio.sleep(1) return f"Data from {url}" async def main(): urls = [ "https://api.example.com/1", "https://api.example.com/2", "https://api.example.com/3", ] # 并发执行所有任务 tasks = [fetch_url(url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())

3.3 任务取消

import asyncio async def long_running_task(): try: print("Starting long task") for i in range(10): await asyncio.sleep(0.5) print(f"Progress: {i+1}/10") return "Completed" except asyncio.CancelledError: print("Task was cancelled") raise async def main(): task = asyncio.create_task(long_running_task()) # 1秒后取消任务 await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("Main caught cancellation") asyncio.run(main())

四、异步模式

4.1 生产者-消费者模式

import asyncio from collections import deque async def producer(queue): for i in range(5): await asyncio.sleep(1) item = f"Item {i}" await queue.put(item) print(f"Produced: {item}") async def consumer(queue): while True: item = await queue.get() print(f"Consumed: {item}") queue.task_done() async def main(): queue = asyncio.Queue() # 创建任务 producer_task = asyncio.create_task(producer(queue)) consumer_task = asyncio.create_task(consumer(queue)) # 等待生产者完成 await producer_task # 等待队列清空 await queue.join() # 取消消费者 consumer_task.cancel() asyncio.run(main())

4.2 超时处理

import asyncio async def slow_operation(): await asyncio.sleep(5) return "Done" async def main(): try: # 设置超时 result = await asyncio.wait_for(slow_operation(), timeout=2) print(result) except asyncio.TimeoutError: print("Operation timed out") asyncio.run(main())

4.3 信号量控制并发

import asyncio async def limited_task(semaphore, task_id): async with semaphore: print(f"Task {task_id} started") await asyncio.sleep(1) print(f"Task {task_id} completed") async def main(): # 限制并发数为3 semaphore = asyncio.Semaphore(3) tasks = [limited_task(semaphore, i) for i in range(10)] await asyncio.gather(*tasks) asyncio.run(main())

五、异步IO操作

5.1 异步文件操作

import asyncio async def read_file_async(file_path): async with asyncio.open(file_path, 'r') as f: contents = await f.read() return contents async def write_file_async(file_path, content): async with asyncio.open(file_path, 'w') as f: await f.write(content) async def main(): content = await read_file_async('input.txt') await write_file_async('output.txt', content) asyncio.run(main())

5.2 异步HTTP客户端

import httpx async def fetch_data(url): async with httpx.AsyncClient() as client: response = await client.get(url) return response.json() async def main(): urls = [ "https://api.github.com/users/octocat", "https://api.github.com/repos/python/cpython" ] tasks = [fetch_data(url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(result) asyncio.run(main())

六、异步最佳实践

6.1 避免阻塞调用

import asyncio import time # 错误:同步sleep阻塞事件循环 async def bad_example(): print("Start") time.sleep(1) # 阻塞! print("End") # 正确:使用异步sleep async def good_example(): print("Start") await asyncio.sleep(1) # 非阻塞 print("End")

6.2 使用asyncio.to_thread

import asyncio def blocking_io(): """阻塞IO操作""" with open('large_file.txt', 'r') as f: return f.read() async def main(): # 将阻塞操作移到线程池 result = await asyncio.to_thread(blocking_io) print(f"Read {len(result)} characters") asyncio.run(main())

6.3 异步上下文管理器

import asyncio class AsyncResource: async def __aenter__(self): print("Acquiring resource") await asyncio.sleep(0.1) return self async def __aexit__(self, exc_type, exc, tb): print("Releasing resource") await asyncio.sleep(0.1) async def do_work(self): print("Doing work") async def main(): async with AsyncResource() as resource: await resource.do_work() asyncio.run(main())

七、总结

Python异步编程的核心概念:

  1. 协程:轻量级的并发执行单元
  2. 事件循环:协调协程执行的核心
  3. 任务:协程的封装和管理
  4. 并发模式:生产者-消费者、信号量控制等

在实际项目中,建议:

  • 使用asyncio进行异步编程
  • 避免在协程中调用阻塞函数
  • 使用httpx等异步库
  • 合理控制并发数量

思考:在你的项目中,异步编程带来了哪些性能提升?欢迎分享!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/11 8:03:39

多Agent系统通信协议设计2026:从消息队列到共享状态的完整工程指南

单个AI Agent能力有限,多Agent协作才能解决复杂问题。但Agent间如何通信,决定了整个系统的性能、可靠性和可扩展性。本文深度解析多Agent通信架构的设计模式与工程实践。一、多Agent通信的核心挑战多Agent系统面临的通信挑战不同于传统微服务&#xff1a…

作者头像 李华
网站建设 2026/5/11 8:03:15

Postmate核心原理深度解析:从握手机制到消息验证

Postmate核心原理深度解析:从握手机制到消息验证 【免费下载链接】postmate 📭 A powerful, simple, promise-based postMessage library. 项目地址: https://gitcode.com/gh_mirrors/po/postmate Postmate是一个强大而简单的基于Promise的postMe…

作者头像 李华
网站建设 2026/5/11 8:00:42

CANN/ops-nn二元交叉熵损失算子

aclnnBinaryCrossEntropyWithLogits 【免费下载链接】ops-nn 本项目是CANN提供的神经网络类计算算子库,实现网络在NPU上加速计算。 项目地址: https://gitcode.com/cann/ops-nn 📄 查看源码 产品支持情况 产品是否支持Ascend 950PR/Ascend 950D…

作者头像 李华
网站建设 2026/5/11 8:00:17

高效解决Dell G15散热难题:开源温度控制中心TCC-G15完全指南

高效解决Dell G15散热难题:开源温度控制中心TCC-G15完全指南 【免费下载链接】tcc-g15 Thermal Control Center for Dell G15 - open source alternative to AWCC 项目地址: https://gitcode.com/gh_mirrors/tc/tcc-g15 在追求极致游戏性能的旅程中&#xff…

作者头像 李华
网站建设 2026/5/11 7:57:44

Apache Atlas UI实战:从数据资产发现到血缘追溯的完整操作指南

1. Apache Atlas入门:数据治理的瑞士军刀 第一次接触Apache Atlas时,我正被公司混乱的数据资产搞得焦头烂额。报表数据频繁出错却找不到源头,新来的同事总在问"这个字段是什么意思",业务部门抱怨找不到他们需要的数据..…

作者头像 李华
网站建设 2026/5/11 7:57:20

CANN ops-nn L1损失算子

aclnnL1Loss 【免费下载链接】ops-nn 本项目是CANN提供的神经网络类计算算子库,实现网络在NPU上加速计算。 项目地址: https://gitcode.com/cann/ops-nn 📄 查看源码 产品支持情况 产品是否支持Ascend 950PR/Ascend 950DT√Atlas A3 训练系列产…

作者头像 李华