阿粉萬字長文帶你解析 ThreadPoolExecutor

爲什麼要用線程池

你有沒有這樣的疑惑,爲什麼要用線程池呢?可能你會說,我可以複用已經創建的線程呀;線程是個重量級對象,爲了避免頻繁創建和銷燬,使用線程池來管理最好了

沒毛病,各位都很懂哈~

不過使用線程池還有一個重要的點:可以控制併發的數量。如果併發數量太多了,導致消耗的資源增多,直接把服務器給搞趴下了,肯定也是不行的

繞不過去的幾個參數

提到 ThreadPoolExecutor 那麼你的小腦袋肯定會想到那麼幾個參數,咱們來瞅瞅源碼(我就直接放有 7 個參數的那個方法了):

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler)

咱們分別來看:

  • corePoolSize :

核心線程數,在線程池中有兩種線程,核心線程和非核心線程。在線程池中的核心線程,就算是它什麼都不做,也會一直在線程池中,除非設置了 allowCoreThreadTimeOut 參數

  • maximumPoolSize:

線程池能夠創建的最大線程數。這個值 = 核心線程數 + 非核心線程數

  • keepAliveTime & unit :

線程池是可以撤銷線程的,那麼什麼時候撤銷呢?一個線程如果在一段時間內,都沒有執行任務,那說明這個線程很閒啊,那是不是就可以把它撤銷掉了?

所以呢,如果一個線程不是核心線程,而且在 keepAliveTime & unit 這段時間內,還沒有幹活,那麼很抱歉,只能請你走人了 核心線程就算是很閒,也不會將它從線程池中清除,沒辦法誰讓它是 core 線程呢~

  • workQueue :

工作隊列,這個隊列維護的是等待執行的 Runnable 任務對象

常用的幾個隊列: LinkedBlockingQueue , ArrayBlockingQueue , SynchronousQueue , DelayQueue

大廠的編碼規範,相信各位都知道,並不建議使用 Executors ,最重要的一個原因就是: Executors 提供的很多方法默認使用的都是無界的 LinkedBlockingQueue ,在高負載情況下,無界隊列很容易就導致 OOM ,而 OOM 會讓所有請求都無法處理,所以在使用時, 強烈建議使用有界隊列 ,因爲如果你使用的是有界隊列的話,當線程數量太多時,它會走拒絕策略

  • threadFactory :

創建線程的工廠,用來批量創建線程的。如果不指定的話,就會創建一個默認的線程工廠

  • handler :

拒絕處理策略。在 workQueue 那裏說了,如果使用的是有界隊列,那麼當線程數量大於最大線程數的時候,拒絕處理策略就起到作用了

常用的有四種處理策略:

- AbortPolicy :默認的拒絕策略,會丟棄任務並拋出 RejectedExecutionException 異常

- CallerRunsPolicy :提交任務的線程,自己去執行這個任務

- DiscardOldestPolicy :直接丟棄新來的任務,也沒有任何異常拋出

- DiscardOldestPolicy :丟棄最老的任務,然後將新任務加入到工作隊列中

默認拒絕策略是 AbortPolicy ,會 throw RejectedExecutionException 異常,但是這是一個運行時異常,對於運行時異常編譯器不會強制 catch 它,所以就會比較容易忽略掉錯誤。

所以,如果線程池處理的任務非常重要, 儘量自定義自己的拒絕策略

線程池的幾個狀態

在源碼中,能夠清楚地看到線程池有 5 種狀態:

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

同時,使用 AtomicInteger 修飾的變量 ctl 來控制線程池的狀態,而 ctl 保存了 2 個變量:一個是 rs 即 runState ,線程池的運行狀態;一個是 wc 即 workerCount ,線程池中活動線程的數量

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 線程池創建之後就處於 RUNNING 狀態

  • 調用 shutdown() 方法之後處於 SHUTDOWN 狀態,此時線程池不再接受新的任務,清除一些空閒 worker ,等待阻塞隊列的任務完成

  • 調用 shutdownNow() 方法後處於 STOP 狀態,此時線程池不再接受新的任務,中斷所有的線程,阻塞隊列中沒有被執行的任務也會被全部丟棄

  • 當線程池中執行的任務爲空時,也就是此時 ctl 的值爲 0 時,線程池會變爲 TIDYING 狀態,接下來會執行 terminated() 方法

  • 執行完 terminated() 方法之後,線程池的狀態就由 TIDYING 轉到 TERMINATED 狀態

