摘要:from multiprocessing import Process import time import os # 兩個子進程將會調用的兩個方法 def worker_1(interval): print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(), os.getpid())) t_start = time.time() time.sleep(interval) # 程序將會被掛起interval秒 t_end = time.time() print("worker_1,執行時間爲'%0.2f'秒" % (t_end - t_start)) def worker_2(interval): print("worker_2,父進程(%s),當前進程(%s)" % (os.getppid(), os.getpid())) t_start = time.time() time.sleep(interval) t_end = time.time() print("worker_2,執行時間爲'%0.2f'秒" % (t_end - t_start)) if __name__ == '__main__': # 判斷是否爲主程序 # 輸出當前程序的ID print("進程ID:%s" % os.getpid()) """ 創建兩個進程對象,target指向這個進程對象要執行的對象名稱, args後面的元組中,是要傳遞給worker_1方法的參數, 因爲worker_1方法就一個interval參數,這裏傳遞一個整數2給它, 如果不指定name參數,默認的進程對象名稱爲Process-N,N爲一個遞增的整數 """ p1=Process(target=worker_1, args=(2,)) p2=Process(target=worker_2, name="王二狗", args=(1,)) # 使用"進程對象名稱.start()"來創建並執行一個子進程, # 這兩個進程對象在start後,就會分別去執行worker_1和worker_2方法中的內容 p1.start() p2.start() # 同時父進程仍然往下執行,如果p2進程還在執行,將會返回True print("p2.is_alive=%s " % p2.is_alive()) # 輸出p1和p2進程的別名和pid print("p1.name=%s" % p1.name) print("p1.pid=%s" % p1.pid) print("p2.name=%s" % p2.name) print("p2.pid=%s" % p2.pid) """ join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成後,再繼續執行下面的語句,一般用於進程間的數據同步 如果不寫這一句,下面的is_alive判斷將會是True, 改成p1.join(1), 因爲p2需要2秒以上纔可能執行完成,父進程等待1秒很可能不能讓p1完全執行完成,所以下面的print會輸出True,即p1仍然在執行 """ p1.join() print("p1.is_alive=%s" % p1.is_alive())。# 從multiprocessing中導入Process from multiprocessing import Process import os # 子進程要執行的代碼 def run_proc(name): print('子進程運行中,hello,= %s ,pid=%d...' % (name, os.getpid())) if __name__ == '__main__': # 判斷是否爲主程序 print('父進程 %d.' % os.getpid()) """ 創建Process對象, 表示一個子進程. 1. target參數表示子進程要做的任務(一個可執行對象) 2. args是一個元組, 表示傳遞給target的可執行對象的位置參數. 本例中就是把"王二狗"傳遞給函數f的name參數 """ p = Process(target=run_proc, args=('王二狗',)) print('子進程將要執行。

多進程和多線程

一、進程

1.1 進程的引入

現實生活中,有很多的場景中的事情是同時進行的,比如開車的時候 手和腳共同來駕駛汽車,再比如唱歌跳舞也是同時進行的;試想,如果把唱歌和跳舞這2件事情分開依次完成的話,估計就沒有那麼好的效果了(想一下場景:先唱歌,然後在跳舞,O(∩_∩)O哈哈~)

程序中

如下程序,來模擬“唱歌跳舞”這件事情

# 模擬唱歌,跳舞
from time import sleep


def sing():
    for i in range(3):
        print("正在唱歌...%d"%i)
        sleep(1)


def dance():
    for i in range(3):
        print("正在跳舞...%d"%i)
        sleep(1)


if __name__ == '__main__':
        sing()  # 唱歌
        dance() # 跳舞

運行結果

注意

  • 很顯然剛剛的程序並沒有完成唱歌和跳舞同時進行的要求
  • 如果想要實現“唱歌跳舞”同時進行,那麼就需要一個新的方法,叫做: 多任務

1.2 多任務的概念

什麼叫“多任務”呢?簡單地說,就是操作系統可以同時運行多個任務。打個比方,你一邊在用瀏覽器上網,一邊在聽MP3,一邊在用Word趕作業,這就是多任務,至少同時有3個任務正在運行。還有很多任務悄悄地在後臺同時運行着,只是桌面上沒有顯示而已。

