ThreadPoolExcutor
它实现了ExecutorService
接口。
线程池状态
它使用一个int变量来记录线程池状态,高3位表示状态,低29位表示线程数量。
分为RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:生存时间:针对核心线程数以外的线程
- unit:生存时间的时间单位
- workQueue:阻塞队列,放线程
- threadFactory:线程工厂,可以为它起个好名字
- handler:拒绝策略
使用方法
-
线程池刚开始没有线程,当把一个任务提交给线程池后,线程池会创建一个新线程执行任务
-
当线程数达到corePoolSize并且没有线程空闲时,这时再加入任务,新加入的任务会在阻塞队列中排队
-
如果队列选择有界队列,那么任务队列超过了队列大小时,会创建maximumPoolSize-corePoolSize数目的线程救急
-
如果线程到达maximumPoolSize仍有新任务,这时会执行拒绝策略。有四种默认的实现:
-
AbortPolicy
throw new RejectedExecutionException();
-
CallerRunsPolicy
r.run()
-
DiscardPolicy:放弃本次任务
-
DiscardOldestPolicy:放弃队列中最早的任务,让本任务替代它
-
-
当高峰过去后,超过corePoolSize的紧急线程根据keepAliveTime和unit的设置,结束线程。
其它的一些线程池
Executors里面提供了一些定义好的线程池,可以看看几个:
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
核心线程数与最大线程数相同,因此无需超时时间
-
阻塞队列是无界的,可以放任意数量的任务
适用于任务量已知但是相对耗时的任务。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 核心线程是0,最大线程数是21e+,说明
- 60s后全部被回收
- 回收之后需要用时再创建
- 因此,适用于短任务
- 采用SynchronousQueue,没有容量,没有线程来取是放不进去的。这里的意思就是不用阻塞队列存,而是直接创建线程运行。
newSingleThreadExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
看名字就知道,它是只有一个线程在执行,要排队。
使用了装饰器模式,只暴露ScheduledThreadPoolExecutor接口。
(突然发现那些wrapper就是装饰器。。。)
使用
其实使用是最简单的,只要调用它的接口就行。
提交任务:
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// ......
关闭线程池
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();