介绍完StackContext、gen.engine、gen.coroutine之后Tornado3.0以前的核心内容基本已经完结了。本篇介绍一下gen.coroutine中Future的应用,在tornado中生成一个队列的数据结构。如果经常写web的情况,单单只处理用户的输入给出输出,那么要求不会太多,也不会用到队列。可是如果你去写一个爬虫,肯定需要限速啥的,此时队列就是一个很有用的东西,很容易实现生产者消费者模型。Tornado官方的demo中也有一个[爬虫代码(https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py)]演示如何使用队列(本篇基于v5.0.0)
目标 理解Queue模块给出的例子就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from tornado import genfrom tornado.ioloop import IOLoopfrom tornado.queues import Queueq = Queue(maxsize=2 ) @gen.coroutine def consumer (): while True : item = yield q.get() try : print('Doing work on %s' % item) yield gen.sleep(0.01 ) finally : q.task_done() @gen.coroutine def producer (): for item in range (5 ): yield q.put(item) print('Put %s' % item) @gen.coroutine def main (): IOLoop.current().spawn_callback(consumer) yield producer() yield q.join() print('Done' ) IOLoop.current().run_sync(main)
控制权转移 在python自带模块里面的queue是给多线程使用的.意味着当队列里面没有元素的时候会阻塞主线程。阻塞主线程这种操作在协程里面当然是不允许的。协程里面需要的是当某事需要进行等待的时候主动让出控制权限,让其他的事件执行。具体来说,如何让出控制权呢。理解上一篇的coroutine就可以发现。yield 一个暂时还没有结果的Future就会导致控制权转移,等Future结果到来yield就会被恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from tornado.gen import coroutine,sleepfrom tornado.ioloop import IOLoop@coroutine def xx (): yield sleep(1 ) print("xx" ) @coroutine def x (): xx() print("x" ) IOLoop.instance().run_sync(x)
观察这个例子。正常的写法是yield xx()
,然后会调用xx(),过一秒之后打印xx再打印x。如果直接使用xx,其实这段代码也没有什么问题。xx返回了一个Future对象。因为没有使用yield,所以它不必等待xx函数执行完再进行下面的操作。xx返回的Future已经开始了运行(它并不会什么都不发生),它依旧在一秒钟之后被执行然后打印xx。
Queue的实现思路 核心思路是
get、put操作均返回Future对象
没有被设置值的Future执行yield会有阻塞效果
创建三个队列分别为正常队列_queue、未来get队列_getters,未来put队列_putters。 执行get操作的时候创建一个Future对象。优先从_putters里面获取.获得的是一个值和一个Future对象,执行Future.set_result。此时代表put被阻塞,执行set_result取消阻塞。再考虑从_queue里面取,代表正常取值。如果都没有则放入_getters队列(此时因为Futures没有值,取值的地方被阻塞)。
执行put就是相反的操作,优先从_getters里面取(有值则执行Future.set_result,取消获取过程的阻塞)。再检查正常队列的元素是否超过最大值。否则加入未来队列
那么队列还有task_done和join操作,前者每一个任务执行完毕都会被调用表示该任务完成,后者表示等待加入到队列的所有任务都完成,否则阻塞。顺便说一下tornado的锁是怎么实现的
依旧是依靠没有被设置值的Future被yield会造成阻塞。该Future被设置值则会继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class Event : def __init__ (self ): self.f = Future() def set (self ): if not self.f.done(): self.f.set_result(None ) def clear (self ): if self.f.done(): self.f = Future() def wait (self ): return self.f
注意。这个地方调用set和clear都是不带yield的,只有当最后调用wait才使用yield。
Queue的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class Queue : def __init__ (self, maxsize=None ): self.max_size = maxsize or 2 << 16 self._queue = collections.deque([]) self._getters = collections.deque([]) self._putters = collections.deque([]) self._unfinished_tasks = 0 self._finished = Event() self._finished.set () def get (self ): f = Future() if len (self._queue) > 0 : value = self._queue.popleft() f.set_result(value) elif len (self._putters) > 0 : item, future = self._putters.popleft() f.set_result(item) future.set_result(None ) else : self._getters.append(f) return f def put (self, value ): self._unfinished_tasks += 1 self._finished.clear() f = Future() if len (self._getters) > 0 : future = self._getters.pop() f.set_result(None ) future.set_result(value) elif len (self._queue) < self.max_size: self._queue.append(value) f.set_result(None ) else : self._putters.append((value, f)) return f def task_done (self ): self._unfinished_tasks -= 1 if self._unfinished_tasks == 0 : self._finished.set () def join (self ): return self._finished.wait()
源码做了更多的适配将_get和_put独立出来。然后使用优先级队列,先进后出队列满足不同的出入顺序要求