現在,多核CPU已經非常普及了,但是,即使過去的單核CPU,也可以執行多任務。由於CPU執行代碼都是順序執行的,那麼,單核CPU是怎麼執行多任務的呢?

答案就是操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度實在是太快了,我們感覺就像所有任務都在同時執行一樣。

真正的並行執行多任務只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多任務輪流調度到每個核心上執行。

其實就是CPU執行速度太快啦。。

1.2.1 進程

每個獨立執行的程序稱爲進程

進程是程序的一次動態執行過程,它經歷了從代碼加載、執行到執行完畢的一個完整過程,這個過程也是進程本身從產生、發展到最終消亡的過程。

多進程(多任務)操作系統能同時運行多個進程(程序),由於CPU具備分時機制,所以每個進程都能循環獲得自己的CPU時間片。由於CPU執行速度非常快,使得所有程序好象是在“同時”運行一樣。

在操作系統中進程是進行系統資源分配、調度和管理的最小單位,進程在執行過程中擁有獨立的內存單元。

比如:Windows採用進程作爲最小隔離單位,每個進程都有自己的數據段、代碼段,並且與別的進程沒有任何關係。因此進程間進行信息交互比較麻煩。

進程也可以通過派生 (fork 或 spawn)新的進程來執行其他任務,不過因爲每個新進程也都擁有自己的內存和數據棧等,所以只能採用進程間通信(IPC)的方式共享信息。

1.2.2 線程

爲了解決進程調度資源的浪費,爲了能夠共享資源,出現了線程。有時候把線程稱之爲輕量級進程.

線程是CPU調度和分派的基本單位,它可與同屬一個進程的其他的線程共享進程所擁有的全部資源,多個線程共享內存,從而極大地提高了程序的運行效率。

線程是比進程更小的執行單位,線程是進程內部單一的一個順序控制流。

所謂多線程是指一個進程在執行過程中可以產生多個線程,這些線程可以同時存在、同時運行,形成多條執行線索。一個進程可能包含了多個同時執行的線程。

一個或更多的線程構成了一個進程(操作系統是以進程爲單位的,而進程是以線程爲單位的,進程中必須有一個主線程main)

如果一個進程沒有了,那麼這個進程內的所有線程肯定會消失,如果線程消失了,但是進程未必會消失。只有所有的線程都結束了,進程纔會結束!!!而且所有線程都是在進程的基礎之上同時運行。

1.3 Python 和併發編程

在大多數系統上, Python支持多進程(基於消息傳遞)編程和多線程編程.

大多數人比較熟悉的是多線程編程, 但是在python中的多線程編程卻是有諸多的限制.

python中多線程的限制

爲了線程安全考慮, python的解釋器還是使用了內部的GIL(Global Interperter Lock, 全局解釋器鎖定), 在任意時刻只運行單個python的線程執行.即使有多個可用的cpu核心, 也是如此.這就限制了python只能在一個cpu核心上運行.

GIL的存在直接影響了程序的併發編程問題.

如果一個應用程序是大部分與I/O相關, 那麼使用線程一般沒有問題, 因爲大部分時間是在I/O等待.

如果一個應用程序是CPU密集型的, 則使用多線程的壞處大於好處, 返回會降低程序的運行速度, 一般比你想象的還要慢的多.

因此, 用戶在有些情況需要使用多進程(子進程和消息傳遞)

子進程和消息傳遞

展望未來, 如果要再python中進行各種類型的併發編程, 消息傳遞應該是最應該掌握的概念.

1.4 multiprocessing包

multiprocessing是一個package, 這個包支持使用類似threading模塊的類似API去創建新的進程.

multiprocessing支持本地和遠程併發編程, 通過使用子進程來代替線程高效的規避了GIL問題.

所以, multiprocessing允許程序員重複利用給定計算機的多核cpu.

由於python的跨平臺, 所以multiprocessing支持多個平臺:unix, window, linux.

1.4.1 Process類

Process語法結構如下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:表示這個進程實例所調用對象;
  • args:表示調用對象的位置參數元組;
  • kwargs:表示調用對象的關鍵字參數字典;
  • name:爲當前進程實例的別名;
  • group:大多數情況下用不到;

最簡單的使用代碼:

# 從multiprocessing中導入Process
from multiprocessing import Process
import os


