协程
asyncio对象
asyncio是python3.4之后引入的标准库,内置对异步IO的支持。asyncio的编程模型是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。
event_loop事件循环 程序开启一个无限的循环,程序员会把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。
coroutine协程 指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task任务 一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
async/await关键字 python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
事件循环
理解成一个死循环,会去检测某些代码
# 伪代码 任务列表 =[任务1、任务2....] while True: 可执行的任务列表, 执行完成的任务列表 = 去任务列表中检查所有的任务, 将“可执行”和“已完成”的任务返回 for 就绪任务 in 可执行任务列表: 执行就绪任务 for 已完成的任务 in 执行完成的任务列表: 任务列表中移除 已完成的任务 如果 任务列表 中任务都完成,则终止循环创建事件循环
import asyncio # 异步任务1 async def work_1(): for _ in range(5): print('我是异步任务1') await asyncio.sleep(1) # 异步任务2 async def work_2(): for _ in range(5): print('我是异步任务2') await asyncio.sleep(1) # 将多个任务存放在一个列表中 tasks = [ work_1(), work_2() ] # 创建或获取一个事件循环 loop = asyncio.get_event_loop() # 将任务列表放到事件循环 loop.run_until_complete(asyncio.wait(tasks)) """ 当前代码在python3.7后被asyncio.run替代。无需手动创建事件循环 """快速上手
协程函数:定义函数的时候前面有async关键字
协程对象:执行协程函数()得到的是协程对象
创建协程
async def do_work(): print('学习协程中...') result = do_work()以上代码会获取一个协程对象。内部代码不会被运行
使用事件循环运行协程对象
loop = asyncio.get_event_loop() loop.run_until_complete(result)以上代码为python3.6的写法,在python3.7后使用run()方法来执行协程对象:
asyncio.run(result)await关键字
await后面跟的是一个可等待的对象。await的作用就是等待后面的函数返回结果,只有后面的函数返回结果了,程序才会继续往下执行。
可等待的对象:
- 协程对象
- future对象
- task对象
await在等待以上对象所返回的值。
协程嵌套
在一个协程函数中执行其他的协程函数
import asyncio async def others(): print('start') await asyncio.sleep(2) # 当前等待的sleep对象的返回值为None print('end') return '返回值' async def func(): print('执行协程函数内部代码') # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 # 当前协程挂起时,事件循环可以执行其他协程 response = await others() # others()是一个协程对象 print('IO操作结果为:', response) # python3.6的写法 # loop=asyncio.get_event_loop() # loop.run_until_complete(func()) # python3.7以上版本的写法 asyncio.run(func())打印信息:
执行协程函数内部代码 start end IO操作结果为: 返回值一个协程函数中可以同时存在多个await
import asyncio async def others(): print('start') await asyncio.sleep(2) print('end') return '返回值' async def func(): print('执行协程函数内部代码') # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行 # 当前协程挂起时,事件循环可以执行其他协程 response_1 = await others() print('response_1的IO操作结果为:', response_1) response_2 = await others() print('response_2的IO操作结果为:', response_2) # python3.6的写法 loop = asyncio.get_event_loop() loop.run_until_complete(func()) # python3.7及以上写法 # asyncio.run(func())打印结果:
执行协程函数内部代码 start end response_1的IO操作结果为: 返回值 start end response_2的IO操作结果为: 返回值代码先执行func中的print(‘执行协程函数内部代码’),然后遇到await,协程挂起。等待await后面的程序全部执行完毕并返回结果。发现此处await的是一个协程,程序进入协程others() 中,执行print(‘start’),又遇到了await,此次await的是一个IO操作,则程序挂起,去执行其他的协程。(这里没有其他的协程,于是只能老实等待)。asyncio.sleep(2)执行结束后,回到func()里面继续执行挂起之后的代码。response_1结束。response_2同理。
task对象
创建一个task对象
task对象就是往事件循环中加入任务用的。Task用于开发调度协程,通过asyncio.create_task(协程对象)创建(python3.7之后有这个函数),也可以用**asyncio.ensure_future(coroutine)**** 和loop.create_task(coroutine)创建一个task**。run_until_complete的参数是一个future对象,当传入一个协程,其内部会自动封装成task。不建议手动实例化task对象。
import asyncio async def others(): print('start') await asyncio.sleep(2) print('end') return '返回值' async def func(): print('执行协程函数内部代码') # python3.6写法 # task_1 = asyncio.ensure_future(others()) # task_2 = asyncio.ensure_future(others()) # python3.7以上版本写法 task_1 = asyncio.create_task(others()) task_2 = asyncio.create_task(others()) response_1 = await task_1 print('task_1的IO操作结果为:', response_1) response_2 = await task_2 print('task_2的IO操作结果为:', response_2) # python3.6写法 # loop = asyncio.get_event_loop() # loop.run_until_complete(func()) # python3.7以上版本写法 asyncio.run(func())打印结果:
执行协程函数内部代码 start start end end task_1的IO操作结果为: 返回值 task_2的IO操作结果为: 返回值发现这里程序运行的结果和前面在await后面加协程对象的时候不太一样。这是为什么呢?主要原因是加了两个 asyncio.create_task()语句,添加了两个task对象。
- 程序执行的逻辑如下:
- 创建事件循环asyncio.get_event_loop()
- 运行主函数func(),执行print(),创建task_1,创建task_2
- 程序来到response_1 = await task_1,response_1一直在等待await后面的task_1传回返回值。至此,由于遇到了await,func()主程序挂起.
- 进入task_1中执行。在await中执行print(‘start’)后,遇到await阻塞,此时task_1挂起,等待await后面的程序执行完毕。与此同时,发现除了task_1外还存在另一个任务task_2,于是去执行task_2里面的程序。
- 执行task_2中的print(‘start’)后又遇到await等待,但此时已经没有其他任务了,因此只能等待task_1和task_2中挂起的程序执行完毕。
- task_1先结束挂起,执行print(‘end’),并return返回值。谁先结束挂起,就先执行谁那边的后续代码
- 在task_1执行完print(‘end’)之后,return返回值之前,task_2也结束挂起了。执行了print(‘end’),return返回值。这里虽然返回了值,但是response_2只能等到await的时候才能接收到这个值
- 函数返回至func()中,response_1获得了await task_1返回的值,执行了print()
- response_2获得了await task_2的返回值,执行了print()
将程序改成如下的形式可以更加深刻理解程序执行的过程
import asyncio async def others(time, name): print('start', name) await asyncio.sleep(time) print('end', name) return '返回值' + name async def func(): print('执行协程函数内部代码') task_1 = asyncio.ensure_future(others(3, 'task_1')) task_2 = asyncio.ensure_future(others(2, 'task_2')) response_1 = await task_1 print('task_1的IO操作结果为:', response_1) response_2 = await task_2 print('task_2的IO操作结果为:', response_2) loop = asyncio.get_event_loop() loop.run_until_complete(func()) """ 随堂测试: 将以上代码修改成python3.7以上版本的写法 """打印结果:
执行协程函数内部代码 start task_1 start task_2 end task_2 # 这边task_2比task_1先结束,这里之所以不会先print('task_2的IO操作结果为:', response_2)是因为func()函数需要先等response_1结束挂起才能执行后续。 end task_1 task_1的IO操作结果为: 返回值task_1 task_2的IO操作结果为: 返回值task_2倘若交换task_1和task_2的sleep时间
import asyncio async def others(time, name): print('start', name) await asyncio.sleep(time) print('end', name) return '返回值' + name async def func(): print('执行协程函数内部代码') task_1 = asyncio.ensure_future(others(2, 'task_1')) task_2 = asyncio.ensure_future(others(3, 'task_2')) response_1 = await task_1 print('task_1的IO操作结果为:', response_1) response_2 = await task_2 print('task_2的IO操作结果为:', response_2) loop = asyncio.get_event_loop() loop.run_until_complete(func()) 执行协程函数内部代码 start task_1 start task_2 end task_1 task_1的IO操作结果为: 返回值task_1 end task_2 task_2的IO操作结果为: 返回值task_2协程并发
以上代码示例的代码写法一般用的很少,只是为了便于理解,正式写法如下: 使用asyncio.wait(task_list)即可。
import time import asyncio # 使用async 来定义一个协程对象 async def do_work(x): print('waiting:', x) await asyncio.sleep(x) # 模拟一个IO操作 return 'Done after {}s'.format(x) async def main(): # 创建多个协程对象 coroutine_1 = do_work(1) coroutine_2 = do_work(2) coroutine_3 = do_work(3) tasks = [ # 在列表中创建多个future对象 asyncio.ensure_future(coroutine_1), asyncio.ensure_future(coroutine_2), asyncio.ensure_future(coroutine_3) ] # 用gather()方法也行 # time_out=2 表示最多等2秒,假如time_out=None,则表示默认等到所有结果都完成 dones, pendings = await asyncio.wait(tasks, timeout=None) for task in dones: print('task result:', task.result()) now = lambda: time.time() start = now() # 创建事件循环 loop = asyncio.get_event_loop() # 将协程对象加入到事件循环中 loop.run_until_complete(main()) print('Time:', now() - start)打印结果:
waiting: 1 waiting: 2 waiting: 3 task result: Done after 1s task result: Done after 2s task result: Done after 3s Time: 3.0020153522491455在python 3.7之后我们可以在创建task的时候给task起名字
coroutine_1 = do_work(1) coroutine_2 = do_work(2) coroutine_3 = do_work(3) tasks = [ asyncio.create_task(coroutine_1, name='MY_COROUTINE_1'), asyncio.create_task(coroutine_2, name='MY_COROUTINE_2'), asyncio.create_task(coroutine_3, name='MY_COROUTINE_3') ]asyncio的future对象
future类是task类的基类,task对象只有运算得到返回值后,await的对象才能传回值并且向下运行。这个功能就是future对象来实现的。future源码中存在一个_state,一旦_state值变成finished,await就不再继续等待。future一般不手动去写它。
若执行以下代码,则程序一直会处于等待状态(卡死)。
import asyncio async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # python3.7以后 # 创建一个任务(future对象),什么也不干 fut = loop.create_future() # await会等待任务最终结果,没有结果则会一直等待下去 await fut loop = asyncio.get_event_loop() loop.run_until_complete(main()) import asyncio async def set_after(fut): await asyncio.sleep(2) fut.set_result('666') async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # python3.7以后 # 创建一个任务(future对象),什么也不干 fut = loop.create_future() # 等待任务最终结果,没有结果则会一直等待下去 # 手动设置future的最终结果,那样fut就可以结束了 await loop.create_task(set_after(fut)) data = await fut # 等待future对象获取最终结果,否则一直等待下去 print(data) loop = asyncio.get_event_loop() loop.run_until_complete(main())以上两段代码没有实际意义,只是加深对于future的理解。
concurrent中的future对象
在python中还有一个concurrent模块,concurrent模块也有对应的java接口。concurrent.future.Future对象和asyncio.Future对象没有任何关系,是完全不同的两个东西。concurrent.futures.Future是在利用进程池、线程池来实现异步操作时来使用的对象。
import time from concurrent.futures import Future from concurrent.futures.thread import ThreadPoolExecutor # 用线程执行异步 from concurrent.futures.process import ProcessPoolExecutor # 用进程执行异步 def func(value): time.sleep(1) print(value) return 123 # 创建线程池 pool = ThreadPoolExecutor(max_workers=2) # 或者创建进程池 # pool = ProcessPoolExecutor(max_workers=5) for i in range(4): fut = pool.submit(func, i) # 返回的是一个Future对象, fut.result()才是返回结果 print(fut) # print(fut.result())打印结果:
<Future at 0x7f02f1bf2850 state=running> <Future at 0x7f02f1a1b3a0 state=running> <Future at 0x7f02f1a1b700 state=pending> <Future at 0x7f02f1a1b820 state=pending> 1 0 3 2这边可以发现future对象是并行创建的。
协程与线程和进程的交叉使用
日后写代码可能会存在协程和线程、进程下的两种Future交叉使用的情况。 (在编程的时候,遇到某个第三方模块不支持协程异步的时候用) 一般情况下,不会交叉使用。在并发编程中,要么用asyncio这种协程式的来实现异步,要么统一用进程池或线程池来实现异步。有些情况下会交叉使用,例如:crm项目中80%是基于协程异步编程 + MySQL,这种情况下只有MySQL内部也支持async异步,两者才能实现无缝衔接。假如MySQL不支持协程,则需要考虑对此用线程、进程来实现了。
asyncio.run_in_executor()实现异步的内部原理:
- 第一步:内部会先调用TreadPoolExector的submit方法完成任务提交,并返回一个concurrent.futures.Future对象 (与asyncio没有任何关系)
- 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装成asyncio.Future对象
因为concurrent.futures.Future对象不支持await语法,因此需要转换成asyncio.Future对象才能进行使用
import time import asyncio import concurrent.futures def func_1(): time.sleep(2) return '这是一个任务的返回值...' async def main(): loop = asyncio.get_running_loop() # 在协程函数中运行普通函数 在执行函数时,协程内部会自动创建一个线程池来运行任务 # run_in_executor()方法第一个参数为None时则默认创建一个线程池 fut = loop.run_in_executor(None, func_1) result = await fut print('当前方式会自动创建一个线程池去执行普通函数: ', result) # 在协程函数中运行基于线程池的任务, 效果与以上代码一致 with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor(pool, func_1) print('在线程池中得到的执行结果: ', result) # 在协程函数中运行基于进程池的任务 with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor(pool, func_1) print('在进程池中得到的执行结果: ', result) if __name__ == "__main__": asyncio.run(main())异步与非异步模块混合使用案例
import asyncio import requests async def download_image(url): # 发送网络请求下载图片 print('开始下载:', url) loop = asyncio.get_running_loop() # request不支持异步,使用线程池来辅助实现 future = loop.run_in_executor(None, requests.get, url) response = await future print('下载完成') # 图片保存 file_name = url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(response.content) if __name__ == '__main__': url_list = [ 'http://pic.bizhi360.com/bbpic/98/10798.jpg', 'http://pic.bizhi360.com/bbpic/92/10792.jpg', 'http://pic.bizhi360.com/bbpic/86/10386.jpg' ] loop = asyncio.get_event_loop() # 使用事件循环对象批量创建task对象 tasks = [loop.create_task(download_image(url)) for url in url_list] loop.run_until_complete(asyncio.wait(tasks))上述代码运行逻辑:
- 一开始创建了三个url的任务
- 针对第一个url,执行download_image()函数,执行run_in_executor(),下面遇到await
- 遇到await后协程挂起,去执行第二个url的任务,又创建一个线程,继续await,然后转向第三个url,然后继续创建线程,又等待。
以上的程序和纯依靠asyncio支持的协程实现异步存在一个区别就是,纯aysncio实现协程只存在一个线程,所有的I/O等待都是协程在等待。而上述过程,在实现异步的过程中,建立了三个线程,I/O等待是线程在进行等待,因此有较大的资源浪费。
绑定回调
在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值,如果回调需要多个参数,可以通过偏函数导入。
import asyncio # 使用async 来定义一个协程对象 async def do_work(x): print('waiting:', x) return 'Done after {} s'.format(x) # 获取协程对象 coroutine = do_work(3) # 创建事件循环 loop = asyncio.get_event_loop() # 创建task task = loop.create_task(coroutine) # 给任务添加回调函数 def callback(future): print('callback:', future.result()) task.add_done_callback(callback) # 将协程对象加入到事件循环中 loop.run_until_complete(task)future与result
回调一直是很多异步编程的噩梦,程序员更喜欢用同步的编写方式写异步代码,以避免回调的问题。回调中我们使用了future的result()方法,前面不绑定回调的例子中,可以看到task的finished状态。在那个时候可以直接读取task的result方法。
import asyncio # 使用async 来定义一个协程对象 async def do_work(x): print('waiting:', x) return f'Done after {x} s' # 获取协程对象 coroutine = do_work(3) # 创建事件循环 loop = asyncio.get_event_loop() # 创建task task = loop.create_task(coroutine) # 将协程对象加入到事件循环中 loop.run_until_complete(task) # 直接调用task中的result()来获取返回结果 print('直接获取返回结果:', task.result())阻塞与await
使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器中的yield一样,函数交出控制权。协程遇到await,事件循环就会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再执行下一个协程。
import time import asyncio # 使用async 来定义一个协程对象 async def do_work(x): print('waiting:', x) await asyncio.sleep(x) # 模拟一个IO操作 return f'Done after {x} s' now = lambda: time.time() start = now() # 获取协程对象 coroutine = do_work(3) # 创建事件循环 loop = asyncio.get_event_loop() # 创建task task = loop.create_task(coroutine) # 将协程对象加入到事件循环中 loop.run_until_complete(task) print('task result:', task.result()) print('Time:', now() - start)打印结果:
waiting: 3 task result: Done after 3s Time: 3.002943754196167并发执行与同步执行
对创建的future对象进行变量赋值并进行await
import time import asyncio now = lambda: time.time() async def func(): task_1 = asyncio.ensure_future(asyncio.sleep(1)) task_2 = asyncio.ensure_future(asyncio.sleep(1)) await task_1 await task_2 start = now() loop = asyncio.get_event_loop() for i in range(5): loop.run_until_complete(func()) print('异步所花费的时间: %f s' % (now() - start))打印结果:
异步所花费的时间: 5.005247 s直接运行future对象
import time import asyncio now = lambda: time.time() async def func(): await asyncio.ensure_future(asyncio.sleep(1)) await asyncio.ensure_future(asyncio.sleep(1)) start = now() loop = asyncio.get_event_loop() for i in range(5): loop.run_until_complete(func()) print('异步所花费的时间: %f s' % (now() - start))打印结果:
异步所花费的时间: 10.010090 s直接对asyncio.ensure_future()创建的future对象进行await是不能实现并发的,必须将创建的对象返回给一个变量,在对该变量绑定的future对象进行await才可实现。更好的方法是使用asyncio.gather()方法或者aysncio.wait()方法来实现并发。
异步上下文管理器
这种对象通过定义__aenter__()和__aexit__()方法来对asyncio with语句中的环境进行控制,叫做异步上下文管理器。
import asyncio class AsyncContextManager: def __init__(self, conn=None): self.conn = conn async def do_something(self): # 异步读取数据库操作 return '模拟增删改查数据库并返回操作结果...' async def __aenter__(self): self.conn = await asyncio.sleep(1) return self async def __aexit__(self, exec_type, exc_val, exc_tb): """ with语句运行结束之后触发此方法的运行 exc_type:如果抛出异常, 这里获取异常类型 exc_val:如果抛出异常, 这里显示异常内容 exc_tb:如果抛出异常, 这里显示所在位置, traceback """ # 异步关闭数据库连接 await asyncio.sleep(1) async def func(): async with AsyncContextManager() as fp: result = await fp.do_something() print(result) loop = asyncio.get_event_loop() loop.run_until_complete(func())