ThreadPoolExecutor 就是個坑

主要問題可參考這個 issue,裡面有以下片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import concurrent.futures as cf

bucket = range(30_000_000)

def _dns_query(target):
from time import sleep
sleep(0.1)

def run():
with cf.ThreadPoolExecutor(3) as executor:
future_to_element = dict()

for element in bucket:
future = executor.submit(_dns_query, element)
future_to_element[future] = element

for future in cf.as_completed(future_to_element):
elt = future_to_element[future]
print(elt)

run()

可發現這是照著文件例子寫的,只是數量增加一下,記憶體就爆了。問題乍看出在 future_to_element 長太大,但就算不把 submit() 的結果放在容器中,只要在 with clause 中就一樣會佔用記憶體。也就是說在工作數量大的情況下,ThreadPoolExecutor 並不實用。Issue 中建議是另外加個 loop 把總工作切成一批批的處理,每次處理一批就要等到全部跑完,然後換下一批。若為了節省時間把批次處理量加大,記憶體又消耗很多,不是很理想。

這其實是不必要的:future 可用以得知是否出錯、以及工作完成的結果,但要知道結果有其他機制可以利用,最典型的例如送到 queue,問題就只剩錯誤處理。先前在「出錯就盡快停止」的條件下,寫過類似下面的程式片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import threading
import typing


class ThreadExecutor:

def __init__(self, max_workers: int):
self.max_workers = max_workers
self.semaphore = threading.Semaphore(max_workers)
self.last_error = None

def _wrapper(self, fn: typing.Callable, args, kwargs):
try:
fn(*args, **kwargs)
except Exception as e:
self.last_error = e
raise e
finally:
self.semaphore.release()

def submit(self, fn: typing.Callable, *args, **kwargs) -> bool:
if self.last_error:
return False
self.semaphore.acquire()
threading.Thread(
target=self._wrapper, args=(fn, args, kwargs)
).start()
return True

def join(self):
for _ in range(self.max_workers):
self.semaphore.acquire()
if self.last_error:
raise self.last_error


def task(x):
import random
import time
if random.random() < 0.001:
raise RuntimeError(x)
print(x)
time.sleep(1)
return


def main():
excr = ThreadExecutor(100)
for x in range(500):
if not excr.submit(task, x):
break
excr.join()


if __name__ == '__main__':
main()

如此不用批次也沒有消耗記憶體問題。

但以上提的還算是表面。最終是會需要 Scala concurrent.Map 那樣的機制,而且必須是 lazy,或是 Go 用 channel 也能處理得很漂亮。只能說 Python 在 concurrency 領域還差得很遠,這也限制了它作為 backend, infrastructure 等等應用的普及度。