Python線程池與進程池

前言

前面我們已經將線程併發編程與進程並行編程全部摸了個透,其實我第一次學習他們的時候感覺非常困難甚至是喫力。因爲概念實在是太多了,各種鎖,數據共享同步,各種方法等等讓人十分頭痛。所以這邊要告訴你一個好消息,前面的所有學習的知識點其實都是爲本章知識點做鋪墊,在學習了本章節的內容後關於如何使用多線程併發與多進程並行就採取本章節中介紹的方式即可。

這裏要介紹一點與之前內容不同的地方 ,即如果使用隊列進行由進程池創建的進程之間數據共享的話不管是 multiprocessing 模塊下的 Queue 還是 queue 模塊下的 Queue 都不能爲進程池中所創建的進程進行數據共享,我們需要用到另一個隊列即 中的 當然這個我也會在下面介紹到。那麼開始學習吧!

  官方文檔

執行器

最早期的Python2中是沒有線程池這一概念的,只有進程池。直到Python3的出現才引入了線程池,其實關於他們的使用都是非常簡單,而且接口也是高度統一甚至說一模一樣的。 而線程池與進程池的作用即是爲了讓我們能夠更加便捷的管理線程或進程。

我們先說一下,如果需要使用線程池或進程池,需要導入模塊 concurrent.futures

from concurrent.futures import ThreadPoolExecutor # 線程池執行器

from concurrent.futures import ProcessPoolExecutor # 進程池執行器

這裏介紹一下,關於線程池或者進程池創建出的線程與進程與我們使用 multiprocessing 模塊或者 threading 模塊中創建的線程或進程有什麼區別。我們以多線程爲例:

import threading

def task():
    ident = threading.get_ident()
    print(ident)
    # 銷燬當前執行任務的線程

if __name__ == '__main__':

    for i in range(10):
        t1 = threading.Thread(target=task,)  # 領任務
        t1.start()  # 等待CPU調度,而不是立即執行  # 執行

# ==== 執行結果 ==== Ps:可以看到每個線程的id號都不一樣,這也印證了圖上說的。

"""
10392
12068
5708
13864
2604
7196
7324
9728
9664
472
"""
import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task():
    ident = threading.get_ident()
    print(ident)
    # 結束任務,不銷燬當前執行任務的線程,直到所有任務都執行完畢。

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=2)  # 這裏代表有2個線程可以領取任務
    for i in range(10):
        pool.submit(task)  # 執行器啓動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢後這2個線程纔會死亡

# ==== 執行結果 ==== Ps:可以看到這裏都讓這2個線程把任務接了,內存開銷相比於上面的要小。

"""
7272
7272
7272
7272
11596
7272
11596
11596
11596
11596
"""

方法大全

執行器方法大全
submit( fn , * args, * *kwargs) 調度可調用對象 fn ,以 fn(*args **kwargs) 方式執行並返回 Future 對象代表可調用對象的執行。(異步提交!極爲牛逼!)
map( func , *iterables , timeout=None , chunksize=1 ) 類似於 map(func, *iterables)
shutdown( wait=True ) 等待,類似 join() 方法,並且在所有的任務完成後關閉執行器。 wait=True 爲關閉,爲 False 則是不關閉執行器的意思。
Ps:其實對於線程池或進程池來說,他們的池都有一個官方的名稱叫做執行器,接口都是一樣的。那麼接下來我就將線程池進程池這樣的名字換做執行器了,也是方便理解。  

基本使用

其實關於執行器的使用,我們有兩種方式,一種是依賴於 with 語句,一種是不依賴於 with 語句,那麼我在這裏推薦使用依賴於wait語句的執行器。

不依賴於 with 語句的執行器使用:

import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task():
    print("執行了")

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=2)  # 這裏代表有2個線程可以領取任務 , 對於線程池來講它是默認值是CPU核心數+4,對於進程池來講最大開啓的進程數是CPU核心數。
    for i in range(10):
        pool.submit(task)  # 執行器啓動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢後這2個線程纔會死亡


