介绍完StackContext、gen.engine、gen.coroutine之后Tornado3.0以前的核心内容基本已经完结了。本篇介绍一下gen.coroutine中Future的应用,在tornado中生成一个队列的数据结构。如果经常写web的情况,单单只处理用户的输入给出输出,那么要求不会太多,也不会用到队列。可是如果你去写一个爬虫,肯定需要限速啥的,此时队列就是一个很有用的东西,很容易实现生产者消费者模型。Tornado官方的demo中也有一个[爬虫代码(https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py)]演示如何使用队列(本篇基于v5.0.0) 
目标 理解Queue模块给出的例子就可以了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from  tornado import  genfrom  tornado.ioloop import  IOLoopfrom  tornado.queues import  Queueq = Queue(maxsize=2 ) @gen.coroutine def  consumer ():    while  True :         item = yield  q.get()         try :             print('Doing work on %s'  % item)             yield  gen.sleep(0.01 )         finally :             q.task_done() @gen.coroutine def  producer ():    for  item in  range (5 ):         yield  q.put(item)         print('Put %s'  % item) @gen.coroutine def  main ():         IOLoop.current().spawn_callback(consumer)     yield  producer()       yield  q.join()       print('Done' ) IOLoop.current().run_sync(main) 
控制权转移 在python自带模块里面的queue是给多线程使用的.意味着当队列里面没有元素的时候会阻塞主线程。阻塞主线程这种操作在协程里面当然是不允许的。协程里面需要的是当某事需要进行等待的时候主动让出控制权限,让其他的事件执行。具体来说,如何让出控制权呢。理解上一篇的coroutine就可以发现。yield 一个暂时还没有结果的Future就会导致控制权转移,等Future结果到来yield就会被恢复 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from  tornado.gen import  coroutine,sleepfrom  tornado.ioloop import  IOLoop@coroutine def  xx ():    yield  sleep(1 )     print("xx" ) @coroutine def  x ():    xx()     print("x" ) IOLoop.instance().run_sync(x) 
观察这个例子。正常的写法是yield xx(),然后会调用xx(),过一秒之后打印xx再打印x。如果直接使用xx,其实这段代码也没有什么问题。xx返回了一个Future对象。因为没有使用yield,所以它不必等待xx函数执行完再进行下面的操作。xx返回的Future已经开始了运行(它并不会什么都不发生),它依旧在一秒钟之后被执行然后打印xx。
Queue的实现思路 核心思路是
get、put操作均返回Future对象 
没有被设置值的Future执行yield会有阻塞效果 
 
创建三个队列分别为正常队列_queue、未来get队列_getters,未来put队列_putters。
执行put就是相反的操作,优先从_getters里面取(有值则执行Future.set_result,取消获取过程的阻塞)。再检查正常队列的元素是否超过最大值。否则加入未来队列
那么队列还有task_done和join操作,前者每一个任务执行完毕都会被调用表示该任务完成,后者表示等待加入到队列的所有任务都完成,否则阻塞。顺便说一下tornado的锁是怎么实现的
依旧是依靠没有被设置值的Future被yield会造成阻塞。该Future被设置值则会继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class  Event :    def  __init__ (self ):         self.f = Future()     def  set (self ):         if  not  self.f.done():             self.f.set_result(None )     def  clear (self ):         if  self.f.done():             self.f = Future()     def  wait (self ):         return  self.f 
注意。这个地方调用set和clear都是不带yield的,只有当最后调用wait才使用yield。
Queue的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class  Queue :    def  __init__ (self, maxsize=None  ):         self.max_size = maxsize or  2  << 16          self._queue = collections.deque([])         self._getters = collections.deque([])          self._putters = collections.deque([])          self._unfinished_tasks = 0          self._finished = Event()         self._finished.set ()     def  get (self ):         f = Future()         if  len (self._queue) > 0 :             value = self._queue.popleft()             f.set_result(value)         elif  len (self._putters) > 0 :             item, future = self._putters.popleft()             f.set_result(item)             future.set_result(None )         else :             self._getters.append(f)         return  f     def  put (self, value ):         self._unfinished_tasks += 1          self._finished.clear()         f = Future()         if   len (self._getters) > 0 :             future = self._getters.pop()             f.set_result(None )             future.set_result(value)         elif  len (self._queue) < self.max_size:             self._queue.append(value)             f.set_result(None )         else :             self._putters.append((value, f))         return  f     def  task_done (self ):         self._unfinished_tasks -= 1          if  self._unfinished_tasks == 0 :             self._finished.set ()     def  join (self ):         return  self._finished.wait() 
源码做了更多的适配将_get和_put独立出来。然后使用优先级队列,先进后出队列满足不同的出入顺序要求