跳到主要内容

从零开始的 Python AsyncIO 生活

· 阅读需 32 分钟
Muel - Nova

一直在用 AsyncIOPython里使用异步编程,但是从来没想过为什么,借这个机会浅浅搞一搞 AsyncIO

可迭代对象(Iterable

首先,我们要明确什么是 Iterable,简而言之,就是可以放到 for 循环中的对象。常见的 list, str, tuple 以及 dict 等都是 Iterable

Python 是如何判断一个 obj 是不是 Iterable 的呢?我们可以使用 dir() 函数来查看它的属性列表。

使用下面这段代码,我们可以知道它们的共同接口

from typing import Iterable

iterable = [
"", # str
[], # list
{}, # dict
(), # tuple
set() # set
]


def show_diff(*objects: Iterable):
""" 打印 Iterable 和 object 的属性差集 """
assert objects
attrs = set(dir(objects[0]))
for obj in objects[1:]:
attrs &= set(dir(obj)) # 取 Iterable 交集
attrs -= set(dir(object)) # 取 Iterable 和 object 的差集
print(attrs)


show_diff(*iterable)

# {'__iter__', '__contains__', '__len__'}

可以看到,关键显然是 __iter__。事实上,对于任意指定了 __iter__ 方法的对象,都会被算作 Iterable。而 __len____contains__ 等等,则是 容器 类型的 Iterable 的共有属性。

如果我们添加一个 非容器 类型的 Iterable,结果就显而易见了。

iterable = [
"", # str
[], # list
{}, # dict
(), # tuple
set(), # set
open(__file__) # IO
]

show_diff(*iterable)

# {'__iter__'}

迭代器(Iterator

在 Python 中,诸如 __iter__ 这样的魔术方法都有相对应的调用方法,也就是 iter()

我们不妨来看一下上文所列举的 容器 型的 Iterable 在调用 iter() 后的结果

IO 类型相对特殊,暂时不作考虑。

for i in iterable:
print(iter(i))

"""
<str_iterator object at 0x7f7bd06fafe0>
<list_iterator object at 0x7f7bd06fafe0>
<dict_keyiterator object at 0x7f7bd08c4b80>
<tuple_iterator object at 0x7f7bd06fafe0>
<set_iterator object at 0x7f7bd0720440>
"""

可以看到,它们都返回了 Iterator 的对象。正如在 Iterable 中展示的那样,我们再次获取它们的属性差集。

# {'__next__', '__iter__'}

可以看到,相比于 Iterable 多了一个 __next__ 方法,从名字也能看出来,这便是用于在下一次迭代中返回数据的接口。

Iterator

我们下断点查看,发现 list_iterator__iter__ 显然是返回了它本身,而 __next__ 则会返回数据(在这里应该是 'This',但由于它再运行了一次 __iter__ 所以变成了下一个 `'Is')

在所有值迭代完毕后,它会抛出一个 StopIteration 的错误来告知迭代结束。

我们可以用下面的代码构建一个自定义的迭代器。

class MyIterator:
def __init__(self, Iter):
self.index = 0
self.data = Iter

def __next__(self):
while self.index < len(self.data):
data = self.data[self.index]
self.index += 1
return data
raise StopIteration

def __iter__(self):
""" 迭代器必须是可迭代的 """
return self


things = ["I", "AM", "ITERABLE", "GOD"]


for i in MyIterator(things): # -> i = iter(MyIterator(things)) && i = next(i)
print(i)

二者之间

回到我们最开始的 Iterable,我们说有 非容器 型的 Iterable,事实上,这些就是直接构建了我们的 Iterator

容器 型的 Iterable 是可重复迭代的(因为它们的 __iter__ 方法生成了新的 Iterator!),静态的。而 Iterator 则是一次性,可动态生成的(参见 MyIterator,你永远不能通过 "AM" 找到之前的数据 "I"

什么叫可动态生成的呢?我们以一个例子为例

import random


class Random:
def __iter__(self):
return self

def __next__(self):
return random.random()


for i in Random():
print(i)

这个程序不会自己停下来!因为它一直在实时的动态生成数据,而且不占用内存空间!(这实际上就是我们接下来要讨论的 Generator

生成器(Generator

一个函数在任意地方出现了 yield 关键词,无论是否会被执行,该函数都会变异成为 Generator

def my_generator_function():
return 0
yield "HI"


my_generator = my_generator_function()
print(my_generator)

# <generator object my_generator at 0x7f169b38c2e0>

在 Python 官方文档中,把变异的函数称为 Generator,而其返回的对象叫做 Generator Iterator,尽管我们并不喜欢这么说。

我们更喜欢将函数称为 Generator Function 而返回的对象称为 Generator

而通过这个名字我们也可以看出来,事实上 Generator Iterator 也是迭代器。

assert my_generator is iter(my_generator)
assert hasattr(my_generator, "__next__") and hasattr(my_generator, "__iter__")

也就是说,Generator 实际上只是一个函数版本的 Iterator,唯一的不同是 next(Generator) 是由 yield 来控制的。

接下来我们重点关注 yield 这个表达式

test_yield_0

从图中我们可以发现,第一次执行 next(my_generator) 时,函数执行到了 yield 1 便被 挂起,等到下一次的 next(my_generator) 后才会执行接下来的代码块。

为了更清楚的看出这个特点,我们实现一个 Generator 的计数器作为例子(代码复制于 itertools

def count(start: int = 0, step: int = 1):
while True:
yield start
start += step

在第一次 __next__ 执行时,直接 yield start

此后,每次执行都会先执行 yield 下面的代码块 start += step,并重新执行循环,再次 yield start

我们继续来看一开始的例子。

在遇到 return 后,next(my_generator) 则直接抛出了 StopIteration 的异常(且 StopIteration.value 就是我们 return 的值),且 return 之后的代码块不再执行(哪怕你处理了 #line:28 的错误执行到了 #line:29 也仍会抛出 StopIteration

test_yield_1

此时,我们可以将我们的 MyIterator 对象重写成 Generator 形式

class MyIterator:
def __init__(self, Iter):
self.index = 0
self.data = Iter

def __iter__(self):
idx = 0
while idx < len(self.data):
yield self.data[idx]
idx += 1


things = ["I", "AM", "ITERABLE", "GOD"]
ge = MyIterator(things)

for i in ge: # 需要注意的是, `ge` 本身并非迭代器, 只有 `iter(ge)` 返回的 `generator` 对象才是迭代器
# 换句话说,`Generator` 自动帮我们实现了 `__iter__` 接口,而 `__next__` 就是我们的 `yield`
print(i)

为什么讲迭代器

为何我们在说协程之前要需要先讲迭代器呢?这就涉及到我们的 FunctionGenerator 的底层运行原理。

代码对象

def foo():
...


print(foo.__code__)

如上的代码打印了 foo() 函数的代码对象,代码对象保存了函数的一些静态信息。

code_obj

帧对象

frame_obj

使用 inspect 返回当前 frame,需要注意的是一般情况下函数运行完毕 frame 就会自动销毁,因此我们使用变量保存。

可以看到,frame 保存了函数运行时的一些状态,其中就有我们特别要关注的 stack

你可以预想到的,正常的函数执行是一个 先进后出 结构,在这里不做进一步的解释。(详情请去看 C 语言函数调用栈

function_stack

Generator 不同,它自带了一个帧,每次调用 __next__ 时都会使用这个帧。换句话说,每次这个帧都会执行出栈入栈操作。

而这个帧的入栈时机就是 next() 执行的时刻,出栈就是 next() 返回的时刻。

直到抛出 StopIteration,这个帧就会被销毁。

def generator():
...
yield ...

def a():
m = generator()
b(m)
c(m)

def b(g):
return next(g)

def c(g):
return next(g)

a()

generator_stack

这部分可能有点难理解,推荐是用自己下断点调一调啃一下。

同步和异步

此时,聪明的你可能已经想到了为什么我们要讨论生成器

如果是一个普通的函数

def normal_func():
foo()
bar()

根据栈结构,你永远不可能在 foo() 执行完成前执行 bar()

而如果是一个生成器

def generator_func_1():
...
yield # step1
...
yield # step2
...
yield # step3

def generator_func_2():
...
yield # step1
...
yield # step2
...
yield # step3

tasks = []
a = generator_func_1()
b = generator_func_2()
tasks.extend([a, b])

def runner():
for task in tasks:
next(task)

借助 runner() 的多次运行,我们可以同时的运行多个任务(在例子里,ab

每次 yield 之前的代码就是我们每一次的业务逻辑,每次 yield 完都会把控制权让出来交由下一个任务,这就是 协作 的概念

换而言之,

对于迭代器,我们关注的重点是 yield 返回的数据

对于协程,我们关注的重点是**yield 前执行的业务代码**

深入浅出 yield from

yield 表达式

之前在 生成器(Generator) 那里提到过一嘴,其实 yield 是一个表达式(Expression),这意味着它可以进行赋值等的操作。

my_yield = yield
my_yield = yield + 1

generator.send()

既然他可以被赋值,那我们不妨来看看它的值是多少

def _yield():
my_yield = yield
print(my_yield)


g = _yield()
next(g) # 1
next(g) # 2

根据上面所说的内容,我们的函数是执行到 yield 时就被挂起的,因此,想要 print 执行的话,还得再进行一次迭代。此时由于函数返回了隐式的 None,所以程序还会抛出 StopIteration 异常。这个结果是显而易见的。

next 作为最初期的生成器语法,到目前版本只是作为一个兼容使用(初期的生成器里,yield 是一个关键字,无法被赋值),在 Python >= 3.5 的版本后,针对生成器添加了一个新的方法,generator.send(),它接受一个参数,作用是在恢复挂起生成器的同时,将值传入 yield 表达式

在这个例子里

g.send(1)  # 2
# 相当于
next(g) # 2
def _yield()
my_yield = 1
print(my_yield)

不过值得注意的是,在生成器未运行时,第一个 generator.send() 的参数必须是 None,否则它会抛出一个 TypeError,这是显然的,因为你无法在一个刚运行的生成器里找到一个可赋值的 yield

g.send(1)  # 1
# TypeError: can't send non-None value to a just-started generator

优先级

此外,在使用 yield 表达式时,它的优先级也是我们需要注意的。

def _yield():
my_yield = yield + 1
print(my_yield)


g = _yield()
print(g.send(None))
g.send(1)

"""
1
1
"""

yield 仍会保留其作为 yield 的特性,即 返回 yield 后的语句

也就是说,当我们执行第一次 g.send(None) 时,yield 会返回 + 1 这个表达式

第二次执行 g.send(1) 时,它会把 yield + 1 这个表达式都替换成 1 (因为后面的 + 1 这个表达式已经返回了)

因此,我们 print(my_yield) 的预期结果是 2,而其实最后是 1

解决方法也很简单,只需要通过括号手动设置优先级即可

def _yield():
my_yield = (yield) + 1
print(my_yield)


g = _yield()
print(g.send(None))
g.send(1)

"""
None
2
"""

generator.close()generator.throw()

没什么好说的,close增加了一个方法让我们人为可控的结束 generatorthrow 则增加了一个方法让我们向 generator 内部抛出异常。它们会向生成器内部对应的 yield (与 generator.send() 同样的 yield 表达式)抛出一个 GeneratorExit / 自定义的异常,可以用于执行一些资源释放等的工作。

除了显式调用 generator.close() 外,在 Python 执行隐式的内存释放(del, 程序退出等等)的时候,也会调用这个 close 方法

最后,我们通过一个例子来体会一哈 yield 表达式的作用

def calc_average():
""" 计算动态数据的平均值 """
total = 0
count = 0
average = 0
while True:
try:
val = yield average
except GeneratorExit:
return average
else:
total += val
count += 1
average = total / count

g = calc_average()
g.send(None) # 0
g.send(1) # 1.0
g.send(9) # 5.0
...

yield from

RESULT = yield from EXPR  # EXPR must be Iterable

# 等价于(省略了部分代码好针对协程)

_i = iter(EXPR)
try:
_y = _i.send(None) # Prime, 预激
except StopIteration as _e:
_r = _e.value # 没有碰到 yield, 直接返回
else:
while True:
try:
_s = yield _y # 把结果原样 yield 出去,并接收传入的值
except GeneratorExit as _e:
_i.close() # 关闭 _i
raise _e # 再次抛出异常
except BaseException as _e:
_x = sys.exe_info()
try:
_y = _i.throw(*_x) # 向 _i 中抛出相同的异常
except StopIteration as _e:
_r = _e.value
break
else:
_y = _i.send(_s) # 再把接受的值原样 send 出去
except StopIteration as _e:
_r = _e.value
break

RESULT = _r # 最后的结果

而在协程里,事实上我们并不会传入值(也就是 _s = yield _y 永远是 None

因此,我们还可以再简化为:

_i = iter(EXPR)	
while True:
try:
_y = _i.send(None)
except StopIteration as _e:
_r = _e.value
else:
try:
yield _y # 把结果原样 yield 出去
except GeneratorExit as _e:
_i.close() # 关闭 _i
raise _e # 再次抛出异常
except BaseException as _e:
_x = sys.exe_info()
try:
_y = _i.throw(*_x) # 向 _i 中抛出相同的异常
except StopIteration as _e:
_r = _e.value
break

RESULT = _r # 最后的结果

你可以发现,这个 yield from 不过是一个套娃的过程。

真正的 AsyncIO

为什么要引入这个关键字呢?其实这就是协程,或者说异步的本质:

不能消除阻塞,而是将阻塞从下游传到上游

函数协程化

from time import sleep


def task():
""" 新建一个 task """
print("TASK BEGIN...")

print(" MainStep...")

main_result = main_step()

print(f" MainStep Finished with result {main_result}")

print("TASK END")


def main_step():
print(" SmallStep(s)...")

small_result = small_step()
... # There could be more steps

print(f" SmallStep(s) Finished with result {small_result}")
return small_result * 100


def small_step():
print(" I'm doing the small step...")
sleep(2) # doing something...
print(" I'm finished")
return 123


task()

让我们来看这样一段代码。可以看到,我们的 sleep (也就是实际事件中的阻塞,例如网络请求或是其他的可能导致阻塞的 IO 操作)实际是发生在 small_step 内部的,倘若我们 main_step() 中有不止一个的 small_step ,那么其他的小步骤一定要等到我们定义的这个 small_step 结束后才会执行。

我们不妨这样把阻塞转移到 main_step()

def small_step():
print(" I'm doing the small step...")
t1 = time.time()
yield sleep, 2
assert time.time() - t1 > 2, "你没阻塞,小猪"
print(" I'm finished")
return 123

此时,我们返回的内容不再是直接的结果,而是一个协程。

因此我们对 main_step() 也需要做一定的改动

def main_step():
print(" SmallStep(s)...")

small_coro = small_step()
while True:
try:
x = small_coro.send(None)
except StopIteration as _e:
small_result = _e.value
break
else: # 阻塞
yield x # 再次传到上游
... # There could be more steps

print(f" SmallStep(s) Finished with result {small_result}")
return small_result * 100

同样的,我们对于 one_task() 也这样的作出改动

def task():
""" 新建一个 task """
print("TASK BEGIN...")

print(" MainStep...")

main_coro = main_step()
while True:
try:
x = main_coro.send(None)
except StopIteration as _e:
main_result = _e.value
break
else:
func, arg = x
func(arg)

print(f" MainStep Finished with result {main_result}")

print("TASK END")

细心的人可能已经发现了,我们的 yield 是传染性的,也就是说你在 small_step 里利用了 yield,那么对于它的上游函数,也必须要修改(这也正是 async 的工作模式)

此时我们回到 main_step,对照 yield from 的等价代码其实你就明白了,这个关键词的存在意义就是用于简化我们套娃(或者官方一点,透传)的过程。

def main_step():
print(" SmallStep(s)...")

small_result = yield from small_step()

print(f" SmallStep(s) Finished with result {small_result}")
return small_result * 100

如果我们把 yield from 改成 await 呢?如果你想到了这个,证明你已经理解了 async 的原理。

然而这样做,我们就会混用 yield fromyield 两种方法。有没有方法统一我们的 task, main_step, small_step 呢?

def small_step():
print(" I'm doing the small step...")
t1 = time.time()
yield from sleep, 2
assert time.time() - t1 > 2, "你没阻塞,小猪"
print(" I'm finished")
return 123

我们直接把 small_step() 里的 yield 改成 yield from,现在,如果你还记得 yield from 的等价代码,你就知道我们应该传入一个 Iterable

我们定义一个新的类来包裹它。

def small_step():
print(" I'm doing the small step...")
t1 = time.time()
yield from YieldIterable(sleep, 2)
assert time.time() - t1 > 2, "你没阻塞,小猪"
print(" I'm finished")
return 123


class YieldIterable:
def __init__(self, *obj):
self.obj = obj

def __iter__(self):
yield self.obj

此时,我们的 small_stepmain_step 都已经 协程化 ,只剩下一个 task。对比代码我们可以发现,事实上我们的 task 内的代码已经很接近 yield from 的形式。

def task():
""" 新建一个 task """
print("TASK BEGIN...")

print(" MainStep...")

main_result = yield from main_step()

print(f" MainStep Finished with result {main_result}")

print("TASK END")

由于我们的 task 出现了 yield,因此我们无法直接的运行 task()

任务驱动器

我们可以通过一个任务驱动器来运行它。

class Task:
def __init__(self, _task):
self.coro = _task

def run(self):
while True:
try:
x = self.coro.send(None)
except StopIteration as _e:
result = _e.value
break
else:
func, arg = x
func(arg)
return result


Task(task()).run()

此时,我们的协程改造已经结束了。事实上,我们完全可以修改我们的 yield fromawait,并添加 async 关键字,同时把我们类中的 __iter__ 改为 __await__,接下来,我们也将在这个的基础上继续完善。

import time
from time import sleep


async def task():
""" 新建一个 task """
print("TASK BEGIN...")

print(" MainStep...")

main_result = await main_step()

print(f" MainStep Finished with result {main_result}")

print("TASK END")


async def main_step():
print(" SmallStep(s)...")

small_result = await small_step()

print(f" SmallStep(s) Finished with result {small_result}")
return small_result * 100


async def small_step():
print(" I'm doing the small step...")
t1 = time.time()
await Awaitable(sleep, 2)
assert time.time() - t1 > 2, "你没阻塞,小猪"
print(" I'm finished")
return 123


class Awaitable:
def __init__(self, *obj):
self.obj = obj

def __await__(self):
yield self.obj


class Task:
def __init__(self, _task):
self.coro = _task

def run(self):
while True:
try:
x = self.coro.send(None)
except StopIteration as _e:
result = _e.value
break
else:
func, arg = x
func(arg)
return result


Task(task()).run()

回到我们的 small_step,我们的阻塞使用的是 sleep, 2 这么一个硬编码的阻塞,而现实中的阻塞远不止这么一种,我们应该追求一种更为普遍的阻塞处理。

Awaitable 里,我们直接 yield self

class Awaitable:
def __init__(self, *obj):
self.obj = obj

def __await__(self):
yield self

class Task:
def __init__(self, _task):
self.coro = _task

def run(self):
while True:
try:
x = self.coro.send(None)
except StopIteration as _e:
result = _e.value
break
else:
func, arg = x.obj
func(arg)
return result

现在,注意到一件事:我们的 Task.run() 仍然是阻塞的,我们的程序的运行权仍然没有彻底的让出去。让我们继续来修改 Task 的代码。

class Task:
def __init__(self, _task):
self.coro = _task
self._done = False
self._result = None

def run(self):
if not self._done:
try:
x = self.coro.send(None)
except StopIteration as _e:
self._result = _e.value
self._done = True
else:
... # 这不应该出现, 应该抛出异常


t = Task(task())
t.run()
for i in range(10): # 在 sleep(2) 的过程中,我们可以做其他的事情。
print("doing something", i)
sleep(0.2)
t.run()

我们手工调度了多任务,在实际上,我们应该通过事件循环(Event Loop)自动的调度任务

Event Loop

首先,任务必定需要有一个队列。我们使用 deque 这个双向队列来保存。

class Event:
def __init__(self):
self._queue = collections.deque()

def call_soon(self, callback, *args, **kwargs):
self._queue.append((callback, args, kwargs))

接下来我们添加定时任务。由于定时任务的特殊性,我们使用 来储存。这里,利用 heapq 来操作。

class Event:
def __init__(self):
self._queue = collections.deque()
self._scheduled = []

def call_soon(self, callback, *args, **kwargs):
self._queue.append((callback, args, kwargs))

def call_later(self, delay, callback, *args, **kwargs):
_t = time.time() + delay
heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

接着我们写事件调度的函数。

class Event:
def __init__(self):
self._queue = collections.deque()
self._scheduled = []
self._stopping = False

def call_soon(self, callback, *args, **kwargs):
self._queue.append((callback, args, kwargs))

def call_later(self, delay, callback, *args, **kwargs):
_t = time.time() + delay
heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

def stop(self):
self._stopping = True

def run_forever(self):
while True:
self.run_once() # 至少要执行一次, 所以把判断写在下面
if self._stopping:
break

def run_once(self):
now = time.time()
if self._scheduled and now > self._scheduled[0][0]:
_, cb, args, kwargs = heapq.heappop(self._scheduled)
self._queue.append((cb, args, kwargs))

task_num = len(self._queue) # 防止运行过程中队列添加更多任务
for _ in range(task_num):
cb, args, kwargs = self._queue.popleft()
cb(*args, **kwargs)


t = Task(task())
loop = Event()
loop.call_soon(t.run)
loop.call_later(2, t.run)
loop.call_later(2.1, loop.stop)
loop.run_forever()

现在,我们修改一下 small_step

async def small_step():
t1 = time.time()
time_ = random.randint(1, 3)
await Awaitable(time_)
assert time.time() - t1 > time_, "你没阻塞,小猪"
return time_

此时,由于这个时间传到了 Task,所以我们要在 Task 中处理,换而言之就是在阻塞的时候添加一个 loop.call_later(),回调就是 self.run

class Task:
def __init__(self, _task):
self.coro = _task
self._done = False
self._result = None

def run(self):
if not self._done:
try:
x = self.coro.send(None)
except StopIteration as _e:
self._result = _e.value
self._done = True
else:
loop.call_later(*x.obj, self.run)
else:
... # 这不应该出现, 应该抛出异常

我们便可以删除我们手动指定的 call_later

t = Task(task())
loop = Event()
loop.call_soon(t.run)
loop.call_later(1.1, loop.stop) # random() 只会出现 0 ~ 1 之间的
loop.run_forever()

现在,我们试试多任务实现,并通过一些参数来实际的展示 异步 的效果。

import collections
import heapq
import itertools
import random
import time
from time import sleep

count = itertools.count(0)
total = 0


async def task():
""" 新建一个 task """
print("TASK BEGIN...")

print(" MainStep...")

main_result = await main_step()

print(f" MainStep Finished with result {main_result}")

print("TASK END")


async def main_step():
print(" SmallStep(s)...")

small_result = await small_step()

print(f" SmallStep(s) Finished with result {small_result}")

return small_result * 100


async def small_step():
t1 = time.time()
time_ = random.random()
await Awaitable(time_)
assert time.time() - t1 > time_, f"{time_} 你没阻塞,小猪 {time.time() - t1}"
return time_


class Awaitable:
def __init__(self, *obj):
self.obj = obj

def __await__(self):
yield self


class Task:
def __init__(self, _task):
self.coro = _task
self._done = False
self._result = None
self._id = f"Task-{next(count)}"

def run(self):
print(f"--------- {self._id} --------")
if not self._done:
try:
x = self.coro.send(None)
except StopIteration as _e:
self._result = _e.value
self._done = True
else:
loop.call_later(*x.obj, self.run)
else:
... # 这不应该出现, 应该抛出异常
print("-------------------------")


class Event:
def __init__(self):
self._queue = collections.deque()
self._scheduled = []
self._stopping = False

def call_soon(self, callback, *args, **kwargs):
self._queue.append((callback, args, kwargs))

def call_later(self, delay, callback, *args, **kwargs):
_t = time.time() + delay
global total
total += delay
heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

def stop(self):
self._stopping = True

def run_forever(self):
while True:
self.run_once() # 至少要执行一次, 所以把判断写在下面
if self._stopping:
break

def run_once(self):
now = time.time()
if self._scheduled and now > self._scheduled[0][0] + (10 ** -5):
_, cb, args, kwargs = heapq.heappop(self._scheduled)
self._queue.append((cb, args, kwargs))

task_num = len(self._queue) # 防止运行过程中队列添加更多任务
for _ in range(task_num):
cb, args, kwargs = self._queue.popleft()
cb(*args, **kwargs)


loop = Event()
for _ in range(1000):
t = Task(task())
loop.call_soon(t.run)
loop.call_later(1.1, loop.stop)
t1 = time.time()
loop.run_forever()
print(f"actually: {time.time()-t1}")
print(f"total: {total}")

image-20220830143812689

可以看到,我们正常运行这么多任务需要的时间应该是 509.3s,但是由于多任务的调度实现的并发执行,我们实际上在 1s 以内便运行完成了这所有的 1000 个任务。

Future

最后,我们的代码是主动使用 sleep 来模拟阻塞的,那实际情况我们应该怎么做呢?

通常情况下,我们是希望我们执行某个操作,并且获得某个值,例如下面

async def small_step():
result = await Awaitable(...)
return result

此时我们应该引入 Future。什么叫 Future?就是未来才会发生的结果,对比 Awaitable,我们显然不能在创建时就将结果直接传入。

class Future:
def __init__(self):
self._result = None
self._done = False

def set_result(self, result):
if self._done:
raise RuntimeError() # 不允许的操作
self._result = result
self._done = True

@property
def result(self):
if self._done:
return self._result
raise RuntimeError()

def __await__(self):
yield self

这样做的话,我们必须要有一个东西来指派什么时候运行 set_result

async def small_step():
fut = Future()
# do something, 它会调用 set_result
...
result = await fut
return result

此时,Task 中接收到了我们这个 future,但是这个 future 中什么信息都没有,只有一个标志位告诉我们任务还没完成。

我们的 Task 中应该如何知道它什么时候恢复运行呢?

我们可以在 Future 中再添加一个 callback 的记录。

class Future:
def __init__(self):
self._result = None
self._done = False
self._callbacks = []

def add_done_callback(self, cb):
self._callbacks.append(cb)

def set_result(self, result):
if self._done:
raise RuntimeError() # 不允许的操作
self._result = result
self._done = True

for cb in self._callbacks:
cb() # 当然可能有其他参数

@property
def result(self):
if self._done:
return self._result
raise RuntimeError()

def __await__(self):
yield self
return self.result # result = await fut 会获取这个值

class Task:
def __init__(self, _task):
self.coro = _task
self._done = False
self._result = None
self._id = f"Task-{next(count)}"

def run(self):
print(f"--------- {self._id} --------")
if not self._done:
try:
x = self.coro.send(None)
except StopIteration as _e:
self._result = _e.value
self._done = True
else:
x.add_done_callback(self.run)
else:
... # 这不应该出现, 应该抛出异常
print("-------------------------")

现在,我们通过写一个 fake_io 来查看效果

def fake_io(fut):
def read():
sleep(random.random()) # IO 阻塞
fut.set_result(random.random())
threading.Thread(target=read).start()

最后,我们观察 TaskFuture

我们可以发现 Task 完全可以继承 Future

class Task(Future):
def __init__(self, _task):
super().__init__()
self.coro = _task
self._id = f"Task-{next(count)}"

def run(self):
print(f"--------- {self._id} --------")
if not self._done:
try:
x = self.coro.send(None)
except StopIteration as _e:
self.set_result(_e.value)
else:
x.add_done_callback(self.run)
else:
... # 这不应该出现, 应该抛出异常
print("-------------------------")

此时,AsyncIO 已经基本实现了。当然,相比 Python 本身的 AsyncIO,我们的代码可以叫做十分简陋。性能不够(毕竟不是 C 写的)之外,也在更多的异常处理等等方面存在问题。最后放一下优化后的代码。( Taskloop 挂钩的方面没说明但是写了)

import collections
import heapq
import itertools
import random
import threading
import time
from time import sleep

count = itertools.count(0)
blocked = 0


async def task():
""" 新建一个 task """
print("TASK BEGIN...")

print(" MainStep...")

main_result = await main_step()

print(f" MainStep Finished with result {main_result}")

print("TASK END")


async def main_step():
print(" SmallStep(s)...")

small_result = await small_step()

print(f" SmallStep(s) Finished with result {small_result}")

return small_result * 100


async def small_step():
fut = Future()
fake_io(fut)
result = await fut
return result


class Future:
def __init__(self):
self._result = None
self._done = False
self._callbacks = []

def add_done_callback(self, cb):
self._callbacks.append(cb)

def set_result(self, result):
if self._done:
raise RuntimeError() # 不允许的操作
self._result = result
self._done = True

for cb in self._callbacks:
cb() # 当然可能有其他参数

@property
def result(self):
if self._done:
return self._result
raise RuntimeError()

def __await__(self):
yield self
return self.result


class Task(Future):
def __init__(self, _task):
super().__init__()
self._loop = loop
self.coro = _task
self._id = f"Task-{next(count)}"
self._loop.call_soon(self.run)
self._start_time = time.time()

def run(self):
print(f"--------- {self._id} --------")
if not self._done:
try:
x = self.coro.send(None)
except StopIteration as _e:
self.set_result(_e.value)
global blocked
blocked += time.time() - self._start_time
else:
x.add_done_callback(self.run)
else:
... # 这不应该出现, 应该抛出异常
print("-------------------------")


class Event:
def __init__(self):
self._queue = collections.deque()
self._scheduled = []
self._stopping = False

def call_soon(self, callback, *args, **kwargs):
self._queue.append((callback, args, kwargs))

def call_later(self, delay, callback, *args, **kwargs):
_t = time.time() + delay
heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

def stop(self):
self._stopping = True

def run_forever(self):
while True:
self.run_once() # 至少要执行一次, 所以把判断写在下面
if self._stopping:
break

def run_once(self):
now = time.time()
if self._scheduled and now > self._scheduled[0][0] + (10 ** -5):
_, cb, args, kwargs = heapq.heappop(self._scheduled)
self._queue.append((cb, args, kwargs))

task_num = len(self._queue) # 防止运行过程中队列添加更多任务
for _ in range(task_num):
cb, args, kwargs = self._queue.popleft()
cb(*args, **kwargs)


def fake_io(fut):
def read():
sleep(t_ := random.random()) # IO 阻塞
fut.set_result(t_)
threading.Thread(target=read).start()


def run_until_all_task(tasks):
if tasks := [_task for _task in tasks if not _task._done]:
loop.call_soon(run_until_all_task, tasks)
else:
loop.call_soon(loop.stop)


loop = Event()
all_tasks = [Task(task()) for _ in range(1000)]
loop.call_soon(run_until_all_task, all_tasks)
t1 = time.time()
loop.run_forever()
print(time.time() - t1, blocked)

Loading Comments...