# ==== 執行結果 ==== Ps:可以看到這裏都讓這2個線程把任務接了,內存開銷相比於上面的要小。

"""
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
"""

依賴於 with 語句的執行器使用:

import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task():
    print("執行了")
    # 銷燬

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:  # 這裏代表有2個線程可以領取任務 , 對於線程池來講它是默認值是CPU核心數+4,對於進程池來講最大開啓的進程數是CPU核心數。
        for i in range(10):
            pool.submit(task)  # 執行器啓動任務,將這些任務給2個人分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢後這2個線程纔會死亡


# ==== 執行結果 ==== Ps:可以看到這裏都讓這2個線程把任務接了,內存開銷相比於上面的要小。

"""
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
執行了
"""

期程對象

方法大全

期程對象(由執行器執行的任務的返回結果)方法大全
方法/屬性名稱 功能描述
cancel() 嘗試取消調用。 如果調用正在執行或已結束運行不能被取消則該方法將返回 False ,否則調用會被取消並且該方法將返回 True
cancelled() 如果調用成功取消返回 True
running() 如果調用正在執行而且不能被取消那麼返回 True
done() 如果調用已被取消或正常結束那麼返回 True
result( timeout=None ) 即獲取任務的返回結果,最大等待timeout秒,如不設置則死等,超時觸發 CancelledError 異常。
add_done_callback( fn ) 增加回調函數 fn ,這個 fn 應該至少有一個形參來接收當前期程對象。
exception( timeout=None ) 返回由調用引發的異常。如果調用還沒完成那麼這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成, concurrent.futures.TimeoutError 將會被觸發。 timeout 可以是整數或浮點數。如果 timeout 沒有指定或爲 ,那麼等待時間就沒有限制。
Ps:還有一些期程對象的方法沒有舉例出來。詳情參見文檔  

期程對象的作用

我們可以看到,我們上面的函數並沒有返回值,如果有返回值的話怎麼辦呢?

import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task():
    print("執行了")
    return "玫瑰花"
    # 銷燬

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:
        res = pool.submit(task)
        print(res)  # <Future at 0x2539ea97850 state=finished returned str> 這個就是期程對象,可以看到他裏面還有當前任務的執行狀態。 finished = 執行完了的意思
        print(res.result())  # 通過該方法就可以拿到任務的返回結果

# ==== 執行結果 ====

"""
執行了
<Future at 0x2539ea97850 state=finished returned str>
玫瑰花
"""

期程對象,也被稱爲未來對象,是一個非常重要的概念。這裏可以記一筆,在 Django 框架中也有些地方採取了期程對象這樣的設定,這是後話,後面再聊。

期程對象如何獲取返回結果

我們嘗試着將它的任務數量增多,發現使用期程對象直接獲取任務結果會導致阻塞,怎麼解決?

import time
import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task(x):
    print("執行了,這是第%s個任務"%x)
    time.sleep(3)
    return "玫瑰花"
    # 銷燬

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(10):
            res = pool.submit(task,i)
            print(res.result())  # 每次獲取結果的時候都是阻塞,怎麼辦?這個速率就變得非常的Low逼了。

# ==== 執行結果 ====

"""
執行了,這是第0個任務
玫瑰花
執行了,這是第1個任務
玫瑰花
執行了,這是第2個任務
玫瑰花
執行了,這是第3個任務
玫瑰花
執行了,這是第4個任務
玫瑰花
執行了,這是第5個任務
玫瑰花
執行了,這是第6個任務
玫瑰花
執行了,這是第7個任務
玫瑰花
執行了,這是第8個任務
玫瑰花
執行了,這是第9個任務
玫瑰花
"""

