[Python進階]Python的並行計算 - Futures

無論是哪門程式語言,併發(Concurrent)和並行(Parallel)都是很常用且重要的知識點。舉個例子,像是爬蟲技術被廣泛運用在工業界的資料收集領域,其中的核心技術就是併發並行程式設計。

正確合理地使用併發和並行,無疑會讓我們的程式性能大幅提升。之前我們聊到過Python Coroutine,今天我們來聊聊Python的Futures。

併發(Concurrent)和並行(Parallel)的差別

  1. 併發:指的是在同一個CPU下,透過上下文切換(Context switch),讓使用者覺得不同程式段同時執行。這通常應用在I/O頻繁或耗時的場景,例如下載多個檔案,I/O所消耗的時間會比CPU多。
  2. 並行:則是真正意義上的多個CPU同時執行,適合CPU heavy的場景,例如MapReduce的計算,為了加快速度,需要更多的CPU來完成。

需要注意的是,即使電腦有四顆CPU,理論上可以同時並行跑四個Threads,但在Python卻不能這樣。Python的直譯器有race condition的問題,因此同一時刻只能允許一個Thread執行。具體原因在後續講到GIL時會深入探討。

Python的Futures Library

Python的Futures library提供了方便的併發和並行框架,能夠建立thread pool、process pool等。下面是一個簡單的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import concurrent.futures
import time
import random

def loading_one(name):
time.sleep(random.random())
print('Job %s is completed.' % name)

def loading(names):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(loading_one, names)

def main():
jobnames = ['JobA', 'JobB', 'JobC', 'JobD']
start_time = time.perf_counter()
loading(jobnames)
end_time = time.perf_counter()
print('Load {} jobs in {} seconds'.format(len(jobnames), end_time - start_time))

if __name__ == '__main__':
main()

# Output
# Job JobB is completed.
# Job JobD is completed.
# Job JobA is completed.
# Job JobC is completed.
# Load 4 jobs in 0.706298665 seconds

執行結果顯示耗時不到一秒,由於random.random()返回介於0和1之間,證明確實是併發執行的。這裡我們建立了thread pool:

1
2
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(loading_one, names)

最多可以同時使用五個Threads,而executor.map就像Python裡的map一樣,對每個names的元素呼叫loading_one函數。雖然可以自己設定thread的數量,但不是越多越好,因為thread的創建、維護、刪除都有性能消耗,設定太大可能導致速度更慢。

當然,我們也可以把上述程式碼改成”併行”:

1
2
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
executor.map(loading_one, names)

函數ProcessPoolExecutor代表創建Process pool。有興趣的讀者可以嘗試,通常改成ProcessPoolExecutor後耗時更多,這是因為並行的方式適合CPU heavy的情況。

Python Futures和asyncio的比較

有沒有發覺Python的Futures library和asyncio非常像?它們都會將等待的job放進list裡,這些job可以隨時查詢。當然,它們的結果和異常(job.result())也能在結束後拿到。

再介紹一個Futures常用的方法 - done(),表示該job是否已完成。done()是non-blocking的,呼叫後會立即返回。還有一個方法是add_done_callback(func),當job完成後,會通知並呼叫callback function。

總結

  • 我們學習了併發(Concurrent)和並行(Parallel)的差別:
    • 併發:通常用於I/O密集的場景
    • 並行:適合CPU heavy的場景
  • 展示了Python Futures library的例子,透過Thread Pool和Process Pool展示如何利用併發和並行執行程式。