这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情。并发课题对于Java来说是一个又重要又难的一大块,除非气定神闲、精力满满,否则我本身是不敢随便写这个话题的。随便出一个生涩、浮于表面的文章,我觉得意义不大。所以一直就搁置到现在。这一次开启,有一个小小的契机:我自己面试中,已经被问烂了的构造函数的几个参数有什么意义,这种问题,发现其实很多人并不了解。就着这次的机会,我就重开这个课题。

一、基本的一些准备知识

李老爷子的线程池堪称经典,老爷子也因此风靡全球开发者圈子,阅读了源码,你才能感受到什么叫做编程思想,我们普普通通的CRUD简直都弱爆了!老爷子牛逼点也在于,源码中的注释非常完备,这不得不佩服:思想牛逼一方面,能把思想完善、由浅入深的表述出来,我觉得更牛逼!其中对于这个ThreadPoolExecutor的基础知识的了解,我觉得完全可以看注释就可以学全了。要想了解线程池源码,我们先要了解如下几个方面:

线程池的几种状态线程池的状态表述状态的使用的方式线程池的构造函数

1、线程池的几种状态

最关键的是几种扭转的状态,让我们直接上老爷子的注释:

/* The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * * (下面是几种转态转换的根本的基本方式,很简单的英文,不用翻译) * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. */

RUNNING:接受新的任务,并且也继续运行阻塞队列里面的任务SHUTDOWN:不接受新的任务了,但是可以继续执行阻塞队列里面的任务STOP:不接受新的任务了,也不运行阻塞队列里面的任务了,并且去打断正在执行的任务TIDYING:所有的任务都已经终止了,workerCount(任务数)是0,线程池运行到了这个状态之后,会去调terminated()这个方法TERMINATED:terminated()这个方法执行完成

2、线程池的状态表述

同样,上源码:

// ctl这是一个很重要的参数,使用位标记方式,将当前线程池状态和当前线程池创建的任务多少杂糅到了一起private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 预留三位private static final int COUNT_BITS = Integer.SIZE - 3;// 线程池最大线程大小:(2^29)-1 (about 500 million)private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 线程池状态位,使用int的高三位进行储存private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// 通过ctl值计算出运行线程池状态值private static int runStateOf(int c) { return c & ~CAPACITY; }// 通过ctl值计算出线程池当前任务多少的值private static int workerCountOf(int c) { return c & CAPACITY; }// 通过运行状态和任务多少的两个值,生成ctl这个包装的值private static int ctlOf(int rs, int wc) { return rs | wc; }

思想也很简单:大家熟知的int类型,是占四字节,32位的。为了状态操作的高效与空间节约,老爷子使用了位操作来控制。其中32位的高三位用来存储线程池的状态;低29位用来控制当前线程池有多少个线程。上面的源码就是对位操作的基本实现(都是基本的位操作,我这里不在累赘)

3、状态的使用的方式

这里会给出几个源码中,对状态和线程数量操控的方式:

// (c:ctl,s:state)当前线程池的状态,是不是小于给定的状态private static boolean runStateLessThan(int c, int s) { return c < s;}// (c:ctl,s:state)当前线程池的状态,是不是大于等于给定的状态private static boolean runStateAtLeast(int c, int s) { return c >= s;}// 当前线程池的状态是RUNNING的吗private static boolean isRunning(int c) { return c < SHUTDOWN;}// 使用CAS原理对当前线程池线程数量值加一private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1);}// 使用CAS原理对当前线程池线程数量值减一private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1);}// 使用CAS原理对当前线程池线程数量值减一,直到成功为止private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get()));}

下面的源码是对线程状态修改源码:

