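Python Async Programming in Depth: From Coroutines to High-Performance Concurrency
Introduction
Asynchronous programming is one of the key techniques for improving the performance of Python applications, especially I/O-bound ones. With an event loop and coroutines, a single thread can handle many operations concurrently.
This article takes a close look at the core concepts of asynchronous programming in Python: coroutines, the event loop, task management, and best practices.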
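As a rough illustration of the single-thread claim, the sketch below overlaps three simulated I/O waits. `fake_io` is a hypothetical stand-in for a real network or disk call, and the 0.2-second delay is arbitrary. Because the waits overlap, the total elapsed time should be close to one wait rather than the sum of all three.

```python
import asyncio
import threading
import time


async def fake_io(name: str, delay: float) -> tuple[str, int]:
    """Hypothetical stand-in for an I/O call; it only sleeps."""
    await asyncio.sleep(delay)  # hands control back to the event loop while waiting
    return name, threading.get_ident()  # record which thread ran this coroutine


async def run_demo() -> tuple[float, set[int]]:
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_io("a", 0.2), fake_io("b", 0.2), fake_io("c", 0.2)
    )
    elapsed = time.perf_counter() - start
    thread_ids = {thread_id for _, thread_id in results}
    return elapsed, thread_ids


elapsed, thread_ids = asyncio.run(run_demo())
print(f"elapsed: {elapsed:.2f}s, threads used: {len(thread_ids)}")
```

Run one after another, the same three waits would take about 0.6 seconds.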
1. Coroutine Basics
1.1 Defining and Creating Coroutines
    import asyncio

    # Basic coroutine definition
    async def hello():
        print("Hello")
        await asyncio.sleep(1)
        print("World")

    # Run the coroutine
    asyncio.run(hello())

    # Like a regular function, a coroutine can take arguments and return a value
    async def fetch_data(url):
        print(f"Fetching {url}")
        await asyncio.sleep(2)  # stand-in for real network I/O
        return {"url": url, "data": "sample"}

    async def main():
        result = await fetch_data("https://api.example.com")
        print(result)

    asyncio.run(main())

1.2 Coroutine States
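    import asyncio
    import inspect

    async def my_coroutine():
        await asyncio.sleep(1)
        return "done"

    # Calling the function only creates a coroutine object; nothing runs yet
    coro = my_coroutine()
    print(f"State: {inspect.getcoroutinestate(coro)}")  # CORO_CREATED

    # asyncio.sleep() needs a running event loop, so a bare coro.send(None)
    # is not a reliable way to start it; let asyncio.run() drive the coroutine
    print(f"Result: {asyncio.run(coro)}")
    print(f"State: {inspect.getcoroutinestate(coro)}")  # CORO_CLOSED

    # Coroutine object states:
    #   CORO_CREATED -> CORO_RUNNING <-> CORO_SUSPENDED -> CORO_CLOSED
    # A Task wrapping a coroutine stays pending until it ends up done or cancelled

2. The Event Loop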
2.1 The Role of the Event Loop
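One way to picture the event loop's job is as a scheduler: it runs queued callbacks and resumes coroutines once they are ready. The sketch below is illustrative only; `show_scheduling` is a made-up name and the delays are arbitrary. It queues two plain callbacks and then suspends, which gives the loop a chance to run them.

```python
import asyncio


async def show_scheduling() -> list[str]:
    loop = asyncio.get_running_loop()
    order: list[str] = []

    # call_later schedules a plain callback to run after a delay (in seconds)
    loop.call_later(0.05, order.append, "later")
    # call_soon schedules a callback for the next iteration of the loop
    loop.call_soon(order.append, "soon")

    order.append("coroutine")  # runs right away; the loop has not regained control
    await asyncio.sleep(0.1)   # suspends here, so the loop can run both callbacks
    return order


order = asyncio.run(show_scheduling())
print(order)  # ['coroutine', 'soon', 'later']
```

The coroutine's own append comes first because nothing else can run until the coroutine reaches an `await`.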
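    import asyncio

    async def hello():
        print("Hello")
        await asyncio.sleep(1)
        print("World")

    def main():
        # Create the event loop explicitly; relying on asyncio.get_event_loop()
        # to create one is deprecated in recent Python versions
        loop = asyncio.new_event_loop()
        try:
            # Run the coroutine until it completes
            loop.run_until_complete(hello())
        finally:
            # Close the event loop
            loop.close()

    if __name__ == "__main__":
        main()

2.2 Custom Event Loops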
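    import asyncio
    import uvloop  # third-party: pip install uvloop (not available on Windows)

    # Replace the default event loop with uvloop
    uvloop.install()

    async def benchmark():
        tasks = [asyncio.sleep(0.1) for _ in range(1000)]
        await asyncio.gather(*tasks)

    # uvloop is generally faster than the default event loop
    asyncio.run(benchmark())

3. Task Management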
3.1 Creating Tasks
    import asyncio

    async def task_function(name, delay):
        print(f"Task {name} started")
        await asyncio.sleep(delay)
        print(f"Task {name} completed")
        return f"Result from {name}"

    async def main():
        # Create the tasks; both start running as soon as main() yields
        task1 = asyncio.create_task(task_function("A", 2))
        task2 = asyncio.create_task(task_function("B", 1))

        # Wait for the tasks to finish
        result1 = await task1
        result2 = await task2
        print(f"Results: {result1}, {result2}")

    asyncio.run(main())

3.2 Running Tasks Concurrently
    import asyncio

    async def fetch_url(url):
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

    async def main():
        urls = [
            "https://api.example.com/1",
            "https://api.example.com/2",
            "https://api.example.com/3",
        ]

        # Run all coroutines concurrently; results come back in input order
        tasks = [fetch_url(url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(result)

    asyncio.run(main())

3.3 Cancelling Tasks
    import asyncio

    async def long_running_task():
        try:
            print("Starting long task")
            for i in range(10):
                await asyncio.sleep(0.5)
                print(f"Progress: {i+1}/10")
            return "Completed"
        except asyncio.CancelledError:
            print("Task was cancelled")
            raise  # re-raise so the task is actually marked as cancelled

    async def main():
        task = asyncio.create_task(long_running_task())

        # Cancel the task after 1 second
        await asyncio.sleep(1)
        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            print("Main caught cancellation")

    asyncio.run(main())

4. Async Patterns
4.1 Producer-Consumer Pattern
    import asyncio

    async def producer(queue):
        for i in range(5):
            await asyncio.sleep(1)
            item = f"Item {i}"
            await queue.put(item)
            print(f"Produced: {item}")

    async def consumer(queue):
        while True:
            item = await queue.get()
            print(f"Consumed: {item}")
            queue.task_done()

    async def main():
        queue = asyncio.Queue()

        # Create the tasks
        producer_task = asyncio.create_task(producer(queue))
        consumer_task = asyncio.create_task(consumer(queue))

        # Wait for the producer to finish
        await producer_task
        # Wait until every queued item has been processed
        await queue.join()
        # Cancel the consumer and wait for the cancellation to take effect
        consumer_task.cancel()
        await asyncio.gather(consumer_task, return_exceptions=True)

    asyncio.run(main())

4.2 Timeout Handling
    import asyncio

    async def slow_operation():
        await asyncio.sleep(5)
        return "Done"

    async def main():
        try:
            # Set a timeout; the operation is cancelled if it takes longer
            result = await asyncio.wait_for(slow_operation(), timeout=2)
            print(result)
        except asyncio.TimeoutError:
            print("Operation timed out")

    asyncio.run(main())

4.3 Limiting Concurrency with a Semaphore
    import asyncio

    async def limited_task(semaphore, task_id):
        async with semaphore:
            print(f"Task {task_id} started")
            await asyncio.sleep(1)
            print(f"Task {task_id} completed")

    async def main():
        # Allow at most 3 tasks to run at the same time
        semaphore = asyncio.Semaphore(3)
        tasks = [limited_task(semaphore, i) for i in range(10)]
        await asyncio.gather(*tasks)

    asyncio.run(main())

5. Asynchronous I/O
5.1 Asynchronous File Operations
    import asyncio
    import aiofiles  # third-party: pip install aiofiles

    # asyncio has no open() function, so this version uses the aiofiles package
    async def read_file_async(file_path):
        async with aiofiles.open(file_path, 'r') as f:
            contents = await f.read()
        return contents

    async def write_file_async(file_path, content):
        async with aiofiles.open(file_path, 'w') as f:
            await f.write(content)

    async def main():
        content = await read_file_async('input.txt')
        await write_file_async('output.txt', content)

    asyncio.run(main())

5.2 Asynchronous HTTP Client
    import asyncio
    import httpx  # third-party: pip install httpx

    async def fetch_data(url):
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

    async def main():
        urls = [
            "https://api.github.com/users/octocat",
            "https://api.github.com/repos/python/cpython"
        ]

        tasks = [fetch_data(url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(result)

    asyncio.run(main())

6. Async Best Practices
6.1 Avoid Blocking Calls
    import asyncio
    import time

    # Wrong: a synchronous sleep blocks the event loop
    async def bad_example():
        print("Start")
        time.sleep(1)  # blocks every other coroutine too
        print("End")

    # Right: use the asynchronous sleep
    async def good_example():
        print("Start")
        await asyncio.sleep(1)  # non-blocking
        print("End")

6.2 Using asyncio.to_thread
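To see why offloading matters, the sketch below counts how often a background coroutine gets to run while a blocking call is in progress, first called directly and then through `asyncio.to_thread` (Python 3.9+). The names `blocking_work`, `count_ticks`, and `measure` are hypothetical, and the 0.3-second duration is arbitrary.

```python
import asyncio
import time


def blocking_work(seconds: float) -> None:
    """Hypothetical blocking call, standing in for legacy synchronous I/O."""
    time.sleep(seconds)


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the event loop lets this coroutine run."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def measure(offload: bool) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    if offload:
        await asyncio.to_thread(blocking_work, 0.3)  # the loop stays free
    else:
        blocking_work(0.3)  # the loop is frozen for the whole call
    stop.set()
    return await ticker


ticks_blocked = asyncio.run(measure(offload=False))
ticks_offloaded = asyncio.run(measure(offload=True))
print(f"ticks while blocked: {ticks_blocked}, while offloaded: {ticks_offloaded}")
```

The exact counts depend on the machine, but the direct call should leave the ticker almost no chance to run, while the offloaded call lets it tick many times.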
    import asyncio

    def blocking_io():
        """Blocking I/O operation."""
        with open('large_file.txt', 'r') as f:
            return f.read()

    async def main():
        # Move the blocking operation to a worker thread (Python 3.9+)
        result = await asyncio.to_thread(blocking_io)
        print(f"Read {len(result)} characters")

    asyncio.run(main())

6.3 Asynchronous Context Managers
    import asyncio

    class AsyncResource:
        async def __aenter__(self):
            print("Acquiring resource")
            await asyncio.sleep(0.1)
            return self

        async def __aexit__(self, exc_type, exc, tb):
            print("Releasing resource")
            await asyncio.sleep(0.1)

        async def do_work(self):
            print("Doing work")

    async def main():
        async with AsyncResource() as resource:
            await resource.do_work()

    asyncio.run(main())

7. Summary
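The core concepts of asynchronous programming in Python:
- Coroutines: lightweight units of concurrent execution
- Event loop: the core that coordinates coroutine execution
- Tasks: wrappers that schedule and manage coroutines
- Concurrency patterns: producer-consumer, semaphore-based limits, and so on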
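In real projects, it is advisable to:
- Use asyncio for asynchronous programming
- Avoid calling blocking functions inside coroutines
- Use async-capable libraries such as httpx
- Keep the number of concurrent operations under control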
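The last recommendation can be packaged as a small reusable helper. The sketch below is one possible shape, not a standard API: `gather_limited` and `fake_request` are hypothetical names, and the `stats` dictionary exists only to show the peak number of workers running at once.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_limited(
    worker: Callable[[T], Awaitable[R]],
    items: Iterable[T],
    limit: int,
) -> list[R]:
    """Run worker(item) for every item, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: T) -> R:
        async with semaphore:  # wait here while `limit` workers are already active
            return await worker(item)

    # gather preserves input order in the returned list
    return await asyncio.gather(*(guarded(item) for item in items))


stats = {"active": 0, "peak": 0}


async def fake_request(n: int) -> int:
    """Hypothetical worker that records how many copies run concurrently."""
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # stand-in for real I/O
    stats["active"] -= 1
    return n * 2


results = asyncio.run(gather_limited(fake_request, range(10), limit=3))
print(results, stats["peak"])
```

Passing the limit as a parameter keeps the semaphore an implementation detail, so callers only choose how much concurrency the downstream service can tolerate.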
Food for thought: what performance gains has asynchronous programming brought to your projects? Feel free to share!