2013年Tornado3.0版本。gen.coroutine上线,直到今天它依旧是目前Tornado中使用最为广泛的装饰器。同时它也是接替gen.engine的存在。它基本去掉了gen.Task的套路。相对gen.engine而言。它只需要gen.coroutine就够了,写法上更为美观
目标 需要实现的目标是这样的
1 2 3 4 5 6 7 8 class GenAsyncHandler (RequestHandler ): @asynchronous @gen.coroutine def get (self ): http_client = AsyncHTTPClient() response = yield http_client.fetch("http://example.com" ) do_something_with_response(response) self.render("template.html" )
在这里,它并不是gen.engine时代的作风。在上一篇中可以看到,添加gen.engine的时候没有改动过任何已有代码。可是在gen.coroutine时代。它去掉了gen.Task。于此同时,改动的是http_client.fetch。
Future Task这个名词不够炫酷,然后它新增了一个类叫做Future。未来
这个词看起来就很有科技感。是的,它表述的是,在未来某一刻,这个对象会得到结果,当然这背后的一切仍然是换汤不换药由回调操作来完成。看一下Future都有啥
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class Future : def __init__ (self ): self._val = None self._callback = set () def set_result (self, value ): self._val = value for callback in self._callback: callback(self) def result (self ): return self._val def add_done_callback (self, callback ): self._callback.add(callback)
一个Future类可以如此的简单。它只是提供了一个_val的存储场所,另外当使用set_result设置值的时候会顺便将加入到该对象的回调进行调用。提供了添加回调的函数add_done_callback
回忆gen.Task,它做的事情是对包含callback参数的函数A进行包装,当函数A的callback被调用的时候,实际上调用的是Runner.result_callback。这种做法的缺点呢就是在RequestHandler.get里面需要出现gen.Task。现在我们不希望它出现。我们完全可以改变这一逻辑。因为http_client.fetch肯定是会调用callback的。可以想象它最终的语句是callback(result)
。既然框架希望让用户不必输入gen.Task,那么可以选择在http_client.fetch中加入Future,设置它的callback为Future.set_result 。随便举例如下。假设有一个函数是add_one,原本逻辑如下
1 2 3 def add_one (ret, callback ): ret += 1 IOLoop.instance().add_callback(lambda : callback(ret))
改写成这个样子
1 2 3 4 5 6 7 def add_one (ret ): future = Future() ret += 1 def callback (): return future.set_result(ret) IOLoop.instance().add_callback(callback) return future
虽然这个地方最终被future.set_result执行。可是Future提供了add_done_callback接口,意味着允许我们自己的callback能够在add_one执行完毕后被future.set_result所触发 ,同时注意add_one返回的是一个future对象
coroutine 假如我们类比gen.engine去实现它
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class Runner (): def __init__ (self, g ): self.g = g self.yielded = self.g.send(None ) self.yielded.add_done_callback(self.future_callback) def future_callback (self, future ): return self.result_callback(future.result()) def result_callback (self, value ): self.result = value self.run() def run (self ): try : self.yielded = self.g.send(self.yielded.result()) self.yielded.add_done_callback(self.future_callback) except StopIteration: return def coroutine (func ): @wraps(func ) def inner (*args, **kwargs ): result = func(*args, **kwargs) return Runner(result) return inner
是不是几乎一模一样,还可以正常运行。但是这里有一个极大的缺陷。它无法被嵌套使用,什么意思呢
1 2 3 4 5 6 7 8 from tornado.gen import coroutine@coroutine def a (): yield b() @coroutine def b (): return 1
同时在gen.engine时代, 这种调用方式也是不被允许的,可是gen.coroutine实现了这种方式的调用。主要原理是我们规定,所有yield右边的值全部是Future对象,对一个Future对象处理完毕后再处理下一个。那么就要修改coroutine了
1 2 3 4 5 6 7 8 def coroutine (func ): def inner (): f = Future() def final_callback (value ): f.set_result(value) Runner(func, final_callback) return f return inner
对于每一个子coroutine,它们都会生成一个Runner对象(此时Runner已经将生成器进行初始化,执行send(None),并add_done_callback),只是它们返回的并不是Runner,而是Future.在Future被执行set_result操作的时候子coroutine的yield往下走。直到遇到StopIteration异常,此时final_callback函数被调用,它被父coroutine所接受,触发父coroutine的yield往下走。。。。堪称完美。。。。
就这样,它实现了和yield from差不多的逻辑。父生成器调用子生成器,简直六的飞起不服不行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class Runner (): def __init__ (self, gen, final_callback ): self.final_callback = final_callback self.gen = gen() self.yielded = self.gen.send(None ) self.yielded.add_done_callback(self.future_callback) def future_callback (self, future ): self.run() def run (self ): try : self.yielded = self.gen.send(self.yielded.result()) except StopIteration as e: self.final_callback(e.value) return else : self.yielded.add_done_callback(self.future_callback)
说实话,还是感觉有点绕,希望你能够看明白
改进 和上面的简易写法不同。Tornado源码的实现要复杂一些,因为它要考虑更复杂的需求
兼容性,它不能因为有了Future对象后就完全不顾以前gen.Task的写法
它同样需要实现一次yield多个Future的需求
所以,它另外生成了一个和gen.Task类似的对象YieldFuture。和gen.Task拥有的成员对象一样start
、is_ready
、get_result
。同时由于Future在Tornado中应用是如此的普遍。IOLoop新增了一个方法add_future函数(还有一方面就是前面提到的callback异常问题)
1 2 3 4 5 def add_future (self, future, callback ): assert isinstance (future, Future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
在使用的时候
1 2 3 4 self.runner = runner self.key = object() runner.register_callback(self.key) self.io_loop.add_future(self.future, runner.result_callback(self.key))
实际上如果不考虑异常情况,和它是等价的
1 Future.add_done_callback(self.future_callback)
和gen.engine一样。coroutine也会遇到这种问题
1 2 3 4 5 6 7 8 9 @gen.coroutine def get (self ): http_client = AsyncHTTPClient() response1, response2 = yield [http_client.fetch(url1), http_client.fetch(url2)] response_dict = yield dict (response3=http_client.fetch(url3), response4=http_client.fetch(url4)) response3 = response_dict['response3' ] response4 = response_dict['response4' ]
一次yield 多个Future对象。那么解决办法也是和engine差不多的。当发现send(value)返回值是一个list或者dict对象时。它会使用Multi进行封装。在被回调的时候检查Multi对象的is_ready状态。仅仅当都得到结果才算完成
另外在Tornado3.1版本HandlerRequest._execute进行改动。被coroutine装饰的函数不需要再被asynchronous所装饰。至此这个从1.0.0跨越到3.1.0版本的装饰器的生命走到了尽头。然而依旧很多人不管不顾硬是要加上这个装饰器才安心
应用一 可以说自此之后Tornado只存在Future配合yield、coroutine的操作。你从头新建一个Tornado的异步库。用户调用最后必定是返回给用户Future对象。这里我印象比较深的就是用Future实现了celery结果的异步获取。说是异步。实质上就是轮询。代码在这里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from tornado.concurrent import TracebackFuturefrom tornado.ioloop import IOLoopdef async (task, *args, **kwargs ): future = TracebackFuture() callback = kwargs.pop("callback" , None ) if callback: IOLoop.instance().add_future(future, lambda future: callback(future.result())) result = task.delay(*args, **kwargs) IOLoop.instance().add_callback(_on_result, result, future) return future def _on_result (result, future ): if result.ready(): future.set_result(result.result) else : IOLoop.instance().add_callback(_on_result, result, future)
celery执行task.delay立刻返回了result对象。然后将resule和future加入回调,查询resule.ready确定任务是否完成。一旦完成则调用future.set_resule。Future对象设置值之后yield就会继续往下走。完美~~~,否则它会再次循环,直到得到结果为止。可以看到它这种方式还是相当的粗暴的,因为一旦没有结果就会不停的循环。可是这种方式胜在代码简单
应用二 再来一个简单的例子,gen.sleep() 比较没有想到,它会是4.1.0版本才加入的,实现如下
1 2 3 4 def sleep (duration ): f = Future() IOLoop.current().call_later(duration, lambda : f.set_result(None )) return f
(⊙v⊙)嗯,意不意外。这应该是我在Tornado里面发现最简单的代码了,创建一个Future对象,然后在IOLoop的_timeout列表中加入一个到期执行回调,设置Future的值。至此yield继续往下走~~
简易代码 可以用下面这段简易代码观察一下coroutine的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 import tornado.ioloopfrom functools import partialclass Future : def __init__ (self ): self._val = None self._callback = set () def set_result (self, value ): self._val = value for callback in self._callback: callback(self) def result (self ): return self._val def add_done_callback (self, callback ): self._callback.add(callback) class Runner (): def __init__ (self, gen, final_callback ): self.final_callback = final_callback self.gen = gen() self.yielded = self.gen.send(None ) self.yielded.add_done_callback(self.future_callback) def future_callback (self, future ): self.run() def run (self ): try : self.yielded = self.gen.send(self.yielded.result()) except StopIteration as e: self.final_callback(e.value) return else : self.yielded.add_done_callback(self.future_callback) def coroutine (func ): def inner (): f = Future() def final_callback (value ): f.set_result(value) Runner(func, final_callback) return f return inner @coroutine def haha (): c = yield haha1() r = yield add_one(1 ) b = yield add_one(2 ) print(r, b, c) @coroutine def haha1 (): r = yield add_one(1 ) b = yield add_one(2 ) return r + b def add_one (ret ): f = Future() def callback (future, result ): future.set_result(result) tornado.ioloop.IOLoop.instance().add_callback(partial(callback, f, ret + 1 )) return f haha() tornado.ioloop.IOLoop().instance().start()