# 子進程要執行的代碼
def run_proc(name):
    print('子進程運行中,hello,= %s ,pid=%d...' % (name, os.getpid()))


if __name__ == '__main__':  # 判斷是否爲主程序
    print('父進程 %d.' % os.getpid())
    """
    創建Process對象, 表示一個子進程.
    1. target參數表示子進程要做的任務(一個可執行對象)
    2. args是一個元組, 表示傳遞給target的可執行對象的位置參數.
        本例中就是把"王二狗"傳遞給函數f的name參數
    """
    p = Process(target=run_proc, args=('王二狗',))
    print('子進程將要執行。。')
    p.start() # 啓動子進程
    p.join() # 等待進程終止
    print("子進程已經終止")

說明

  1. 創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啓動。
  2. join()方法可以等待子進程結束後再繼續往下運行,通常用於進程間的同步。

1.4.1.1 Process類的實例具有以下方法

Process實例p具有以下方法:

  1. p.start()

    啓動子進程. 這將運行代表進程的子進程, 並調用該子進程中的p.run()方法.

  2. p.join([timeout])

    等待進程p終止, timeout是可選的超時時間. 這個方法通常用戶進程間的同步.

  3. p.is_alive()

    測試進程p是否還在運行, 如果扔在運行, 則返回True

  4. run():

    如果沒有給定target參數,對這個對象調用start()方法時,就將執行對象中的run()方法;

  5. p.terminate()

    強制終止p進程. 如果調用此方法, 進程p將立即被終止, 同時不會進行任何清理工作. 如果再進程p中也開啓了子進程, 則這些子進程將成爲僵死進程.s如果p保存了一個鎖定或有進程間通信, 那麼終止可能會導致死鎖或I/O崩潰.

1.4.1.2 Process實例具有以下實例屬性:

Process實例p具有以下實例屬性:

  1. p.daemon

    一個布爾標誌, 指示這個進程是否爲後臺進程. 當創建他的python進程終止時, 後臺進程將自動終止.

    另外禁止後臺進程創建自己的新進程. p.daemon的值必須再進程啓動前設置.

  2. p.exitcode

    進程的整數退出碼. 如果進程仍在運行, 則它的值是None. 如果是負數, -N表示由信號N所終止

  3. p.name

    當前進程實例別名,默認爲Process-N,N爲從1開始遞增的整數;

  4. p.pid

    進程的整數ID

1.4.1.3 實例

實例1:

from multiprocessing import Process
import os
from time import sleep


# 子進程要執行的代碼
def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子進程運行中,name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))
        print(kwargs)
        sleep(0.5)


if __name__=='__main__':
    print('父進程 %d.' % os.getpid())
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    print('子進程將要執行')
    p.start()
    sleep(1)
    p.terminate()
    p.join()
    print('子進程已結束')

運行結果:

實例2:

from multiprocessing import Process
import time
import os


# 兩個子進程將會調用的兩個方法
def worker_1(interval):
    print("worker_1,父進程(%s),當前進程(%s)"%(os.getppid(), os.getpid()))
    t_start = time.time()
    time.sleep(interval)  # 程序將會被掛起interval秒
    t_end = time.time()
    print("worker_1,執行時間爲'%0.2f'秒" % (t_end - t_start))


def worker_2(interval):
    print("worker_2,父進程(%s),當前進程(%s)" % (os.getppid(), os.getpid()))
    t_start = time.time()
    time.sleep(interval)
    t_end = time.time()
    print("worker_2,執行時間爲'%0.2f'秒" % (t_end - t_start))


