asyncio

  1. 回调模式编码的复杂度高,
  2. 同步编码的并发性不高,
  3. 多线程编程需要线程间同步, 通过lock机制,会影响效率,

所以我们就需要采用同步的方式去编写异步的代码,

1
2
3
4
5
6
7
def get_url(url):
# 我们获取网页信息是耗时操作,我们需要在这里等待
html = get_html(url)# 所以我们希望在这里能跳出函数,去做一下别的事情,等获取到了数据再回来,
parse_html(html)

def get_next(url):
pass

这样我们也就不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

所以就有了协程

协程,又称微线程,也就是一个可以暂停的函数,等有了结果再切换回去,是我们程序员自己来切换的

那有什么办法能暂停函数,然后再回来呢?

我们想到了生成器,所以我们这里再提一下 生成器

生成器

send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def get_url():
# 这里有一个耗时的请求,需要用另一个函数来处理,并把处理好的结果返回回来
html = yield '耗时操作'
print(html)
# 这里需要加一个yield,不然会报错StopIteration,所以send也会接收到值
yield 123

# 生成生成器
gen = get_url()
# 获取值,模拟另一个函数处理,必须先用一次next来启动生成器
print(next(gen))
# 发送值,处理完后把结果发送回去
print(gen.send('返回结果'))

>>耗时操作
>>返回结果
>>123

yield from:允许发送生成器

1
2
3
4
5
6
7
8
9
10
11
12
13
def gen1():
yield 1
yield 2
def gen2():
yield 3
# 可以返回或者说发送一个 生成器
yield from gen1()
yield 4


g2 = gen2()
for item in g2:
print(item)

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 我们这样一个数据,我们希望获取一个总数,然后跟一个原来的列表
items = {
'小明':[94,68,46,66],
'小红':[99,33,43,66],
'小白':[98,45,46,66],
'小黑':[101,64,46,66],
'小路':[93,65,46,66],
'小兰':[91,66,46,66],
}
"""
result = {
总数
'小明': (274, [94, 68, 46, 66]),
'小红': (241, [99, 33, 43, 66]),
'小白': (255, [98, 45, 46, 66]),
'小黑': (277, [101, 64, 46, 66]),
'小路': (270, [93, 65, 46, 66]),
'小兰': (269, [91, 66, 46, 66])
}
"""
result = {}

def computer(key):
total = 0
temp = []
while True:
# 获得main函数发送过来的数据
a = yield
print(key,'传了一个值过来',a)
# 如果发送过来的是None则退出循环
if not a:
break
total+=a
temp.append(a)
# 返回一个元组
return total,temp
def middle(key):
while True:
result[key] = yield from computer(key)
def main():
for k,v in items.items():
m=middle(k)
# 激活生成器
m.send(None)
for i in v:
m.send(i)
#关闭computer生成器
m.send(None)
main()
print(result)

asyncio异步模块

asyncio是python 用于解决异步IO变成的一整套方案

实现了:

  • 包括各种特定系统实现的模块化的事件循环
  • 传输和协议抽象
  • TCP,UDP,SSL,子进程,延时调用,以及其他的具体支持
  • 模块futures模块,但适用于事件循环使用的Future类
  • 基于yield from 的协议和任务.可以让你用顺序的方式编写并发代码
  • 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
  • 模仿threading模块中的同步原语,可以用在单线程内的协程之间

三个要素:事件循环,回调函数(驱动生成器),以及IO多路复用

简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

# 在 asyncio中 我们用async 表示这个函数是一个异步函数
async def get_html(url):

print('start get html')
# 我们用asyncio.sleep(2) 来模拟一个耗时操作,这里我们不用time.sleep
# 要记住在异步函数内不能使用同步
# 需要加上await 来表示这是一个 耗时操作
await asyncio.sleep(2)
print('end get html')
start_time = time.time()
# 我们创建事件循环
loop = asyncio.get_event_loop()
# 运行我们的异步程序
loop.run_until_complete(get_html('url'))
print(time.time()-start_time)

>>start get html
>>end get html
>>2.001361608505249

这样看我们并不能感受到异步带来的好处,我们接下来创建100个任务来模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

