yield这个关键字在2001年2.2版本的时候就出现了。2006加入yield.send功能。然而直到2006年2.5版本才看到使用在contextmanager上,tornado 2011年才用它实现了神奇的gen逻辑。令我没想到的是

  1. yield出现的这么早
  2. yield厉害一点的应用(contentmanager)居然过了五年才加入到标准库。
  3. tornado的1.0.0版根本和yield没有一毛钱关系
  4. tornado最先引入yield居然并不是实现了gen

yield最先在tornado里面展露头角是在这个commit里面。大神就是大神,虽然这个代码仅仅只有几十行。可是我觉得思路很新奇,膜拜😀(代码基于v1.0.0和v1.1.0)

建议打开该commit,先看一看它做了什么事情。除去测试用例,它改动的地方可谓非常少了

起因

观察下面的代码(可以将tornado的源码在1.0.0版本和1.1.0版本切换发现不同,在1.0.0版本客户端无法得到响应,1.1.0版本客户端得到500错误)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import tornado.ioloop
import tornado.web
import tornado.httpserver

class Index(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
tornado.ioloop.IOLoop.instance().add_callback(self.callback)

def callback(self,*args):
raise Exception("Error")

application = tornado.web.Application(handlers=[('/', Index)])
tornado.httpserver.HTTPServer(application).listen(8888)
tornado.ioloop.IOLoop.instance().start()

这是一个异步的http响应。在接受到请求之后执行get函数体,最终它将self.callback加入到IOLoop循环列表,get函数结束。因为存在asynchronous装饰器,所以并没有主动调用self.finish()。待IOLoop执行self.callback回调时。此时并函数内部触发了异常。但是这个异常已经和get函数没有任何关系,因此get函数无法捕获到这个异常
对比一下get函数体自身触发异常。由web.RequestHandler._execute调用get函数。此时它将捕获异常并调用self._handle_request_exception.最简流程如下

1
2
3
4
5
6
7
8
def _execute(self, transforms, *args, **kwargs):
self._transforms = transforms
try:
getattr(self, self.request.method.lower())(*args, **kwargs)
if self._auto_finish and not self._finished:
self.finish()
except Exception, e:
self._handle_request_exception(e)

self._handler_request_exception的流程也比较简单输出异常调用栈。返回客户端500错误,该http流程结束

可是对于一个回调,它的异常由IOLoop捕获,然而可悲的是它捕获到异常却并不能确定是谁添加到IOLoop里面的,异常捕获之后是不是只用输出调用栈就好。所以需求出来了,对于凡是由get操作下面加入到IOLoop的回调,我们期待能和get的异常处理逻辑一致

历史上这个问题是怎么处理的

这肯定是一个非常常见的问题,当执行ioloop.IOLoop.instance().add_callback之后。callback函数触发的异常添加回调的函数无法感知。最容易想到的方法就是我们把callback整体加上异常捕捉逻辑

1
2
3
4
5
def callback(self,*args):
try:
...
except Exception as e:
self._handle_request_exception(e)

但是这么恶心的方法明显是不可取的。意味着我们每写一个回调逻辑都需要在外部加上try..except。在1.0.0版本提供了一个装饰器函数提供类似的功能async_callback,所以上面的写法只需要将tornado.ioloop.IOLoop.instance().add_callback(self.callback)变更成tornado.ioloop.IOLoop.instance().add_callback(self.async_callback(self.callback))就可以了。虽然方法很不好,可是临时解决了问题,在1.1.0版本就引入了StackContext逻辑,解决了这个问题

预备知识Yield

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def gen_fn():
result = yield 1
print("result of yueld: "+result)
result2 = yield 2
print("result of 2nd yield: " + result2)
return 'we are done'

caller = gen_fn()
x = caller.send(None)
print(x)
y = caller.send('joe')
print(y)
try:
caller.send("bob")
except StopIteration:
print()
  1. 对于一个生成器来说,调用它(gen())并不会立即被执行。只是生成了一个生成器对象
  2. nex(g)等同于g.send(None)。所以对于生成器只需要掌握send函数就可以了
  3. send(value)返回的是yield右边的值,同时value赋值给yield左边的对象,千万不要被第一次的send(None)误导,认为经过第一次send(None)后result的值为None,你可以在心目中将没一个yield划上竖线,运行到那里就返回,然后下一次send的值被赋值给yield左边
  4. 上例中we are done没有被打印,因为不断的被send后生成器会触发StopIteration异常。最终被except StopIteration所捕获。可是他并没有将捕获的值输出,因而没有被显示处理。同时python2不允许在生成器执行return,所以在tornado中经常看到raise gen.Return(value)这种写法,它实质就是触发StopIteration异常,然后被except捕获后得到值,完成传递过程,到了python3语法就直接允许执行return了

预备知识 with

python有很多魔术方法(可以参考这篇文章)。其中如果一个类存在__enter____exit__方法。那么当使用with语句之后。会执行__enter__后得到对象返回给with as后面的变量。在退出的时候执行__exit__方法,最常见的就是我们使用with open() as f去打开一个文件了,处理完成后自动关闭文件,避免了内存泄漏。仔细想一下,其实用装饰器是很容易实现的。可是with语句还有以一个很大的优点,Python的编码是基于缩进来决定代码块的,装饰器是作用在函数上,而with是当跳出它的缩进则执行__exit__(可能描述的不太好),很多情况下with要比装饰器的写法优美很多。

将with和yield联合起来使用还真的是挺神奇的。from contextlib import contextmanager,该模块将with语句做到了装饰器的效果。比如计算一个with语句下面的执行时间

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from contextlib import contextmanager

@contextmanager
def timeit():
start = time.time()
try:
yield
finally:
print("elapsed time: {}".format(time.time() - start))

with timeit():
time.sleep(1)

作为不明白它实现的人,也能够很容易看出来。yield将timeit划分为了两部分。前面的相当于执行__enter__动作,后面的相当于执行__exit__动作。

想象一下它的内部contextmanager装饰器是如何实现的。首先被装饰的函数timeit是一个生成器。因此,在执行__enter__的时候只需要执行send(None)操作就能到达yield的右边并暂停,再__exit__的时候再次执行send(None),此时执行到了finally语句并触发StopIteration异常,流程结束。整个过程可以这样认为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def contextmanager(func):
def inner(*args, **kwargs):
g = func(*args,**kwargs)
class w():
def __enter__(self):
g.send(None)

def __exit__(self, exc_type, exc_val, exc_tb):
try:
g.send(None)
except Exception:
print("4")
return w()
return inner

@contextmanager
def log():
try:
print("1")
yield
raise Exception("E")
finally:
print("3")

with log():
print("2")

是否还是感觉相对比较简单呢

StackContext的实现

在起因部分,解释了callback中会存在的问题(期待RequestHandler.get里面所有相关内容发生错误都会导致返回500错误给客户端)。1.0版本的解决方案就是给每个回调函数套上一个装饰器。触发错误的时候就可以返回500了,感觉不太好讲述这段天才般的代码了😕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import contextlib

_state = ()

@contextlib.contextmanager
def StackContext(context_factory):
global _state
old_contexts = _state
try:
_state = old_contexts + (context_factory,)
with context_factory():
yield
finally:
_state = old_contexts

@contextlib.contextmanager
def ErrorCapture():
try:
yield
except Exception:
pass

with StackContext(ErrorCapture):
print(_state)
raise Exception("Error")
print(_state)

这个地方StackContext是一个工厂模式,它传入的参数context_factory也是一个被contextlib.contextmanager装饰的对象。在StackContext的内部它又执行了with语句来初始化我们的context_factory,emmm.这个其实和多层嵌套装饰器并没有太大的区别,它实现了什么效果呢,在调用with StackContext(context_factory)的时候将context_factory加入到老的里面。注意,当前的context_factory和1.0.0版本里面的async_callback效果是一样样的,在with的语句块下面,我们可以使用_state变量。当跳出with之后_state就被恢复到原来的样子

那么我们该如何使用这个_state变量呢,注意到with语句进入的时候会加入context_factory,退出的时候会清除。这意味着在with下面的代码块里面我们可以安全的使用它

1
2
3
4
5
6
7
def wrap(fn):
def wrapped(callback, contexts):
with contextlib.nested(*[i() for i in contexts]):
callback()
contexts = _state
result = functools.partial(wrapped, fn, contexts)
return result

这里,我们添加一个装饰器。在IOLoop中所有需要添加回调的地方均由wrap包装起来

     def add_callback(self, callback):
         """Calls the given callback on the next I/O loop iteration."""
-        self._callbacks.add(callback)
+        self._callbacks.add(stack_context.wrap(callback))
         self._wake()

那么,在它被调用的时候实际执行的是wrapped函数,它会先调用with contextlib.nested(*[i() for i in contexts]).此步骤恢复了with StackContext时期加入的对象(即人们常说的上下文)。对于异常捕获来说,就是能够正确的捕获callback的异常并且返回500给客户端。另外,最开始的时候捕获RequestHandle.get的异常是在_execute套上try…except,将他变动为with StackContext就好了。这里应该谨记下面两点

  • Tornado的运行是单线程的,with StackContext运行的过程中不可能存在别的步骤另外去改变了_state的值(这个值很重要)
  • StackContext是可以进行嵌套的

改进

当然以上只是为了表述原理讲解的最简代码,实际上源码中略有不同。首先考虑多线程的情况,在单线程中_state的改动是很显然的,执行with StackContext即允许改动。在多线程中就不太一样了。多线程中如果也执行了with StackContext操作,那么有可能造成_state被改动。因此,_state被继承自thread.local对象。那么不同线程内的修改是互相独立的
其次如果一个函数已经被wrapped包装,那么就不必要再次进行包装了(想象一下递归调用add_callback)。对于不存在的with StackContext也需要考虑,如果执行contextlib.nested的参数为空,无疑是会报错的。
其他有的独立的运行的模块不希望被已存在的_state影响,因此独立设置了NullContext

1
2
3
4
5
6
7
8
@contextlib.contextmanager
def NullContext():
old_contexts = _state.contexts
try:
_state.contexts = ()
yield
finally:
_state.contexts = old_contexts

如果不希望被_state干扰,那么在前面加上with StackContext(NullContext)。_state即被置为空。显然add_callback传入的contexts为空。在它被执行的时候,也和with语句没什么事情了

应用

我最开始看到的应用是这个,在Tornado里面使用Sqlalchemy。默认Sqlalchemy存在sessionmaker一个工厂函数,它会确定如何生成一个session会话对象以供使用。默认的情况是使用线程id,这对于多线程环境非常非常方便。因为多线程web框架中每一个HTTP请求的处理都是在独立的线程内进行的,那么一个http处理生命周期对应一个sqlalchemy的session生命周期非常完美。
可是在Tornado这种单线程的框架中发生了变化,因为它不好确定每一个http的生命周期标识。StackContext给了我们方法,虽然起因是callback的异常处理,可是我们一样可以参照它的套路,在_execute执行期间写入变量,在单个http生命周期均可以访问变量,http完结后清除它。代码可以参考给出的gist链接

用例

下面给一个整篇文章的代码总结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
try:
from contextlib import nested
except ImportError:
from contextlib import ExitStack, contextmanager

@contextmanager
def nested(*contexts):
with ExitStack() as stack:
for ctx in contexts:
stack.enter_context(ctx)
yield contexts
import functools

@contextmanager
def StackContext(context_factory):
global _state
old_contexts = _state
try:
_state = old_contexts + (context_factory,)
with context_factory():
yield
finally:
_state = old_contexts


@contextmanager
def ErrorCapture():
try:
yield
except Exception:
pass

_state = ()


def wrap(fn):
def wrapped(callback, contexts):
with nested(*[i() for i in contexts]):
callback()

contexts = _state
result = functools.partial(wrapped, fn, contexts)
return result


loop_list = []


def call_back():
print("WTF")
print(get_current_request_id())


g = {}

@contextmanager
def ThreadRequestContext(**data):
global g
try:
g = data
yield
finally:
g.clear()


def get_current_request_id():
return g['request_id']


global_data = {"request_id": 12}
with StackContext(functools.partial(ThreadRequestContext, **global_data)):
with StackContext(ErrorCapture):
loop_list.append(wrap(call_back))
print(get_current_request_id())

for i in loop_list:
i()