懵了?別急,有張圖呢~

線程池處理任務

execute

做到線程複用,肯定要先 execute 起來吧

線程池處理任務的核心方法是 execute ,大概思路就是:

  • 如果 command 爲 null ,沒啥說的,直接拋出異常就完事兒了

  • 如果當前線程數小於 corePoolSize ,會新建一個核心線程執行任務

  • 如果當前線程數不小於 corePoolSize ,就會將任務放到隊列中等待,如果任務排隊成功,仍然需要檢查是否應該添加線程,所以需要重新檢查狀態,並且在必要時回滾排隊;如果線程池處於 running 狀態,但是此時沒有線程,就會創建線程

  • 如果沒有辦法給任務排隊,說明這個時候,緩存隊列滿了,而且線程數達到了 maximumPoolSize 或者是線程池關閉了,系統沒辦法再響應新的請求,此時會執行拒絕策略

來瞅瞅源碼具體是如何處理的:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
		
    int c = ctl.get();
    // 當前線程數小於 corePoolSize 時,調用 addWorker 創建核心線程來執行任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 當前線程數不小於 corePoolSize ,就將任務添加到 workQueue 中
    if (isRunning(c) && workQueue.offer(command)) {
    	// 獲取到當前線程的狀態,賦值給 recheck ,是爲了重新檢查狀態
        int recheck = ctl.get();
        // 如果 isRunning 返回 false ,那就 remove 掉這個任務,然後執行拒絕策略,也就是回滾重新排隊
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 線程池處於 running 狀態,但是沒有線程,那就創建線程執行任務
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果放入 workQueue 失敗,嘗試通過創建非核心線程來執行任務
    // 如果還是失敗,說明線程池已經關閉或者已經飽和,會拒絕執行該任務
    else if (!addWorker(command, false))
        reject(command);
}

在上面源碼中,判斷了兩次線程池的狀態,爲什麼要這麼做呢?

這是因爲在多線程環境下,線程池的狀態是時刻發生變化的,可能剛獲取線程池狀態之後,這個狀態就立刻發生了改變.如果沒有二次檢查的話,線程池處於非 RUNNING 狀態時, command 就永遠不會執行

有點兒懵?阿粉都懂你,一張圖走起~

addWorker

從上面能夠看出來,主要是 addWorker 方法

addWorker 主要是用來創建核心線程的,它主要的實現邏輯是:

  • 判斷線程數量有沒有超過規定的數量,如果超過了就返回 false

  • 如果沒有超過,就會創建 worker 對象,並初始化一個 Thread 對象,然後啓動這個線程對象

