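In an era dominated by multithreading and multiprocessing, this module, billed as "high-level concurrency", is still impressive. The strength of concurrent.futures is that it wraps threads, processes, locks and queues into one package, and users need to know none of those details. Four or five lines of code do what used to take twenty or thirty. On top of that, thread pools and process pools are used in exactly the same way, which makes it close to a masterpiece. Unfortunately, we are now in the async era. I am looking at it mainly because tornado uses its Future, so I want to see roughly how it is implemented.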

An example from the official documentation:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

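Start with one problem. With threads we normally just call threading.Thread(target=...), but running a function that way gives us no access to its return value. This is where the Future object comes in. Much like a decorator, the function that finally runs is wrapped, and its result is attached to a Future object. You can think of Future as an extremely simple object, like this: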

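class Future:
    def __init__(self):
        self.result = None
        self._state = "PENDING"

    def set_result(self, value):
        # Store the value before flipping the state, so that a thread
        # polling _state never sees FINISHED with an empty result.
        self.result = value
        self._state = "FINISHED"

    def get_result(self):
        return self.result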
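
To make the problem concrete, here is a minimal sketch of a thread handing its outcome back through a Future. `run_in_thread` and `divide` are hypothetical names made up for this illustration. The sketch uses the standard library's real `concurrent.futures.Future`, which, unlike the toy class above, also records exceptions through `set_exception` and blocks inside `result()` until something has been stored. The documentation says Future instances are normally created by an executor rather than by hand, so treat this as a demonstration only.

```python
import threading
from concurrent.futures import Future


def run_in_thread(fn, *args, **kwargs):
    """Hypothetical helper: run fn in a new thread and return a Future."""
    future = Future()

    def target():
        # threading.Thread throws away the return value of its target,
        # so the outcome is stored on the Future instead.
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)

    threading.Thread(target=target).start()
    return future


def divide(a, b):
    return a / b


ok = run_in_thread(divide, 6, 3)
bad = run_in_thread(divide, 1, 0)

print(ok.result())                      # blocks until the value is stored
print(type(bad.exception()).__name__)   # the error raised inside the thread
```

Calling `bad.result()` instead of `bad.exception()` would re-raise the ZeroDivisionError in the calling thread, which is what the `try/except` around `future.result()` in the official example relies on.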

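So what finally gets executed is not the original function. Here it is wrapped in a WorkerItem object (the real library calls its counterpart _WorkItem):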

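import traceback


class WorkerItem:
    def __init__(self, future, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.future = future

    def run(self):
        try:
            ret = self.fn(*self.args, **self.kwargs)
            self.future.set_result(ret)
        except Exception as e:
            # The toy Future has no separate slot for errors, so the
            # exception object is stored as the result.
            self.future.set_result(e)


def _worker(work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is None:
                # None is the shutdown signal sent by the executor.
                return
            work_item.run()
    except BaseException:
        traceback.print_exc()

Second, it lets you cap the number of threads with max_workers. This can also be done with a queue: when submit creates a task, it does not call threading.Thread to spawn a brand-new thread every time. It only puts the task on the queue, then starts another worker thread if the current thread count is still below the maximum. Each worker thread simply loops, pulling work items off the queue and running them. WorkerItem and _worker are internal, and the user never constructs a Future directly. The only thing really exposed to the user is ThreadPoolExecutor, which can be pictured roughly like this:

from queue import Queue
from threading import Thread


class ThreadPoolExecutor:
    def __init__(self, max_workers):
        self.max_workers = max_workers
        self._queue = Queue()
        self._thread = set()

    def __enter__(self):
        return self

    def submit(self, fn, *args, **kwargs):
        f = Future()
        worker_item = WorkerItem(f, fn, *args, **kwargs)
        self._queue.put(worker_item)
        # Start a new worker only while the pool is below its limit.
        if len(self._thread) < self.max_workers:
            t = Thread(target=_worker, args=(self._queue,))
            t.daemon = True
            t.start()
            self._thread.add(t)
        return f

    def __exit__(self, exc_type, exc_val, exc_tb):
        # One None per worker tells each thread to exit; then wait for them.
        for _ in self._thread:
            self._queue.put(None)
        for t in self._thread:
            t.join()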
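
The thread cap can be observed with the standard library's own executor. The sketch below is only an illustration: `record_thread` is a hypothetical task, and the numbers 3 and 20 are arbitrary. Each task reports the name of the thread it ran on, so the set of names shows how many threads were actually used.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def record_thread(_):
    # Sleep briefly so tasks overlap and the pool has a reason to grow.
    time.sleep(0.01)
    return threading.current_thread().name


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(record_thread, i) for i in range(20)]
    names = {f.result() for f in futures}

print(len(names))  # never more than max_workers
```

Twenty tasks go in, but they are spread over at most three threads, because submit only queues the work and the workers keep pulling from that queue.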

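As shown above, every submit returns a Future object. Once a thread has produced its result, we can poll the state of each Future to collect the results we need. Roughly like this: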

with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(load_url, url, 1): url for url in URLS}
    while True:
        # Iterate over a snapshot, because finished entries are removed.
        for i in list(future_to_url):
            if i._state == "FINISHED":
                print(i.get_result())
                future_to_url.pop(i)
        if len(future_to_url) == 0:
            break
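
The same polling idea can be written against the real library through the public `Future.done()` method instead of reading `_state`. This is a sketch under assumptions: `slow_square` is a hypothetical task, and the short sleep between checks is my addition to keep the loop from spinning at full CPU.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    time.sleep(0.01 * n)
    return n * n


results = []
with ThreadPoolExecutor(max_workers=3) as executor:
    pending = {executor.submit(slow_square, n) for n in range(5)}
    while pending:
        # Collect whatever has finished since the last check.
        finished = {f for f in pending if f.done()}
        for f in finished:
            results.append(f.result())
        pending -= finished
        time.sleep(0.005)  # pause between checks instead of busy-waiting

print(sorted(results))
```

Even with the sleep, the loop wakes up again and again just to find nothing new, which is the inefficiency that as_completed avoids.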

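But perhaps the author found this approach too crude and too inefficient, so they implemented an as_completed function, which raises the difficulty of understanding by an order of magnitude....

Roughly speaking, it first creates a waiter object (which holds a threading.Event) and attaches it to the Futures being waited on. Whenever any of them completes, the event is set. The blocked wait then returns, and the finished Futures can be yielded. The annotated code is below: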

def as_completed(fs, timeout=None):
    if timeout is not None:
        end_time = timeout + time.time()

    fs = set(fs)
    with _AcquireFutures(fs):
        # First pick out every Future that already has a result
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        # Create a waiter object, which holds a threading.Event,
        # and attach it to all of the Futures
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        # Yield the Futures that were already finished
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            # Setting a result on a Future triggers threading.Event.set,
            # which releases the blocking wait here
            waiter.event.wait(wait_timeout)

            with waiter.lock:
                # Take the next batch of finished Futures
                finished = waiter.finished_futures
                waiter.finished_futures = []
                # Reset the event and wait for the next set
                waiter.event.clear()

            # Yield each one and remove it from the pending set
            for future in finished:
                yield future
                pending.remove(future)

    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
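
Stripped of timeouts and cancellation, the waiter mechanism can be sketched with toy classes. Everything here (`Waiter`, `ToyFuture`, `toy_as_completed`) is a hypothetical stand-in written for this post, not the library's code; it only mirrors the idea that a finishing Future pushes itself onto the waiter and sets the event.

```python
import threading


class Waiter:
    """Toy stand-in for the library's internal waiter object."""

    def __init__(self):
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.finished = []

    def add_result(self, future):
        with self.lock:
            self.finished.append(future)
            self.event.set()  # wake up whoever is blocked in wait()


class ToyFuture:
    def __init__(self):
        self.lock = threading.Lock()
        self.done = False
        self.value = None
        self.waiters = []

    def set_result(self, value):
        with self.lock:
            self.value = value
            self.done = True
            waiters = list(self.waiters)
        for waiter in waiters:
            waiter.add_result(self)


def toy_as_completed(futures):
    waiter = Waiter()
    pending = set()
    already_done = []
    for f in futures:
        # Check and register under the future's lock, so a completion
        # cannot slip in between the check and the registration.
        with f.lock:
            if f.done:
                already_done.append(f)
            else:
                f.waiters.append(waiter)
                pending.add(f)
    yield from already_done
    while pending:
        waiter.event.wait()  # sleeps until some future finishes
        with waiter.lock:
            batch, waiter.finished = waiter.finished, []
            waiter.event.clear()
        for f in batch:
            pending.discard(f)
            yield f


futures = [ToyFuture() for _ in range(3)]
plan = [(0.15, "slow"), (0.01, "fast"), (0.08, "medium")]
for f, (delay, value) in zip(futures, plan):
    # Timer threads play the role of pool workers finishing at different times.
    threading.Timer(delay, f.set_result, args=(value,)).start()

order = [f.value for f in toy_as_completed(futures)]
print(order)
```

On an unloaded machine the values should come out in completion order rather than submission order, and the consuming thread sleeps in `event.wait()` between arrivals instead of polling.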

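The threaded implementation is much simpler than the multiprocess one. I will write a follow-up on the multiprocess implementation when I get the chance~~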