一、使用

这里是线程池的简单使用

二、分析

可以创建4种线程池

以固定数量的线程池为例,跟踪源码

 public static void main(String[] args) {
       ExecutorService executorService = Executors.newFixedThreadPool(2);// 创建两个固定线程
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " aaa");
            }
        });
        executorService.shutdown();
    }

一直跟这行代码 Executors.newFixedThreadPool(2);

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

上面的代码,其实就算线程池的初始化过程,涉及7个参数

 int corePoolSize, // 核心线程数
 int maximumPoolSize, // 最大线程数
 long keepAliveTime, //  空闲线程存活时间
 TimeUnit unit,      // 时间单位
 BlockingQueue<Runnable> workQueue, // 线程池所使用的缓冲队列
 ThreadFactory threadFactory, // 线程池创建线程使用的工厂
 RejectedExecutionHandler handler // 线程池对拒绝任务的处理策略

有几个参数需要说明下

keepAliveTime:是空闲线程存活时间 指的是非核心线程执行完任务后不会立关闭,而是在这个时间里面等待下看是否有其他的任务需要执行,如果超时了没任务,就关闭该线程。

workQueue: 任务来了,但是核心线程满了,就放入队列里面

handler: 线程池对拒绝任务的处理策略,任务来了,线程池满了,队列也满了,对新来的任务进行处理,

有四种方案:1)抛异常 (默认方式) 2)丢弃任务 3)剔除队列里面最老的任务 4)不使用线程池执行

跟下这行代码:executorService.execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

执行流程如下:

  1. 如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功返回true,失败执行步骤2。
  2. 如果线程池处于RUNNING状态,则尝试加入阻塞队列,如果加入阻塞队列成功,则尝试进行Double Check,如果加入失败,则执行步骤3。
  3. 如果线程池不是RUNNING状态或者加入阻塞队列失败,则尝试创建新线程直到maxPoolSize,如果失败,则调用reject()方法运行相应的拒绝策略。

在步骤2中如果加入阻塞队列成功了,则会进行一个Double Check的过程。Double Check过程的主要目的是判断加入到阻塞队里中的线程是否可以被执行。如果线程池不是RUNNING状态,则调用remove()方法从阻塞队列中删除该任务,然后调用reject()方法处理任务。否则需要确保还有线程执行。

addWork():

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取当前线程状态
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;
        // 内层循环,worker + 1
        for (;;) {
            // 线程数量
            int wc = workerCountOf(c);
            // 如果当前线程数大于线程最大上限CAPACITY  return false
            // 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // worker + 1,成功跳出retry循环
            if (compareAndIncrementWorkerCount(c))
                break retry;

            // CAS add worker 失败,再次读取ctl
            c = ctl.get();

            // 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {

        // 新建线程:Worker
        w = new Worker(firstTask);
        // 当前线程
        final Thread t = w.thread;
        if (t != null) {
            // 获取主锁:mainLock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {

                // 线程状态
                int rs = runStateOf(ctl.get());

                // rs < SHUTDOWN ==> 线程处于RUNNING状态
                // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {

                    // 当前线程已经启动,抛出异常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();

                    // workers是一个HashSet<Worker>
                    workers.add(w);

                    // 设置最大的池大小largestPoolSize,workerAdded设置为true
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {

        // 线程启动失败
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
  1. 判断当前线程是否可以添加任务,如果可以则进行下一步,否则return false;

    1. rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN ,STOP、TIDYING、TERMINATED状态
    2. rs == SHUTDOWN , firstTask != null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务
    3. rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允许添加线程,因为firstTask == null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue == null,则说明添加的任务没有任何意义。
  2. 内嵌循环,通过CAS worker + 1
  3. 获取主锁mailLock,如果线程池处于RUNNING状态获取处于SHUTDOWN状态且 firstTask == null,则将任务添加到workers Queue中,然后释放主锁mainLock,然后启动线程,然后return true,如果中途失败导致workerStarted= false,则调用addWorkerFailed()方法进行处理。

addWorker中的参数,在execute()方法中,有三处调用了该方法:
第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true),这个很好理解,当然线程池的线程数量小于 corePoolSize ,则新建线程执行任务即可,在执行过程core == true,内部与corePoolSize比较即可。
第二次:加入阻塞队列进行Double Check时,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)。如果线程池中的线程==0,按照道理应该该任务应该新建线程执行任务,但是由于已经该任务已经添加到了阻塞队列,那么就在线程池中新建一个空线程,然后从阻塞队列中取线程即可。
第三次:线程池不是RUNNING状态或者加入阻塞队列失败:else if (!addWorker(command, false)),这里core == fase,则意味着是与maximumPoolSize比较。

简单说,在新建线程执行任务时,将Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类

三、总结线程池运行流程

  • 先判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务
  • 若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中
  • 若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务
  • 若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关

四、阿里为什么禁用 Executors 创建线程池

  public static void main(String[] args) {
        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("线程-%d");
        ThreadFactory factory = threadFactory.build();
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024),
                // Executors.defaultThreadFactory(),
                factory,
                new ThreadPoolExecutor.AbortPolicy()
        );
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " aaa");
            }
        });
        executorService.shutdown();
    }

阿里规范手册禁止使用 Executors 创建线程池 ,标准的用法是上面的代码,其实就是自己手动的塞需要的那七个参数。

new LinkedBlockingQueue<Runnable>()
  --->
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
---->
@Native public static final int  MAX_VALUE = 0x7fffffff; // int类型的最大值

可以看到 如果使用默认的阻塞队列而不不限制最大数量的话,队列最大值是int类型的最大值21亿,可能会造成大量的线程阻塞问题,所以阿里禁止使用 Executors 创建线程池。

Last modification:December 2nd, 2019 at 07:22 pm