一止长渊

ThreadPoolExecutor

N 人看过
字数:4.8k字 | 预计阅读时长:20分钟

本文中围绕着 ThreadPoolExecutor 将进行相关源码解析

  • 入参参数解析
  • execute 方法
  • Worker 类
  • addWorker 创建新线程的原理
  • runWorker 方法
  • getTask 方法

1、七个参数五种状态四个饱和策略(参数选择、饱和策略选择)

构造函数七个参数:

  • corePoolSize:最小可同时运行的线程数量
  • maximumPoolSize:最大可同时运行的线程数量(当队列中存放的任务达到了队列容量时,当前可以同时运行的线程数量变为最大线程数)
  • keepAliveTime:当线程池的数量超过 corePoolSize 时,这时候如果没有任务提交,corePoolSize 以外空闲的线程经过 keepAliveTime 后会被回收销毁
  • unit:keepAliveTime 单位
  • BlockingQueue 任务队列:当线程池中同时运行的线程数量超过 corePoolSize,新进入的任务会加入到任务队列

BlockingQueue 叫做阻塞队列的特殊含义就是:当阻塞队列中没有任务,调用 get 方法来获取队列中元素就会陷入阻塞,直到队列不为空

  • ThreadFactory:线程工程
  • handler:饱和策略(在同时运行的线程数量达到了 maximumPoolSize 并且队列达到最大容量,此时再进入的线程就会执行饱和策略)

corePoolSize 参数怎么设置:

优先按照机器性能corePoolSize 需要查看任务类型是 CPU 密集型还是 IO 密集型以及CPU 核心数然后再来看实际任务是多少,如果太大了的话,就要考虑集群了

  • CPU 密集型:CPU 核心数 + 1
  • IO 密集型:CPU 核心数 * 2

此外还要根据具体 QPS 并发量来看:
例如每秒的并发量为 500 ~ 1000,每个任务处理时间 0.1 秒,最大可忍受响应时间 1s
那么单个线程,1 秒内最多可以执行完 1 / 0.1 = 10 个任务
在多线程的情况下,需要将 1s 内进来的任务全部处理完,也就是  (500 ~ 1000)_0.1 = 50~100 个线程
那么 corePoolSize 可以设置大于等于 50,根据 8020 原则,80%的时间每秒任务数是小于 1000 _ 0.8 = 800 的, 那么 corePoolSize 就可以设置成 80
https://blog.csdn.net/riemann_/article/details/104704197

maxPoolSize 参数:

最大线程数生产环境中可以设置成 corePoolSize 一样,可以减少处理过程中创建线程的开销

拒绝策略选择:

根据执行的任务重不重要,如果不重要则可以执行 discard 直接丢弃策略,如果重要可以利用 CallRunPolicy(CallerRunsPolicy 适合大量的计算任务执行)或一些缓冲机制来执行(不让超过我执行最大请求数在网关层面进来,或者利用 Sleuth 超过指定 QPS 请求进行熔断执行返回默认数据)

BlockingQueue 任务队列:

corePoolSize 确定为 80 之后,0.1 最多 80 个任务执行完毕,根据最大响应时间 1s,队列里的线程可以等待 1s,也就是任务队列中的任务都需要在 1s 内全部完成清空,那么就是 1 /0.1 * 80 = 800

创建线程的两个条件:

  • 线程池处于 running 状态(硬性条件)
  • 同时运行数量小于 corePoolSize 或者队列已满但是还未达到 maximumPoolSize

执行饱和策略的时机:

  • 线程数到达 maximumPoolSize,且此时任务队列已经满了
  • 线程池处于 ShutDown 状态即以上,新进入的任务会被执行饱和策略(线程池调用 shutdown 方法后,会等待线程池中的任务执行完毕,在调用 shutdown 和线程池真正 shutdown 之间提交任务,会拒绝新任务)
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 如果corePoolSize为20,maximumPoolSize为10,可行吗?
        // 不可行,构造函数初始化都过不去,会抛出非法参数异常, maximumPoolSize一定要大于等于corePoolSize
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor 内部有 AtomicInteger,其中高三位为标记线程池状态(五种),线程个数(低 29 位,允许最大线程数量为 2 ^ 29 - 1)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

线程池五种状态:

//(高3位):11100000000000000000000000000000
//此时接受新任务并且处理阻塞队列里的任务
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
//拒绝新任务但是处理阻塞队列里的任务(关门了)
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
//拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
private static final int STOP =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
//所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
//终止状态。terminated方法调用完成以后的状态
private static final int TERMINATED =  3 << COUNT_BITS;//

4 种饱和策略(执行时机:线程池中同时运行的线程数量达到了 maximumPoolSize 并且任务队列容量已满)

  • CallerRunsPolicy(直接调用 Runnable 任务自己的 run 方法,会当做一个普通方法执行)
 //如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行;
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 调用run方法的前提是线程池仍然是Running状态才调用
            if (!e.isShutdown()) {
                //如果线程池仍在运行状态,直接执行Runnable的run方法
                r.run();
            }
        }
    }
  • AbortPolicy(拒绝任务,同时会抛出 RejectedExecutionException)
 //该策略会直接抛出异常,阻止系统正常工作;
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * 直接抛出RejectedExecutionException异常
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • DiscardPolicy(直接抛弃该任务,什么也不做,会当做提交成功了,实际已经被抛弃了)
//静默策略,不予任何处理。
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 什么也没做,
        }
    }
  • DiscardOldestPolicy(抛弃任务队列中第一个任务,然后尝试提交该任务,因为 ThreadPoolExecutor 多线程同时操作下,可能你刚把队首任务删除,别人就加入了队列,自己可能就加入不了了,所以是尝试提交)
//该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去,马上要被执行的那个任务,并尝试再次提交;
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                //直接移除任务队列中第一个任务
                e.getQueue().poll();
                //再次尝试提交
                e.execute(r);
            }
        }
    }

2、execute 方法,进入一个任务

1.首先检查当前的活动线程数是否超过核心数

  • 未超过,尝试利用 addWorker(command, true)创建新线程执行任务

    • 为什么说尝试呢?因为 addWorker 会再次检查线程池状态(必须为 Running)和线程池同时运行线程数量(同时运行的线程数量超过了 corePoolSize 但是队列未满或者 maximumPoolSize,也不会创建新线程)

    2.如果超过了核心数量,则会尝试将该任务加入到任务队列

  • 如果加入任务队列成功

此时会再次查询线程池的状态

  • 如果线程池再次查询已经不是 Running 状态,则会将该任务从队列中移除并执行指定的饱和策略
  • 如果线程池再次查询时 Running 状态,则查看当前线程数量是否为 0,如果为 0 则会调用 addWorker 尝试添加一个 Worker,此时任务还在队列中,执行的是队列的第一个任务
  • 进入到任务队列失败,说明此时队列已满
    • 则查看当前是否已经达到了 maximumPoolSize,如果未达到则尝试添加新线程,然后执行队列中的第一个任务
    • 如果达到了 maximumPoolSize,则会执行指定的饱和策略

3、Worker 类

Worker 类是继承了 AQS 的,目的是 Worker 对象封装的线程在运行期间处理任务时上锁,防止被其他操作中断 interpret,ThreadPoolExecutor 其他方法需要终端 Worker 时,需要首先获得 Worker 中 AQS 的锁;interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工作线程是否正在处理任务,如果 tryLock 返回 true,说明该工作线程当前未执行任务,这时才可以被中断。是要中断工作线程,还要判断工作线程是否是空闲的,如果工作线程正在处理任务,就不应该发生中断;

