[平行處理] 考慮使用concurrent.futures模組來達成真正的平行處理

[平行處理] 考慮使用concurrent.futures模組來達成真正的平行處理

[原文擷自Effective Python中文版]結合自己的實作與理解

對於擁有越來越多CPU核心的現代電腦而言,我們可以合理的假設,改善程式碼效能其中一個解決方案就是平行處理(Parallelism)

然而,Python的GIL讓執行緒無法達到真正的平行處理,所以這個選項出局了。另一個常見的建議是將程式碼中效能最關鍵的地方以C語言改寫為一個擴充功能模組(extension module)

C讓我們更靠近底層機器,執行的比Python還要快,消除了平行處理的必要。

C擴充功能也能夠起始原生的執行緒來達到平行處理,也有完善的PythonApi可以供呼叫。是一個別無它法時的好選擇。

但是以C改寫可能要付出的成本,包含語法上從python的簡短易懂變成相當冗長複雜,移植工作也要投入廣泛的測試來確保功能正確性。

問題往往是只移植程式的一部分到C在大部分的時候,並不足夠。通常最佳化後的Python執行緩慢通常不會只有一個主要的原因,而是有不同因素組成。

本書介紹了一個更好的方式,可以保留我們對於Python的投資,並解決困難的計算問題。

 

concurrent.futures內建模組:

這個模組內的multiprocessing模組,可以藉由執行額外的直譯器作為子行程(child processing)。它能讓Python平行運用多個CPU核心。這些子行程與主直譯器是分開的。所以它們的GIL也是分開的。每個子行程都能完整地運用一個CPU核心。每個子行程也與主行程有連結,讓它得以接受計算指令並回傳結果。

以gcd為例(計算兩個數字間的最大公因數)-以下寫法分別是循序(上)與工作者執行緒(worker threads)來進行計算。為了突顯時間上的差異,加入了更多的數字組合。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import time

from multiprocessing import freeze_support


def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

if __name__ == '__main__':
    freeze_support()

    numbers = [(1963309, 2265973), (2030677, 3814172), (1551645, 2229620), (2039045, 2020802), (2039045, 2020802), (2039045, 2020802), (2039045, 2020800), (2039145, 2020802), (2039065, 2120802), (2039045, 2020802), (2039845, 2020802), (2035045, 2467802), (205745, 2020802)]
    start = time()
    result = [gcd( pair ) for pair in numbers]
    end = time()
    print( result )
    print( '花費 %.3f 秒' % (end - start) )

    start = time()
    pool = ThreadPoolExecutor( max_workers=4 )
    result = list( pool.map( gcd, numbers ) )
    end = time()
    print( result )
    print( '花費 %.3f 秒' % (end - start) )

執行結果:

[1, 1, 5, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1]
花費 2.396 秒
[1, 1, 5, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1]
花費 2.542 秒

這次改寫工作者多執行緒的版本,甚至變得更慢,因為得負擔啟動執行緒儲存池(pool of threads)以及與它溝通的成本

 

然而接下來只要改寫一行,就能達到加速的效果

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from time import time

from multiprocessing import freeze_support


def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

if __name__ == '__main__':
    freeze_support()

    start = time()
    pool = ProcessPoolExecutor( max_workers=4) #唯一改的一行 
    result = list( pool.map( gcd, numbers ) )
    end = time()
    print( result )
    print( '花費 %.3f 秒' % (end - start) )

[1, 1, 5, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1]
花費 1.297 秒

以下簡單說明ProcessPoolExecutor類別實際上所做的事情

1.它將來numbers輸入資料的每個項目帶給Map

2.它使用Pickle模組將之序列化成二進位資料

3.它透過一個本地端的socket將經過序列化的資料從主直譯器行程複製到子行程

4.子行程解序列化回python物件

5.匯入含有那個gcd函式的模組

6.它與其他子行程平行地在輸入資料上執行函式

7.結果序列化回二進位資料

8.透過socket複製回去

9.父行程解序列回python物件

10.將多個子行程的結果組合為單一串列回傳

上述邏輯看似簡單,例multiprocessing模組與ProcessPoolExecutor類別做了很多工作才讓平行處理變成可行。

其他多數語言中,唯一需要去協調兩個執行緒的地方就是單一鎖或是不可分作業。使用multiprocessing的成本之所以很高,是因為父行程與子行程之間

必須進行的那些序列化與反序列化作業。

 

這種解法很適合特定類型的孤立且高槓桿的任務。

孤立是指不必與程式其他部分共用狀態的函式

高槓桿指的是只需要在父行程與子行程間轉移少量資料,就能進行大量運算。

這次的gcd就是這個例子

 

作者建議如果我們的程式沒有這些特徵,那麼multiprocessing所帶來的成本可能讓我們無法透過平行化來加速程式。

當這種情況發生時,multiprocessing有提供更進階的工具可用於共用記憶體(shared memory)、跨行程鎖(cross-process locks)、佇列與代理(proxies)。這所有的這些功能都非常的複雜。

避開multiprocessing的所有上述的部分,並透過簡單的concurrent.futures模組來使用那些功能就好,除非你完全沒有其他的選擇了,就可以考慮直接使用multiprocessing模組

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *