2013年Tornado3.0版本。gen.coroutine上线,直到今天它依旧是目前Tornado中使用最为广泛的装饰器。同时它也是接替gen.engine的存在。它基本去掉了gen.Task的套路。相对gen.engine而言。它只需要gen.coroutine就够了,写法上更为美观

目标

需要实现的目标是这样的

1
2
3
4
5
6
7
8
class GenAsyncHandler(RequestHandler):
@asynchronous
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

在这里,它并不是gen.engine时代的作风。在上一篇中可以看到,添加gen.engine的时候没有改动过任何已有代码。可是在gen.coroutine时代。它去掉了gen.Task。于此同时,改动的是http_client.fetch。

Future

Task这个名词不够炫酷,然后它新增了一个类叫做Future。未来这个词看起来就很有科技感。是的,它表述的是,在未来某一刻,这个对象会得到结果,当然这背后的一切仍然是换汤不换药由回调操作来完成。看一下Future都有啥

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Future:
def __init__(self):
self._val = None
self._callback = set()

def set_result(self, value):
self._val = value
for callback in self._callback:
callback(self)

def result(self):
return self._val

def add_done_callback(self, callback):
self._callback.add(callback)

一个Future类可以如此的简单。它只是提供了一个_val的存储场所,另外当使用set_result设置值的时候会顺便将加入到该对象的回调进行调用。提供了添加回调的函数add_done_callback

回忆gen.Task,它做的事情是对包含callback参数的函数A进行包装,当函数A的callback被调用的时候,实际上调用的是Runner.result_callback。这种做法的缺点呢就是在RequestHandler.get里面需要出现gen.Task。现在我们不希望它出现。我们完全可以改变这一逻辑。因为http_client.fetch肯定是会调用callback的。可以想象它最终的语句是callback(result)既然框架希望让用户不必输入gen.Task,那么可以选择在http_client.fetch中加入Future,设置它的callback为Future.set_result。随便举例如下。假设有一个函数是add_one,原本逻辑如下

1
2
3
def add_one(ret, callback):
ret += 1
IOLoop.instance().add_callback(lambda: callback(ret))

改写成这个样子

1
2
3
4
5
6
7
def add_one(ret):
future = Future()
ret += 1
def callback():
return future.set_result(ret)
IOLoop.instance().add_callback(callback)
return future

虽然这个地方最终被future.set_result执行。可是Future提供了add_done_callback接口,意味着允许我们自己的callback能够在add_one执行完毕后被future.set_result所触发,同时注意add_one返回的是一个future对象

coroutine

假如我们类比gen.engine去实现它

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Runner():

def __init__(self, g):
self.g = g
# 得到一个Future
self.yielded = self.g.send(None)
self.yielded.add_done_callback(self.future_callback)

def future_callback(self, future):
return self.result_callback(future.result())

def result_callback(self, value):
self.result = value
self.run()

def run(self):
try:
self.yielded = self.g.send(self.yielded.result())
self.yielded.add_done_callback(self.future_callback)
except StopIteration:
return

def coroutine(func):
@wraps(func)
def inner(*args, **kwargs):
result = func(*args, **kwargs)
return Runner(result)
return inner

是不是几乎一模一样,还可以正常运行。但是这里有一个极大的缺陷。它无法被嵌套使用,什么意思呢

1
2
3
4
5
6
7
8
from tornado.gen import coroutine
@coroutine
def a():
yield b()

@coroutine
def b():
return 1

同时在gen.engine时代, 这种调用方式也是不被允许的,可是gen.coroutine实现了这种方式的调用。主要原理是我们规定,所有yield右边的值全部是Future对象,对一个Future对象处理完毕后再处理下一个。那么就要修改coroutine了

1
2
3
4
5
6
7
8
def coroutine(func):
def inner():
f = Future()
def final_callback(value):
f.set_result(value)
Runner(func, final_callback)
return f
return inner

对于每一个子coroutine,它们都会生成一个Runner对象(此时Runner已经将生成器进行初始化,执行send(None),并add_done_callback),只是它们返回的并不是Runner,而是Future.在Future被执行set_result操作的时候子coroutine的yield往下走。直到遇到StopIteration异常,此时final_callback函数被调用,它被父coroutine所接受,触发父coroutine的yield往下走。。。。堪称完美。。。。

就这样,它实现了和yield from差不多的逻辑。父生成器调用子生成器,简直六的飞起不服不行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Runner():
def __init__(self, gen, final_callback):
self.final_callback = final_callback
self.gen = gen()
self.yielded = self.gen.send(None)
# 会回调future对象
self.yielded.add_done_callback(self.future_callback)

def future_callback(self, future):
self.run()

def run(self):
try:
self.yielded = self.gen.send(self.yielded.result())
except StopIteration as e:
self.final_callback(e.value)
return
else:
self.yielded.add_done_callback(self.future_callback)

说实话,还是感觉有点绕,希望你能够看明白

改进