private void advanceRunState(int targetState) { // 这是一个死循环,直到修改成功才break for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; }}

这里有两个判断条件,只要一个成功就会break循环

runStateAtLeast:如果当前状态和要设置的状态相等,或者比要设置的状态大。说明线程池状态的不可逆,说明,如果一个线程池已经是SHUTDOWN了,是不能设置回RUNNING状态的compareAndSet:CAS设置ctl值。根据短路原理,到了这个方法执行已经说明当前状态是小于要设置状态了,所以可以修改ctl的状态位值。如果设置失败,返回false,继续死循环。成功,break

3、线程池的构造函数

常用的JDK推荐的,或者各大“api使用”书籍介绍的,无非都是下面的几个方法,进行创建线程池:

Executors.newCachedThreadPoolExecutors.newFixedThreadPoolExecutors.newScheduledThreadPoolExecutors.newSingleThreadExecutor

可是当我们深入源码,才发现:这几个方法的内部无非都调用了ThreadPoolExecutor的构造函数,即使是newScheduledThreadPool这个方法,表面调用了ScheduledThreadPoolExecutor类,可是深入源码才发现:ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor,并且构造函数使用了super进行了构建。这就给我们了一个很好的切入口:只要研究ThreadPoolExecutor构造函数就行。进一步,还会发现,ThreadPoolExecutor有四个构造函数,入参不一样,也都不约而同,最终调用了入参最多的那个(入参少的时候使用默认值),我们看看ThreadPoolExecutor中入参最多的构造函数的源码:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 对入参进行合法性校验 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); // 获取系统安全管理器(不做分析) this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); // 核心几大参数的赋值操作 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}

corePoolSize:核心运行线程数maximumPoolSize:最大运行运行程workQueue:阻塞队列keepAliveTime:当线程大于核心线程数时,且阻塞队列没有元素,最大等待时间threadFactory:生成线程的工厂类handler:超出线程池最大承受能力之后的失败策略方法对象

对于线程池表现出来的各种特性,就是通过这几个参数控制的,所以很关键!

二、线程池的基本执行图解

对于线程池源码,我们先主要从execute执行方法入手进行分析,下面主要用一个图进行大致流程的展示:

配合上代码,我们先指出对应代码的大致位置,我们有个大体的概念比较好:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 下面大约就是①的过程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 下面大约就是②的过程 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 下面大约就是③的过程 else if (!addWorker(command, false)) // 下面大约就是④的过程 reject(command);}

三、线程池细节源码分析

1、addWorker方法

a、addWorker,我们先来看看

