news 2026/4/23 17:28:37

Python协程使用详解

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python协程使用详解

Python协程使用详解

协程(Coroutine)是Python中实现并发编程的一种重要方式,它比线程更轻量级,适合I/O密集型任务。下面我通过几个例子来详细讲解。

1. 基础概念

协程通过async/await语法实现,主要特点:

  • 使用async def定义协程函数
  • 使用await挂起协程执行
  • 需要事件循环(event loop)来运行

2. 基础示例

示例1:最简单的协程

importasyncio# 定义协程函数asyncdefhello():print("Hello")awaitasyncio.sleep(1)# 模拟I/O操作print("World")# 运行协程asyncio.run(hello())

示例2:多个协程顺序执行

importasyncioasyncdeftask1():print("开始任务1")awaitasyncio.sleep(2)print("完成任务1")return"结果1"asyncdeftask2():print("开始任务2")awaitasyncio.sleep(1)print("完成任务2")return"结果2"asyncdefmain():# 顺序执行result1=awaittask1()result2=awaittask2()print(f"结果:{result1},{result2}")asyncio.run(main())

3. 并发执行协程

示例3:使用asyncio.gather并发运行

importasyncioasyncdefdownload(url,seconds):print(f"开始下载{url}")awaitasyncio.sleep(seconds)# 模拟下载时间print(f"完成下载{url}")returnf"{url}的内容"asyncdefmain():# 创建多个任务并发执行tasks=[download("http://example.com/page1",3),download("http://example.com/page2",2),download("http://example.com/page3",1)]# 并发执行所有任务results=awaitasyncio.gather(*tasks)print("所有下载完成:",results)asyncio.run(main())

示例4:使用asyncio.create_task

importasyncioasyncdefprocess_data(data_id,processing_time):print(f"开始处理数据{data_id}")awaitasyncio.sleep(processing_time)print(f"完成处理数据{data_id}")returnf"处理后的数据{data_id}"asyncdefmain():# 创建任务但不立即等待task1=asyncio.create_task(process_data(1,2))task2=asyncio.create_task(process_data(2,1))task3=asyncio.create_task(process_data(3,3))# 在需要的时候等待结果result1=awaittask1 result2=awaittask2 result3=awaittask3print(f"结果:{result1},{result2},{result3}")asyncio.run(main())

4. 协程间通信

示例5:使用asyncio.Queue

importasyncioimportrandomasyncdefproducer(queue,producer_id):foriinrange(3):item=f"产品{producer_id}-{i}"awaitasyncio.sleep(random.random())# 模拟生产时间awaitqueue.put(item)print(f"生产者{producer_id}生产了:{item}")awaitqueue.put(None)# 结束信号asyncdefconsumer(queue,consumer_id):whileTrue:item=awaitqueue.get()ifitemisNone:queue.put(None)# 将结束信号放回队列供其他消费者使用breakawaitasyncio.sleep(random.random()*2)# 模拟消费时间print(f"消费者{consumer_id}消费了:{item}")queue.task_done()asyncdefmain():queue=asyncio.Queue(maxsize=5)# 创建生产者和消费者producers=[asyncio.create_task(producer(queue,i))foriinrange(2)]consumers=[asyncio.create_task(consumer(queue,i))foriinrange(3)]# 等待所有生产者完成awaitasyncio.gather(*producers)# 等待队列中的所有项目被消费awaitqueue.join()# 取消消费者任务forcinconsumers:c.cancel()asyncio.run(main())

5. 实际应用示例

示例6:并发HTTP请求

importaiohttpimportasyncioimporttimeasyncdeffetch_url(session,url):try:asyncwithsession.get(url,timeout=10)asresponse:data=awaitresponse.text()returnf"{url}: 状态码{response.status}, 长度{len(data)}"exceptExceptionase:returnf"{url}: 错误 -{str(e)}"asyncdefmain():urls=["https://httpbin.org/get","https://httpbin.org/delay/2",# 延迟2秒的端点"https://httpbin.org/status/404","https://httpbin.org/status/500"]start_time=time.time()asyncwithaiohttp.ClientSession()assession:tasks=[fetch_url(session,url)forurlinurls]results=awaitasyncio.gather(*tasks)forresultinresults:print(result)end_time=time.time()print(f"\n总耗时:{end_time-start_time:.2f}秒")asyncio.run(main())

示例7:带超时控制的协程

importasyncioasyncdeflong_running_task(task_id,seconds):print(f"任务{task_id}开始")awaitasyncio.sleep(seconds)print(f"任务{task_id}完成")returnf"任务{task_id}结果"asyncdefmain():try:# 设置超时时间为3秒result=awaitasyncio.wait_for(long_running_task(1,5),timeout=3)print(f"成功:{result}")exceptasyncio.TimeoutError:print("任务超时!")# 使用asyncio.shield防止取消try:task=asyncio.create_task(long_running_task(2,4))# shield保护任务不被取消result=awaitasyncio.wait_for(asyncio.shield(task),timeout=2)print(f"shield结果:{result}")exceptasyncio.TimeoutError:print("外部超时,但任务继续运行")# 等待被shield的任务完成result=awaittaskprint(f"被shield的任务最终完成:{result}")asyncio.run(main())

6. 协程模式示例

示例8:生产者-消费者模式

importasyncioimportrandomasyncdefproducer(name,queue):foriinrange(5):item=f"item-{name}-{i}"awaitasyncio.sleep(random.uniform(0.1,0.5))awaitqueue.put(item)print(f"{name}生产了{item}")asyncdefconsumer(name,queue):whileTrue:try:# 设置超时避免永远等待item=awaitasyncio.wait_for(queue.get(),timeout=2)awaitasyncio.sleep(random.uniform(0.2,0.8))# 模拟处理时间print(f"{name}处理了{item}")queue.task_done()exceptasyncio.TimeoutError:print(f"{name}等待超时,退出")breakasyncdefmain():queue=asyncio.Queue(maxsize=3)# 创建生产者和消费者producers=[asyncio.create_task(producer(f"P{i}",queue))foriinrange(2)]consumers=[asyncio.create_task(consumer(f"C{i}",queue))foriinrange(3)]# 等待所有生产者完成awaitasyncio.gather(*producers)# 等待队列清空awaitqueue.join()# 取消消费者forcinconsumers:c.cancel()print("所有任务完成")asyncio.run(main())

关键要点总结

  1. 定义协程: 使用async def定义协程函数
  2. 调用协程: 使用await调用其他协程
  3. 运行协程: 使用asyncio.run()运行顶级协程
  4. 并发执行: 使用asyncio.gather()asyncio.create_task()实现并发
  5. 协程通信: 使用asyncio.Queue进行协程间通信
  6. 错误处理: 协程中可以使用常规的try/except处理异常
  7. 超时控制: 使用asyncio.wait_for()设置超时

协程适合I/O密集型应用,如网络请求、文件读写等场景,可以显著提高程序的并发性能。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!