和上面的简易写法不同。Tornado源码的实现要复杂一些,因为它要考虑更复杂的需求

  • 兼容性,它不能因为有了Future对象后就完全不顾以前gen.Task的写法
  • 它同样需要实现一次yield多个Future的需求

所以,它另外生成了一个和gen.Task类似的对象YieldFuture。和gen.Task拥有的成员对象一样
startis_readyget_result。同时由于Future在Tornado中应用是如此的普遍。IOLoop新增了一个方法add_future函数(还有一方面就是前面提到的callback异常问题)

1
2
3
4
5
def add_future(self, future, callback):
assert isinstance(future, Future)
callback = stack_context.wrap(callback)
future.add_done_callback(
lambda future: self.add_callback(callback, future))

在使用的时候

1
2
3
4
self.runner = runner
self.key = object()
runner.register_callback(self.key)
self.io_loop.add_future(self.future, runner.result_callback(self.key))

实际上如果不考虑异常情况,和它是等价的

1
Future.add_done_callback(self.future_callback)

和gen.engine一样。coroutine也会遇到这种问题

1
2
3
4
5
6
7
8
9
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response1, response2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
response_dict = yield dict(response3=http_client.fetch(url3),
response4=http_client.fetch(url4))
response3 = response_dict['response3']
response4 = response_dict['response4']

一次yield 多个Future对象。那么解决办法也是和engine差不多的。当发现send(value)返回值是一个list或者dict对象时。它会使用Multi进行封装。在被回调的时候检查Multi对象的is_ready状态。仅仅当都得到结果才算完成

另外在Tornado3.1版本HandlerRequest._execute进行改动。被coroutine装饰的函数不需要再被asynchronous所装饰。至此这个从1.0.0跨越到3.1.0版本的装饰器的生命走到了尽头。然而依旧很多人不管不顾硬是要加上这个装饰器才安心

应用一

可以说自此之后Tornado只存在Future配合yield、coroutine的操作。你从头新建一个Tornado的异步库。用户调用最后必定是返回给用户Future对象。这里我印象比较深的就是用Future实现了celery结果的异步获取。说是异步。实质上就是轮询。代码在这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from tornado.concurrent import TracebackFuture
from tornado.ioloop import IOLoop

def async(task, *args, **kwargs):
future = TracebackFuture()
callback = kwargs.pop("callback", None)
if callback:
IOLoop.instance().add_future(future,
lambda future: callback(future.result()))
result = task.delay(*args, **kwargs)
IOLoop.instance().add_callback(_on_result, result, future)
return future

def _on_result(result, future):
# if result is not ready, add callback function to next loop,
if result.ready():
future.set_result(result.result)
else:
IOLoop.instance().add_callback(_on_result, result, future)

celery执行task.delay立刻返回了result对象。然后将resule和future加入回调,查询resule.ready确定任务是否完成。一旦完成则调用future.set_resule。Future对象设置值之后yield就会继续往下走。完美~~~,否则它会再次循环,直到得到结果为止。可以看到它这种方式还是相当的粗暴的,因为一旦没有结果就会不停的循环。可是这种方式胜在代码简单

应用二

再来一个简单的例子,gen.sleep() 比较没有想到,它会是4.1.0版本才加入的,实现如下

1
2
3
4
def sleep(duration):
f = Future()
IOLoop.current().call_later(duration, lambda: f.set_result(None))
return f

(⊙v⊙)嗯,意不意外。这应该是我在Tornado里面发现最简单的代码了,创建一个Future对象,然后在IOLoop的_timeout列表中加入一个到期执行回调,设置Future的值。至此yield继续往下走~~

简易代码

可以用下面这段简易代码观察一下coroutine的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import tornado.ioloop
from functools import partial

class Future:

def __init__(self):
self._val = None
self._callback = set()

def set_result(self, value):
self._val = value
for callback in self._callback:
callback(self)

def result(self):
return self._val

def add_done_callback(self, callback):
self._callback.add(callback)


class Runner():
def __init__(self, gen, final_callback):
self.final_callback = final_callback
self.gen = gen()
self.yielded = self.gen.send(None)
# 会回调future对象
self.yielded.add_done_callback(self.future_callback)

def future_callback(self, future):
self.run()

def run(self):
try:
self.yielded = self.gen.send(self.yielded.result())
except StopIteration as e:
self.final_callback(e.value)
return
else:
self.yielded.add_done_callback(self.future_callback)

def coroutine(func):
def inner():
f = Future()
def final_callback(value):
f.set_result(value)
Runner(func, final_callback)
return f
return inner

@coroutine
def haha():
c = yield haha1()
r = yield add_one(1)
b = yield add_one(2)
print(r, b, c)

@coroutine
def haha1():
r = yield add_one(1)
b = yield add_one(2)
return r + b

def add_one(ret):
f = Future()
def callback(future, result):
future.set_result(result)
tornado.ioloop.IOLoop.instance().add_callback(partial(callback, f, ret + 1))
return f
haha()

tornado.ioloop.IOLoop().instance().start()