接下來瞅瞅源碼:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
		// 線程池狀態 >= SHUTDOWN 時,不再接受新的任務,直接返回 false
		// 如果 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 同樣不接受新的任務,返回 false
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
			// wc >= CAPACITY 說明線程數不夠,所以就返回 false
			// wc >= (core ? corePoolSize : maximumPoolSize) 是在做判斷
				// 如果 core 爲 true ,說明要創建的線程是核心線程,接下來判斷 wc 是否大於 核心線程數 ,如果大於返回 false
				// 如果 core 爲 false ,說明要創建的線程是非核心線程,接下來判斷 wc 是否大於 最大線程數 ,如果大於返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
			// CAS 操作增加 workerCount 的值,如果成功跳出循環
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
			// 判斷線程池狀態有沒有變化,如果有變化,則重試
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

	// workerCount 增加成功之後開始走下面的代碼
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
		// 創建一個 worker 對象
        w = new Worker(firstTask);
		// 實例化一個 Thread 對象
        final Thread t = w.thread;
        if (t != null) {
			// 接下來的操作需要加鎖進行
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
					// 將任務線程添加到線程池中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
				// 啓動任務線程,開始執行任務
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
			// 如果任務線程啓動失敗調用 addWorkerFailed 
			// addWorkerFailed 方法裏面主要做了兩件事:將該線程從線程池中移除;將 workerCount 的值減 1
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker 類

在 addWorker 中,主要是由 Worker 類去做一些相應處理, worker 繼承 AQS ,實現 Runnable 接口

線程池維護的是 HashSet ,一個由 worker 對象組成的 HashSet

private final HashSet<Worker> workers = new HashSet<Worker>();

worker 繼承 AQS 主要是利用 AQS 獨佔鎖機制,來標識線程是否空閒;另外, worker 還實現了 Runnable 接口,所以它本身就是一個線程任務,在構造方法中創建了一個線程,線程的任務就是自己 this。thread = getThreadFactory().newThread(this);

咱們瞅瞅裏面的源碼:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        // 處理任務的線程
        final Thread thread;
        // worker 傳入的任務
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
        	// 將 state 設爲 -1 ,避免 worker 在執行前被中斷
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
			// 創建一個線程,來執行任務
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

runWorker

worker 類在執行 run 方法時,實際上調用的是 runWorker 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允許中斷
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        	// 判斷 task 是否爲空,如果不爲空直接執行
        	// 如果 task 爲空,調用 getTask() 方法,從 workQueue 中取出新的 task 執行
            while (task != null || (task = getTask()) != null) {
            	// 加鎖,防止被其他線程中斷
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 檢查線程池的狀態,如果線程池處於 stop 狀態,則需要中斷當前線程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                	// 執行 beforeExecute 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	// 執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    	// 執行 afterExecute 方法
                        afterExecute(task, thrown);
                    }
                } finally {
                	// 將 task 設置爲 null ,循環操作
                    task = null;
                    w.completedTasks++;
                    // 釋放鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

在 runWorker 方法中,首先會去執行創建這個 worker 時就有的任務,當執行完這個任務之後, worker 並不會被銷燬,而是在 while 循環中, worker 會不斷的調用 getTask 方法從阻塞隊列中獲取任務然後調用 task。run() 來執行任務,這樣就達到了複用線程的目的。通過循環條件 while (task != null || (task = getTask()) != null) 可以看出,只要 getTask 方法返回值不爲 null ,就會一直循環下去,這個線程也就會一直在執行,從而達到了線程複用的目的

getTask

咱們來看看 getTask 方法的實現:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // allowCoreThreadTimeOut 變量默認爲 false ,也就是核心線程就算是空閒也不會被銷燬
            // 如果爲 true ,核心線程在 keepAliveTime 內是空閒的,就會被銷燬
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 如果運行線程數大於最大線程數,但是緩存隊列已經空了,此時遞減 worker 數量
            // 如果有設置允許線程超時或者線程數量超過了核心線程數量,並且線程在規定時間內沒有 poll 到任務並且隊列爲空,此時也遞減 worker 數量
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 如果 timed 爲 true ,會調用 workQueue 的 poll 方法
                	// 超時時間爲 keepAliveTime ,如果超過 keepAliveTime 時長的話, poll 就會返回 null 
                	// 如果返回爲 null ,在 runWorker 中 
                	// while (task != null || (task = getTask()) != null) 循環條件被打破,從而跳出循環,此時線程執行完畢
                // 如果 timed 爲 false ( allowCoreThreadTimeOut 爲 false ,並且 wc > corePoolSize 爲 false )
                	// 會調用 workQueue 的 take 方法阻塞到當前
                	// 當隊列中有任務加入時,線程被喚醒, take 方法返回任務,開始執行
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

源碼分析到這裏就差不多清楚了

線程複用主要體現在 runWorker 方法中的 while 循環中,在 while 循環裏面, worker 會不斷的調用 getTask 方法,而在 getTask 方法裏,如果任務隊列中沒有了任務,此時如果線程是核心線程則會一直卡在 workQueue。take 方法,這個時候會被阻塞並掛起,不會佔用 CPU 資源,直到拿到任務然後返回 true , 此時 runWorker 中得到這個任務來繼續執行任務,從而實現了線程複用

呦,沒想到吧,一不小心就看完了

相關文章