ThreadPoolExcutor

它实现了ExecutorService接口。

线程池状态

它使用一个int变量来记录线程池状态,高3位表示状态,低29位表示线程数量。

分为RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

构造方法

public ThreadPoolExecutor(int corePoolSize,
         int maximumPoolSize,
         long keepAliveTime,
         TimeUnit unit,
         BlockingQueue<Runnable> workQueue,
         ThreadFactory threadFactory,
         RejectedExecutionHandler handler)
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:生存时间:针对核心线程数以外的线程
  • unit:生存时间的时间单位
  • workQueue:阻塞队列,放线程
  • threadFactory:线程工厂,可以为它起个好名字
  • handler:拒绝策略

使用方法

  1. 线程池刚开始没有线程,当把一个任务提交给线程池后,线程池会创建一个新线程执行任务

  2. 当线程数达到corePoolSize并且没有线程空闲时,这时再加入任务,新加入的任务会在阻塞队列中排队

  3. 如果队列选择有界队列,那么任务队列超过了队列大小时,会创建maximumPoolSize-corePoolSize数目的线程救急

  4. 如果线程到达maximumPoolSize仍有新任务,这时会执行拒绝策略。有四种默认的实现:

    1. AbortPolicy

      throw new RejectedExecutionException();

    2. CallerRunsPolicy

      r.run()

    3. DiscardPolicy:放弃本次任务

    4. DiscardOldestPolicy:放弃队列中最早的任务,让本任务替代它

  5. 当高峰过去后,超过corePoolSize的紧急线程根据keepAliveTime和unit的设置,结束线程。

其它的一些线程池

Executors里面提供了一些定义好的线程池,可以看看几个:

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
  • 核心线程数与最大线程数相同,因此无需超时时间

  • 阻塞队列是无界的,可以放任意数量的任务

适用于任务量已知但是相对耗时的任务。


newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  • 核心线程是0,最大线程数是21e+,说明
    • 60s后全部被回收
    • 回收之后需要用时再创建
    • 因此,适用于短任务
  • 采用SynchronousQueue,没有容量,没有线程来取是放不进去的。这里的意思就是不用阻塞队列存,而是直接创建线程运行。

newSingleThreadExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

看名字就知道,它是只有一个线程在执行,要排队。

使用了装饰器模式,只暴露ScheduledThreadPoolExecutor接口。

(突然发现那些wrapper就是装饰器。。。)

使用

其实使用是最简单的,只要调用它的接口就行。

提交任务:

// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
 throws InterruptedException;

// ......

关闭线程池

/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();