# 在 asyncio中 我们用async 表示这个函数是一个异步函数
async def get_html(url):
print('start get html')
# 我们用asyncio.sleep(2) 来模拟一个耗时操作,这里我们不用time.sleep
# 要记住在异步函数内不能使用同步
# 需要加上await 来表示这是一个 耗时操作
await asyncio.sleep(2)
print('end get html')
start_time = time.time()
# 我们创建事件循环
loop = asyncio.get_event_loop()
# 创建一个100个task任务
tasks = [get_html('url') for i in range(100)]
# 运行我们的异步程序
# 这时需要使用asyncio.gather,也可以用wait 推荐实用gather
# loop.run_until_complete(asyncio.wait(tasks))
loop.run_until_complete(asyncio.gather(*tasks))
print(time.time()-start_time)

>>2.0133612155914307
大家可以去运行一下 时间还是2s

取消task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def cancel_test(sleep_time):
print('start')
await asyncio.sleep(sleep_time)
print(sleep_time)
tasks = []
for i in range(3):
tasks.append(cancel_test(i+1))
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.gather(*tasks))
except KeyboardInterrupt as e:
# 返回事件循环所运行的未完成的 Task 对象的集合。
# asyncio.all_tasks() 实测 3.7.3 不会创建loop 需要传当前loop
# 如果 loop 为 None,则会使用 get_running_loop() 获取当前事件循环。
# 可以用asyncio.Task.all_tasks():
for task in asyncio.all_tasks(loop):
print(task.cancel())
finally:
loop.close()

子协程调度

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio

# 计算协程
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y

# 执行print_sum函数
async def print_sum(x, y):
# 交给子协程
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))

# 创建事件循环
loop = asyncio.get_event_loop()
# 创建task事件
task = loop.create_task(print_sum(1,2))
# 放入事件循环
loop.run_until_complete(task)
loop.close()

该图在3.6的官方文档中

tulip_coro

解析:

image-20200702193436174

线程池结合asyncio

线程池为什么要跟asyncio 结合呢?

由于协程中不能处理阻塞IO 如mysql库是阻塞的,如果我们需要在协程中强行使用,怎么办?asyncio也可以使用多线程下面我们来模拟一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

# 模拟阻塞的函数
def get_url(url):
print('*'*10,url)
# 模拟阻塞操作
time.sleep(5)

start_time = time.time()
loop = asyncio.get_event_loop()
# 生成一个线程池
executor = ThreadPoolExecutor(3)
tasks = []
for url in range(20):
# 生成任务,用线程池run_in_executor(线程池,函数,参数)
task = loop.run_in_executor(executor,get_url,url)
# 添加任务
tasks.append(task)
# 调用任务
loop.run_until_complete(asyncio.gather(*tasks))
print('==='*10,time.time()-start_time)

其实就是将阻塞的函数放到线程池中

建议大家不要在协程中调用阻塞方式,如果非要运行的话,建议使用线程池

future 和 task

future是一个结果容器,用来存放结果

task是future的子类,主要用作协程与future的桥梁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def __init__(self, coro, *, loop=None):
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
if not coroutines.iscoroutine(coro):
# raise after Future.__init__(), attrs are required for __del__
# prevent logging for pending task in __del__
self._log_destroy_pending = False
raise TypeError(f"a coroutine was expected, got {coro!r}")

self._must_cancel = False
self._fut_waiter = None
self._coro = coro
self._context = contextvars.copy_context()
# 初始化的时候调用自己的_step方法
self._loop.call_soon(self.__step, context=self._context)
_register_task(self)

__step方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def __step(self, exc=None):
if self.done():
raise futures.InvalidStateError(
f'_step(): already done: {self!r}, {exc!r}')
if self._must_cancel:
if not isinstance(exc, futures.CancelledError):
exc = futures.CancelledError()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None

_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
# 第一次请求
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
# 激活协程
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
super().set_exception(futures.CancelledError())
else:
# 处理报错 并获取值
super().set_result(exc.value)
# 下面还有很多逻辑,做了很多处理

协程还是需要很多底层知识的,还是乖乖去看一下 底层原理