摘要:那么由此我们可以知道,当大量任务被放入线程池之后,先是被核心线程执行,多余的会被放进队列里,当队列满了之后才会创建额外的线程进行处理,再多就会采取拒绝策略。// 如果有空闲等待的线程,则将任务放入队列中,让线程去处理任务 if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable)。

针对 IO 密集型的任务,我们可以针对原本的线程池做一些改造,从而可以提高任务的处理效率。

基本

阿里巴巴泰山版java开发手册 中有这么一条:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

那么如果要使用 ThreadPoolExecutor ,那就先来看看构造方法中的所有入参:

corePoolSize : 核心线程数,当线程池中的线程数量为 corePoolSize 时,即使这些线程处于空闲状态,也不会销毁(除非设置 allowCoreThreadTimeOut)。
maximumPoolSize : 最大线程数,线程池中允许的线程数量的最大值。
keepAliveTime : 线程空闲时间,当线程池中的线程数大于 corePoolSize 时,多余的空闲线程将在销毁之前等待新任务的最长时间。
workQueue : 任务队列
unit :线程空闲时间的单位。
threadFactory :线程工厂,线程池创建线程时使用的工厂。
handler : 拒绝策略,因达到线程边界和任务队列满时,针对新任务的处理方法。

这么说可能有些难以理解,你可以结合下图进行参考:

那么由此我们可以知道,当大量任务被放入线程池之后,先是被核心线程执行,多余的会被放进队列里,当队列满了之后才会创建额外的线程进行处理,再多就会采取拒绝策略。

但这样真的能满足我们的所有需求吗?

任务的分类

正常来说,我们可以把需要处理的任务按照消耗资源的不同,分为两种: CPU 密集型IO 密集型

CPU 密集型

既然名字里带有 CPU 了,说明其消耗的主要资源就是 CPU 了。

具体是指那种包含大量运算、在持有的 CPU 分配的时间片上一直在执行任务、几乎不需要依赖或等待其他任何东西。

这样的任务,在我的理解中,处理起来其实没有多少优化空间,因为处理时几乎没有等待时间,所以一直占有 CPU 进行执行,才是最好的方式。

唯一能想到优化的地方,就是当单个线程累计较多任务时,其他线程能进行分担,类似 fork/join框架 的概念。

设置线程数时,针对单台机器,最好就是有几个 CPU ,就创建几个线程,然后每个线程都在执行这种任务,永不停歇。

IO 密集型

和上面一样,既然名字里带有 IO 了,说明其消耗的主要资源就是 IO 了。

我们所接触到的 IO ,大致可以分成两种: 磁盘 IO网络 IO

磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。

网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。

IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入 缓冲区 ,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到 缓冲区 写满。

既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待),那现有的线程池可以很好的满足我们的需求吗?

线程池的优化

还记得上面说的, ThreadPoolExecutor 针对多余任务的处理,是先放到等待队列中,当队列塞满后,再创建额外的线程进行处理。

假设我们的任务基本都是 IO 密集型,我们希望程序可以有更高的吞吐量,可以在更短的时间内处理更多的任务,那么上面的 ThreadPoolExecutor 明显是不满足我们的需求,那该如何解决呢?

也许再来看看 ThreadPoolExecutor 的 execute 方法,会让我们有一些思路:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果当前活跃线程数,小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 则优先创建线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 如果任务可以成功放入队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果不可以成功放入队列,则创建线程
        else if (!addWorker(command, false))
            // 如果无法继续创建线程,则拒绝任务
            reject(command);
    }

针对放入队列的操作,如果队列放入失败,线程池就会选择去创建线程了。因此,我们或许可以尝试自定义线程池,针对 offer 操作,做一些自定义处理。

也就是将任务放入队列时,先检查线程池的线程数是否小于最大线程数,如果是,则拒绝放入队列,否则,再尝试放入队列中。

如果你有看过 dubbo 或者 tomcat 的线程池,你会发现他们就有这样的实现方法。

比如 dubbo 中的 TaskQueue,我们来看看它的 offer 方法:

@Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有空闲等待的线程,则将任务放入队列中,让线程去处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // 如果当前线程数小于最大线程数,则返回 false ,让线程池去创建新的线程
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 否则,就将任务放入队列中
        return super.offer(runnable);
    }

这样就可以让线程池优先新建线程了。需要注意的,此时的队列因为需要根据线程池中的线程数决定是否放入任务成功,所以需要持有 executor 对象,这点不要忘记奥。

总结

通过本篇文章,主要是让大家重新了解了一下 ThreadPoolExecutor ,并针对高吞吐场景下如何进行局部优化。

有兴趣的话可以访问我的博客或者关注我的公众号,说不定会有意外的惊喜。

https://death00.github.io/

公众号:健程之道

点击此处留言

相关文章