所以面试回答如何 ThreadPollExecutor 如何判断一个线程当前是空闲状态还是处理任务,只需要通过该 Worker 查看是否上锁了即可,如果上锁了说明在处理任务,任务处理完毕才会释放该锁,在处理任务期间由于加上了锁,其他人是无法中断的,中断前需要获得该锁

    public void shutdown() {
        // 获取主锁,保证shutDown期间其他方法无法对线程进行操作,防止别人在SHUTDOWN期间,又创建了新的线程
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查是否可以中断
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            // 停止所有空闲的线程,怎么判断一个线程是否空闲?查看这个Worker是否上锁即可,没有上锁就是空闲中断该线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 对workers集合每个worker进行遍历,发现没上锁就表明空闲,中断该Worker线程
            for (Worker w : workers) {
                Thread t = w.thread;
                // 只能中断空闲的线程,如何判断一个线程是在处理任务还是在空闲状态呢?
                // 只需要上没上锁,tryLock就是尝试上锁,上锁成功说明可以中断,中断完后再释放该锁方便下次唤醒该线程
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

4、addWorker 创建新线程并取出队列中队首任务执行

addWorker 第一步:for 循环 cas 让 worker 数量+1
addWorker 第二步:利用线程工厂新创建的线程和队列中的第一个任务封装到 Woker 中,并通过抢占 ReentrantLock 将新创建的 worker 对象加入到 Woker 集合中
addWorker 第一步:启动新创建的线程执行任务

addWorker 方法主要执行了三个功能,可以看到要添加新的线程还是比较困难重重:
1.for 循环 cas 检查线程池状态和线程池中同时运行线程数量,如果符合条件则尝试将 worker 数量 + 1(此时新 worker 还未创建,只是把 worker 数量加 1 成功)
2.cas worker 数量加 1 操作成功,则才会利用线程工厂创建一个新线程并取出队列中的第一个任务封装到 Worker 中,此时线程还是未启动,只有接下来抢占可重入锁 ReentrantLock,将新建的 Worker 对象加入到 workers 集合中 3.加入到 workers 集合中成功,start 启动这个线程

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 1、for循环CAS尝试将worker数量+1,期间会不断检查线程池状态和运行线程数量来决定是否直接返回
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 2、CAS尝试+1成功后,利用线程工厂新建一个Worker对象,加入当前任务
        // 抢占ReentrantLock来让新建的Worker对象加入到workers集合中
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // worker添加成功,启动线程执行任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

5、runWorker 方法

while 轮询去 BlockingQueue 中去获取任务然后执行任务,如果队列为空,则该 Worker 就会进入阻塞

    final void runWorker(Worker w) {
        // 获取当前线程
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 利用AQS释放1个资源,表明这时候允许中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 轮询从任务队列中获取任务,如果返回为null,则为以下四种情况:
            // 1.线程数量超过maximumPoolSize,无法获取到任务,该线程要被清理掉
            // 2.线程池状态处于stop(含stop以上):此时就不会再处理任务了(shutdown仍然会处理完阻塞队列中的任务)
            // 3.线程池处于shutdown并且阻塞队列为空
            // 4.线程超过了keepAliveTime也没有新任务进来
            // BlockingQueue叫做阻塞队列的特点就是在队列为空时尝试利用take获取队列中的元素时会阻塞直到队列不为空
            // 所以除了以上几种情况之外:如果队列队列为空,则此处getTask会一直阻塞直到队列不为空
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池至少是stop状态,则中断此线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // 执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
            // 如果completedAbruptly仍然为false,说明出现了异常跳出了while循环
        } finally {
            // 如果没有获取到任务,tryTerminate方法尝试中断一个空闲线程,
            // 避免队列中为空取任务一直阻塞的情况,就会执行processWorkerExit函数
            processWorkerExit(w, completedAbruptly);
        }
    }

如果没有获取到任务,就会执行 processWorkerExit 函数
processWorkerExit 函数目的就是尝试销毁一个空闲线程,销毁之前查看是否是上面异常 completedAbruptly 进入到该函数的,如果是则还会新建一个 worker 将之前出现异常死掉的线程也会清理掉

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果为true说明给线程出现了异常,则直接将worker数量减1
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        // 抢占可重入锁目的,防止要清理该worker时别人此时又使用到该worker
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计该线程完成的任务数
            completedTaskCount += w.completedTasks;
            // 从workers集合中删除该worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 根据线程池状态进行判断是否结束线程池
        // 会尝试清理一个空闲线程,防止阻塞队列为空,线程都调用get方法都陷入阻塞
        tryTerminate();

        /**
         * allowCoreThreadTimeOut:是否允许核心线程被销毁回收,允许则最小允许存活空闲线程数量就为0,不允许则最小数量为corePoolSize
         * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
         * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
         * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
         */
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty()) // 允许核心线程销毁回收,并且队列非空则至少保留一个线程
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 是异常造成的,由于前面减了一个worker,现在又新建一个worker
            addWorker(null, false);
        }
    }

6、getTask 方法

1、检查线程池以及队列任务状态
2、根据 maximumPoolSize、超时时间和队列任务控制线程数量(当线程数量超过 maximumPoolSize,不会再分配任务给该空闲线程,而是将该空闲线程杀掉)
3、满足条件从 workQueue 队列中获取任务;
为了防止空闲线程都来空任务队列获取任务全部陷入阻塞的状态,此时的 worker 因为空闲肯定是 AOS 没有上锁的,然后 shutdown 方法可以调用 interruptIdleWorkers 方法来给未上锁的 worker(能够上锁就表明是空闲状态),就可以进行中断
抛出 InterruptedException 后接触阻塞的状态

private Runnable getTask() {
        //超时标志
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            //获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);


            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //如果线程池状态异常或任务队列为空,返回NULL
                decrementWorkerCount();
                return null;
            }
            //获取当前线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            /*
             * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
             * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
             * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
             * 如果减1失败,则返回重试。
             * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 为了防止空闲线程都来空任务队列获取任务全部陷入阻塞的状态,此时的worker因为空闲肯定是AOS没有上锁的
                // 然后shutdown方法可以调用interruptIdleWorkers方法来给未上锁的worker(能够上锁就表明是空闲状态),就可以进行中断
                // 抛出InterruptedException后接触阻塞的状态
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

本作品采用 知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议 (CC BY-NC-ND 4.0) 进行许可。