if __name__ == '__main__':  # 判斷是否爲主程序
    # 輸出當前程序的ID
    print("進程ID:%s" % os.getpid())

    """
    創建兩個進程對象,target指向這個進程對象要執行的對象名稱,
    args後面的元組中,是要傳遞給worker_1方法的參數,
    因爲worker_1方法就一個interval參數,這裏傳遞一個整數2給它,
    如果不指定name參數,默認的進程對象名稱爲Process-N,N爲一個遞增的整數
    """
    p1=Process(target=worker_1, args=(2,))
    p2=Process(target=worker_2, name="王二狗", args=(1,))

    # 使用"進程對象名稱.start()"來創建並執行一個子進程,
    # 這兩個進程對象在start後,就會分別去執行worker_1和worker_2方法中的內容
    p1.start()
    p2.start()

    # 同時父進程仍然往下執行,如果p2進程還在執行,將會返回True
    print("p2.is_alive=%s " % p2.is_alive())

    # 輸出p1和p2進程的別名和pid
    print("p1.name=%s" % p1.name)
    print("p1.pid=%s" % p1.pid)
    print("p2.name=%s" % p2.name)
    print("p2.pid=%s" % p2.pid)


    """
    join括號中不攜帶參數,表示父進程在這個位置要等待p1進程執行完成後,再繼續執行下面的語句,一般用於進程間的數據同步
    如果不寫這一句,下面的is_alive判斷將會是True,
    改成p1.join(1),
    因爲p2需要2秒以上纔可能執行完成,父進程等待1秒很可能不能讓p1完全執行完成,所以下面的print會輸出True,即p1仍然在執行
    """
    p1.join()
    print("p1.is_alive=%s" % p1.is_alive())

運行結果:

1.4.1.4 進程的創建-Process子類

創建新的進程還能夠使用類的方式,可以自定義一個類,繼承Process類,每次實例化這個類的時候,就等同於實例化一個進程對象

示例代碼:

from multiprocessing import Process
import time
import os


# 繼承Process類
class ProcessClass(Process):
    """
    因爲Process類本身也有__init__方法,這個子類相當於重寫了這個方法,
    但這樣就會帶來一個問題,我們並沒有完全的初始化一個Process類,所以就不能使用從這個類繼承的一些方法和屬性,
    最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作
    """
    def __init__(self,interval):
        Process.__init__(self)
        self.interval = interval

    # 重寫了Process類的run()方法
    def run(self):
        print("子進程(%s) 開始執行,父進程爲(%s)" % (os.getpid(), os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_stop = time.time()
        print("(%s)執行結束,耗時%0.2f秒"%(os.getpid(), t_stop-t_start))


if __name__ == "__main__":
    t_start = time.time()
    print("當前程序進程(%s)"%os.getpid())
    p1 = ProcessClass(2)
    # 對一個不包含target屬性的Process類執行start()方法,就會運行這個類中的run()方法,所以這裏會執行p1.run()
    p1.start()
    p1.join()
    t_stop = time.time()
    print("(%s)執行結束,耗時%0.2f"%(os.getpid(),t_stop-t_start))

運行結果:

1.4.2 進程池:Pool

當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。使用類Pool可以創建進程池, 然後把各種數據處理任務都提交給進程池.

初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,纔會創建新的進程來執行

Pool([numprocess, initializer, initargs])

說明:

numprocess 是指要創建的線程數. 默認是cpu的核心數.(os.cpu_count()的返回值)

initializer 是每個進程啓動時要執行的可調用對象, 默認是None

initargs是傳遞給initializer的元組參數.

1.4.2.1 multiprocessing.Pool常用函數解析:

  • apply(func[, args[, kwds]]):使用 阻塞 方式調用func

    在進程池的一個工作進程中執行func函數, args是傳給func的元組參數. 注意使用這個方法讓多個進程去執行, 他們是同步執行的. 即:多個進程是順序執行的.

    func的返回值就是p.apply的返回值.

  • apply_async(func[, args, kwargs, callback]) :使用非阻塞方式調用func

    (並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args爲傳遞給func的參數列表,kwds爲傳遞給異步的執行func

    callback 是可調用對象, 當func執行結束, 則立即調用callback並把func的返回值傳遞給callback.

  • func的關鍵字參數列表;

  • close():關閉Pool,使其不再接受新的任務;

  • terminate():不管任務是否完成,立即終止;

  • join():主進程阻塞,等待子進程的退出, 必須在close或terminate之後使用;

AsyncResult對象(apply_async()的返回值)

apply_async()的返回值是AsyncResult實例. 具有如下方法:

  1. a.get([timeout])

等待返回結果, 結果就是任務函數的返回值.

  1. a.ready()

如果任務函數執行結束返回True

  1. a.successful()

如果任務函數執行結束, 且在執行的過程中沒有發生異常則

  1. a.wait([timeout])

等待任務結束, 這個方法與get()的區別就是它沒有返回值.

相關文章