private boolean addWorker(Runnable firstTask, boolean core) { // 死循环,在某些条件下,会返回挥着break retry: for (;;) { int c = ctl.get(); // 当下线程池运行状态 int rs = runStateOf(c); // 下面是对线程池状态的一些列判断 // 这个判断稍微有点绕,返回false的条件是: // 线程池是SHUTDOWN、STOP、TIDYING、TERMINATED其中的任意一个状态 // 且(线程池状态为STOP、TIDYING、TERMINATED 或者 firstTask不为空 或者 阻塞队列为空) // 同样是返回false,添加失败 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 下面是对线程池当下线程数的一系列判断 int wc = workerCountOf(c); // 线程数如果大于等于最大线程池允许数量((2^29)-1)或者大于等于设置的 // 核心线程数或者最大线程数 // 同样是返回false,添加失败 if (wc >= CAPACITY || // 这里也是一个玄妙之处: // 如果传入的core为true情况,可见线程数量依赖值为核心线程数 // 如果为false,数量依赖于最大的线程数。通过这个core值,就可以 // 控制什么时候,依赖什么值进行创建线程 wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 下面是CAS的经典操作: // 这个第一个if如果设置成功,就结束整体的外部循环。没成功说明有竟态 if (compareAndIncrementWorkerCount(c)) break retry; // 再次获取一遍ctl,算是double check c = ctl.get(); // 这里判断,如果为true,说明线程池当下状态已经被修改 // 要重新通过外层循环的状态判断来确定返回值,所以continue了 if (runStateOf(c) != rs) continue retry; // 到了这里,说明线程池状态没有被翻转,那就是说当前线程数因为竟态 // 原因没有设置成功,那直接内部循环在执行一次,继续进行CAS的设置 } } // 下面是启动线程的主要代码 // 线程是否启动成功 boolean workerStarted = false; // 线程是否添加成功 boolean workerAdded = false; // 封装传入的线程对象Worker,这个也是很关键的对象,接下来会分析 Worker w = null; try { // 封装线程的初始化工作,下面会分析 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 当下线程池的主锁,最大的一把锁,上锁期间主要对线程池容器进行维护 // 这个容器是一个HashSet,保存当前运行的封装线程Worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次获取线程池当前状态,因为很有可能期间被人更改了 int rs = runStateOf(ctl.get()); // rs < SHUTDOWN:线程池是RUNNING状态 // rs == SHUTDOWN && firstTask == null: // 线程池是SHUNTDOWN且firstTask为空,这种情况主要是因为 // 线程池再SHUNDOWN状态了,可是阻塞队列还有没运行完的线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) // 保持一个线程最大线程池状态 largestPoolSize = s; // 到这里线程添加到容器成功 workerAdded = true; } } finally { mainLock.unlock(); } // 如果添加容器成功,就启动封装的线程,且设置启动标识位为true if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果封装线程启动失败,会进行一系列的失败处理 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}

b、下面是对addWorkerFailed方法的解说

private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 同样的,获取主锁 mainLock.lock(); try { // 不为空的情况将封装线程从容器中移除 // 为空的情况,主要是new Worker的时候报错 if (w != null) workers.remove(w); // 循环登陆,减少一个线程数 decrementWorkerCount(); // 试着看看,能不能结束线程池,就是把线程池TERMINASTE了 tryTerminate(); } finally { mainLock.unlock(); }}

c、下面是tryTerminate方法的解说

final void tryTerminate() { // 发现没,又是个死循环,老爷子很喜欢这种方式啊,而且是用for! for (;;) { int c = ctl.get(); // 三种情况直接方法返回: // 1、正处在RUNNING状态的线程池 // 2、线程池的状态是TIDYING或者是TERMINATE // 3、线程池是SHUNDOWN状态的,但是阻塞队列不为空 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 走到这里,线程池的状态可能是:SHUTDOWN(且阻塞队列空)、STOP // 如果此时线程数不为0的话,要进行打断操作了 if (workerCountOf(c) != 0) { // 这里入参的意思是只打断容器里第一个封装线程里面的线程 interruptIdleWorkers(ONLY_ONE); return; } // 执行到这里,说明线程池的状态是:SHUTDOWN(阻塞队列为空)、STOP // 此时线程数为0,说明线程池可以进行终结操作了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // CAS先将线程池设置成TIDYING的状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 执行用户实现的terminated方法 terminated(); } finally { // 无论怎么样都会将线程池设置成TERMINATED状态 ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } // 到这里说明终结成功,不过根据Java原理,返回前 // 先执行finally里面的解主锁的方法 return; } } finally { mainLock.unlock(); } // 如果能执行到这里,说明CAS设置TIDYING状态失败 // 说明是竟态状态 }}private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //线程没有被打断且获取到封装线程的锁 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); }}// 用户自己实现的结束方法protected void terminated() { }

到这里,已经讲完了一个很主要的内部方法:addWorker。下面我们对封装线程对象Worker进行讲解

2、Worker对象

这个东西,是一个很很很很很很很很经典的Java并发模型:AQS。这片文章不做AQS的讲解,放到后续

a、具体的Worker对象张什么样

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 这个就是最终启动的线程,看到了吧 final Thread thread; // 我们传入的Runnable对象被放到了这里 Runnable firstTask; // 这里记录完成的任务数。 // 这里说明下一个理念:一个Worker,是最终被运行的Runnanle对象 // 在很大的情况下(下面做分析)Worker这个线程会一直存在 // 存在的意义是不断读取阻塞队列里面存储的我们传进来的Runnable对象 // 然后运行。所以我们实现的Runnable对象的run方法,最终不是被 // start方法调用执行的,而是通过直接调用执行的! volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // AQS对象状态!也是一大难的东西! this.firstTask = firstTask; // 这里的getThreadFactory方法使用的就是我们传入的threadFactory // 对象 this.thread = getThreadFactory().newThread(this); } public void run() { // 看到了吧,这里执行了外层对象的方法,去直接调用传入的 // Runnable中的run方法,等下解说 runWorker(this); } // 下面的几个函数都是AQS必须要实现的方法,这里不累赘 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } public ThreadFactory getThreadFactory() { return threadFactory;}

b、默认的线程工厂DefaultThreadFactory:

static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); // 这里记录了线程名的前缀,可见会将线程池序号进行递增操作 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { // 这里就是生成喜闻乐见的Thread对象了,结合上面这里的r就是我们的Worker对象 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }}

3、runWorker方法

a、接下来又是一个关键性方法runWorker

final void runWorker(Worker w) { // 获取当前运行着的Worker线程 Thread wt = Thread.currentThread(); // 这个就是我们当下传入的Runnable Runnable task = w.firstTask; // 置空的意思是:Worker其实是一个壳子,以后会一直运行着,不断执行其他阻塞队列 // 里面的Runnable对象的run方法 w.firstTask = null; // 这里做解锁操作,是表示下面所有操作是可以被打断的 // 另外AQS默认情况下不做unlock操作,lock会阻塞 w.unlock(); // 这个标志位表示线程执行过程中有没有被打断,或者运行异常 boolean completedAbruptly = true; try { // 这个While循环里面的语句相当关键,包含了线程池执行流程的枢纽! // 我先大致说一下,下面会详细分析getTask方法: // 主要就是判断如果当前Worker里面的Runnable对象不为空 // 就会执行这个对象的run方法;执行完之后,还会回到这个循环 // 再下面的finally块里面将task置空了,所以就去调用getTask方法 // 而getTask方法是一个很大可能阻塞的方法,阻塞的原因就是等待 // 阻塞队列里面放入对象!所以也就形成了,一个Worker对象,循环 // 不停的执行传入的Runnable对象run方法。这也就构成了corePoolSize // 与maxPoolSize两个参数控制系统级别的线程多少的目的!也就是说, // 这就是线程池里面,“池”这个概念的由来~ while (task != null || (task = getTask()) != null) { w.lock(); // 这里主要是判断是否要打断当前Worker所在的线程 // 要满足两个个条件: // 1、当前线程池是STOP、TIDYING、TERMINATED // 2、当前线程是没有被打断的情况 // 其中Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) // 主要用于清除线程终端标志,因为很大可能线程池刚刚转换成STOP if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 在执行线程体之前执行的方法,用户实现 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { // 请看下面几个异常,都是直接抛了出去 // 而并没有处理,所以处理内部异常也是 // 线程池的一个关键点 thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 在执行完线程体之后的方法,用户实现 // 异常同时也传入这里了,所以可以自己实现一个子类 // 自己实现这个方法,进行异常处理 afterExecute(task, thrown); } } finally { // 这个地方肯定会被执行,所以无论run方法怎么样 // Worker运行完成线程数都会加一 task = null; w.completedTasks++; // 这里进行解锁操作 w.unlock(); } } // 注意代码执行到了这里说明while循环跳出来了 // 大致有如下几种情况: // 1、阻塞队列里面没值了 // 2、线程池状态翻转,便成了大于等于SHUTDOWN状态的了 // 由于是正常结束,所以异常结束标志是false completedAbruptly = false; } finally { // 这里肯定会被执行,但是有两种情况跳入这个代码块 // 1、run方法没有抛异常,completedAbruptly为false // 2、run方法抛异常,completedAbruptly为true // 下面也会进行解说 processWorkerExit(w, completedAbruptly); }}

b、我们来看核心的getTask方法

private Runnable getTask() { // 这个标志位主要用于后面的poll方法是否超时 boolean timedOut = false; // 又来了,李老爷子!是一个死循环判断! for (;;) { int c = ctl.get(); // 获取当前线程池运行状态 int rs = runStateOf(c); // 如果同时符合下面两种情况,直接返回null,并减少线程数量 // 1、线程池状态是:SHUTDOWN、STOP、TIDYING、TERMINTED // 2、线程池的状态是STOP、TIDYING、TERMINTED或者队列为空 // 这预示着线程池要进行关闭操作了,此Worker要结束声明周期! if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 这里是循环指导CAS成功设置 decrementWorkerCount(); return null; } // 这里获取当前线程池的线程数 int wc = workerCountOf(c); // 这个标识位要解释解释: // 1、allowCoreThreadTimeOut成员变量,可设置 // 2、wc > corePoolSize线程数是否大于核心线程数 // 简单说就是:这个标志位控制线程池的收缩! // 很关键是不是! // 正常情况下只要超出核心线程数的线程才要进行收缩的 // 收缩的条件是根据传入的阻塞队列超时时间 // 但是我们可以通过设置allowCoreThreadTimeOut为true // 这样核心线程也可以收缩! boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里maximumPoolSize不能为零和负数 // 这里判断很复杂,简单理解就是: // 如果线程池线程数超出了设置的最大线程数或者阻塞队列被打断了 // 且当前Worker所在线程不是最后一个线程或者阻塞队列空了。 // 这里如果wc>maximumPoolSize,那一定大于1,那就说明 // 一定会执行if方法体;如果小于等于maximumPoolSize情况, // 那就说明是线程合理的收缩,这种时候,只有allowCoreThreadTimeOut // 被置位或者线程数大于核心线程数,当然如果要是只有一个线程数且队列不为空 // 的情况也不能收缩,要保证有封装线程能执行阻塞队列里面线程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 这里如果设置成功说明可以终结这个Worker了 return null; // 这里是continue,因为有竟态 continue; } try { // 注意这里的timed的取值,timed为true的时候是: // 1、allowCoreThreadTimeOut被置位 // 2、或者线程数大于核心线程数 // 其他情况是直接take方法,直接阻塞的。除非被打断 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 正常情况是拿到了Runnable,直接返回了 return r; // 这种是阻塞队列超时了 timedOut = true; } catch (InterruptedException retry) { // 打断情况并非阻塞队列超时,所以这里设置成false timedOut = false; } }}

c、下面是对processWorkerExit分析

private void processWorkerExit(Worker w, boolean completedAbruptly) { // 这个判断说明当前Worker所在的线程执行Runnable中的run方法抛了异常 // 所以这个时候,要将线程数减一 if (completedAbruptly) decrementWorkerCount(); // 获取主锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 将当前Worker存在期间一共执行了多少个Runnable累加到 // 线程池的统计字段上面 completedTaskCount += w.completedTasks; // 将封装线程从容器中移除 workers.remove(w); } finally { mainLock.unlock(); } // 上面的方法在这里执行了,分析请看上面 tryTerminate(); int c = ctl.get(); // 如果现在线程池的状态是:RUNNING、SHUTDOWN,执行if代码块 if (runStateLessThan(c, STOP)) { // 如果没有抛异常情况,执行这个if代码块 if (!completedAbruptly) { // 这个代码块,主要是要保证如果阻塞队列中还有Runnable线程 // 而又走到了即将结束当前WOrker的代码,线程池要保证,至少还有 // 运行着的Worker对阻塞队列中的线程进行处理,执行 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 为0的情况表示允许核心线程收缩,或者核心线程直接设成了0 // 阻塞队列不为空要保证最小可用的Worker为1 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 判断当前线程数是不是比最小的还要小 if (workerCountOf(c) >= min) // 这里表明,有足够的Worker去执行 return; // 代码运行到这里,表明没有足够的Worker了,下面去创建 } // 这里添加一个Worker的原因是: // RUNNING和SHUTDOWN状态都是允许继续执行阻塞队列中的线程的 // 所以这里创建一个firstTask为null,依赖getTast去获取队列中的 // 线程去执行。false的原因是创建依据maximumPoolSize addWorker(null, false); }}

四、结尾

到此为止,线程池的主要源码,都分析了,剩下,还有几个附加功能源码,留着接下来有精力再一点点回补吧。当然,对于下一步的深入,就要到AQS的分析了。可见,这里的Worker本身就是一个AQS,在Worker上面调用lock或是unlock方法,都是进入一个内部的阻塞队列的管理的。其中最最底层,还会涉及到操作系统中线程的同步原语:mutex!接下来,我会分析那个,敬请期待!

查看原文 >>
相关文章