我這裏有一個辦法,可以值得嘗試一下。就是執行器本身有個方法 shutdown(wait=True) ,它會導致當前主線程的阻塞。那麼我們就可以這樣操作,主程序阻塞住,再將啓程對象全部放到一個列表中,當所有任務處理完畢後阻塞通行,這個時候我們再循環這個列表拿出其中的結果。

import time
import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task(x):
    print("執行了,這是第%s個任務"%x)
    time.sleep(3)
    return "玫瑰花"
    # 銷燬

if __name__ == '__main__':

    res_list = [] # 用於存放所有期程對象

    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(10):
            res = pool.submit(task,i)
            res_list.append(res)  # 將期程對象放入列表

        pool.shutdown(wait=True)  # 代表必須將所有子線程的任務跑完再繼續向下執行主線程。

    for i in res_list:
        print(i.result())

# ==== 執行結果 ====

"""
執行了,這是第0個任務
執行了,這是第1個任務
執行了,這是第2個任務
執行了,這是第3個任務
執行了,這是第4個任務
執行了,這是第5個任務
執行了,這是第6個任務
執行了,這是第7個任務
執行了,這是第8個任務
執行了,這是第9個任務
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
玫瑰花
"""

如果你覺得這種方法很贊,我只能送你兩個字,太low了。我們注意執行器的 submit() 方法,這玩意兒是異步提交。異步提交的結果需要用到回調函數來進行調用,我們來看一下它有多牛逼。

回調函數

import time
import threading
from concurrent.futures import ThreadPoolExecutor  # 線程池執行器

def task(x):
    print("執行了,這是第%s個任務"%x)
    time.sleep(3)
    return "玫瑰花"
    # 銷燬

def callback(res): # 必須有一個形參,來接收期程對象
    print(res.result())   # 打印結果,即task任務的返回結果


if __name__ == '__main__':


    with ThreadPoolExecutor(max_workers=2) as pool:
        for i in range(10):
            res = pool.submit(task,i)
            res.add_done_callback(callback)  # <--- 增加回調函數,當期程對象中的任務處理狀態完畢後將自動調用回調函數



# ==== 執行結果 ==== # 異步提交牛逼不?只要任務返回了我們立馬就可以獲取到結果進行處理。

"""
執行了,這是第0個任務
執行了,這是第1個任務
玫瑰花
玫瑰花
執行了,這是第2個任務
執行了,這是第3個任務
玫瑰花
玫瑰花
執行了,這是第4個任務
執行了,這是第5個任務
玫瑰花
玫瑰花
執行了,這是第6個任務
執行了,這是第7個任務
玫瑰花
玫瑰花
執行了,這是第8個任務
執行了,這是第9個任務
玫瑰花
玫瑰花
"""

擴展:進程池執行器任務數據共享

當我們使用進程池執行器啓動多進程執行任務時,如果想用數據共享,單純 multiprocessing.Queue 進程隊列並不支持。

import multiprocessing
from concurrent.futures import ProcessPoolExecutor  # 進程池執行器

def task_1(q):
    q.put("玫瑰花")
    print("放完了...")

def task_2(q):
    print(q.get())
    print("取到了")

if __name__ == '__main__':

    q = multiprocessing.Queue()

    with ProcessPoolExecutor(max_workers=2) as pool:
        pool.submit(task_1,q)
        pool.submit(task_2,q)


# ==== 執行結果 ==== # 阻塞住

"""

"""

這個時候我們需要用到 multiprocessing 中的 Manager() 中的 Queue

from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor  # 進程池執行器

def task_1(q):
    q.put("玫瑰花")
    print("放完了...")

def task_2(q):
    print(q.get())
    print("取到了")

if __name__ == '__main__':

    q = Manager().Queue()

    with ProcessPoolExecutor(max_workers=2) as pool:
        pool.submit(task_1,q)
        pool.submit(task_2,q)


# ==== 執行結果 ==== # 成功

"""
放完了...
玫瑰花
取到了
"""
相關文章