JAVA帮助开发者封装了一些现成的线程池调用,但是每种线程池都有自己的使用场景,如果不了解里面的原理,那么很容易掉进坑里,线程池原理也是面试的重灾区,因此本问将完整分析线程池的原理。

一、new thread弊端

从学习java多线程开始,我们就学习了用new thread来创建线程。但是他有一定的弊端:

  • 每次new Thread新建对象,性能差
  • 线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM
  • 缺少更多功能,如更多执行、定期执行、线程中断

二、线程池好处

  • 重用存在的线程,减少对象创建、消亡的开销,性能佳
  • 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
  • 提供定时执行、定期执行、单线程、并发数控制等功能

三、线程池相关参数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize:核心线程数量

默认情况下,在创建了线程池后,线程池中的线程数为0,
(除非调用prestartAllCoreThreads()prestartCoreThread()方法,从方法名字可以看出,是预创建线程的意思,即在没有任务到来之前,就创建corePoolSize个线程或1个线程)当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。

  • maximumPoolSize:线程最大线程数

线程池中的最大线程数,表示线程池中最多能创建多少个线程。

超过就执行reject策略:如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务

  • workQueue:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响,一般有以下几种选择:

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序;

LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列;

SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列;

PriorityBlockingQueue:一个具有优先级的无限阻塞队列;底层用DelayedWorkQueue实现。

  • keepAliveTime:线程没有任务执行时最多保持多久时间终止

当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。(但是如果调用了allowCoreThreadTimeOut(boolean value)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;)

  • unit:keepAliveTime的时间单位
  • threadFactory:线程工厂,用来创建线程

threadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字

  • handler:饱和策略

当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。

这些参数全部传给ThreadPoolExecutor之后,ThreadPoolExecutor就可以为我们提供一个线程池,我们可以对这个线程池提交以及终止线程任务。

四、饱和策略

当线程池中已经到了完全没有办法再接收新的线程进来的时候,就会启动饱和策略。

1
2
3
4
java.util.concurrent.ThreadPoolExecutor.AbortPolicy
java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy
java.util.concurrent.ThreadPoolExecutor.DiscardPolicy
  1. AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认)
  2. CallerRunsPolicy:只用调用所在的线程运行任务
  3. DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  4. DiscardPolicy:不处理,丢弃掉,不抛出异常。

五、线程池的源码解读

程序中要声明线程池,是这样写的:

1
2
ExecutorService exec = Executors.newCachedThreadPool();
exec.excute(Runnable command);

先来看看ExecutorService其中的奥秘。

5.1 ExecutorService和Executor的关系

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

1
2
3
public interface Executor {
void execute(Runnable command);
}

ExecutorService接口继承了Executor接口,并声明了一些方法:submitinvokeAllinvokeAny以及shutDown等;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface ExecutorService extends Executor {

void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

可以看出,ExecutorService具备管理执行器和任务生命周期的方法,提交任务机制更加完善。Executor只是运行新任务的简单接口,目的是将任务提交和任务执行解耦。

5.2 ThreadPoolExecutor重要方法

我们知道,在执行Executors.newCachedThreadPool()的时候,内部是调用ThreadPoolExecutor的构造函数来生成Exceutors对象,即生成了线程池,因为继承关系是:ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService extends Executor。构建好之后,就可以构建工作线程去执行任务。其中,流程是这样的:

image

所以,用于execute()或者submit()的线程任务都是被封装成worker去执行的。下面来看看execute()submit()等核心方法。

ThreadPoolExecutor类中有几个非常重要的方法:

  • execute()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  • submit()

submit()方法是在ExecutorService中声明的方法,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

  • shutdown()
    将线程池状态置为SHUTDOWN,并不会立即停止:

停止接收外部submit的任务内部正在跑的任务和队列里等待的任务,会执行完等到第二步完成后,才真正停止

  • shutdownNow()
    将线程池状态置为STOP。企图立即停止,事实上不一定:

shutdown()一样,先停止接收外部提交的任务忽略队列里等待的任务尝试将正在跑的任务interrupt中断返回未执行的任务列表

它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleepwaitCondition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的

  • awaitTermination(long timeOut, TimeUnit unit)

接收timeoutTimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

5.3 Executors生成线程池

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。这个就涉及上面我们反复提及的核心类:ThreadPoolExecutor

⭐其实都是通过调用ThreadPoolExecutor来完成的,最后可以返回ExecutorService对象,其实说白了都是Excutor对象。

下面来分别看看比较常用的线程池。

  • newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//创建一个核心线程个数和最大线程个数都为1的线程池
//阻塞队列长度为Integer.MAX_VALUE
//keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
//线程由DefaultThreadFactory默认创建,有统一的命名规范,并且优先级是一样的
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

//使用自己的线程工厂来创建线程
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class ThreadPoolTest3 {

public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();

for(int i=0;i<10;i++){
final int index = i;
exec.execute(() -> {
log.info("task:{},index:{}",Thread.currentThread().getId(),index);
});
}

exec.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
task:10,index:0
task:10,index:1
task:10,index:2
task:10,index:3
task:10,index:4
task:10,index:5
task:10,index:6
task:10,index:7
task:10,index:8
task:10,index:9

运行结果分析:单线程+有序。

  • newFixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//创建一个核心线程个数和最大线程个数都为nThreads的线程池
//阻塞队列长度为Integer.MAX_VALUE
//keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//使用自己的线程工厂来创建线程
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class ThreadPoolTest2 {

public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(3);

for(int i=0;i<10;i++){
final int index = i;
exec.execute(() -> {
log.info("task:{},index:{}",Thread.currentThread().getId(),index);
});
}

exec.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
task:11,index:1
task:11,index:3
task:11,index:4
task:11,index:5
task:11,index:6
task:11,index:7
task:11,index:8
task:11,index:9
task:10,index:0
task:12,index:2

结果分析:只创建了三个线程来执行。

  • newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE
//阻塞队列为同步队列
//keeyAliveTime=60说明只要当前线程60s内空闲则回收
//特殊在于加入到同步队列的任务会被马上被执行,同步队列里面最多只有一个任务,并且存在后马上会拿出执行
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

//使用自己的线程工厂来创建线程
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class ThreadPoolTest1 {

public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();

for(int i=0;i<10;i++){
final int index = i;
exec.execute(() -> {
log.info("task:{},index:{}",Thread.currentThread().getId(),index);
});
}

exec.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
task:10,index:0
task:12,index:2
task:14,index:4
task:16,index:6
task:18,index:8
task:11,index:1
task:13,index:3
task:15,index:5
task:17,index:7
task:19,index:9

结果分析:按需创建线程,几乎一次循环就创建了一个新的线程来执行。

  • newScheduledThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

1
2
3
4
5
//创建一个最小线程个数corePoolSize,最大为Integer.MAX_VALUE
//阻塞队列为DelayedWorkQueue的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//多长时间之后执行一次
@Slf4j
public class ThreadPoolTest4 {

public static void main(String[] args) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(3);

exec.schedule(new Runnable() {
@Override
public void run() {
log.info("schedule run");
}
},3, TimeUnit.SECONDS);
exec.shutdown();
}
}

//定时执行,这里是每隔3秒执行一次
@Slf4j
public class ThreadPoolTest4 {

public static void main(String[] args) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(3);

exec.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.info("schedule run");
}
},1,3,TimeUnit.SECONDS);//一开始延迟1秒执行任务,之后每隔3秒执行一次任务,不适合调用exec.shutdown();,因为会被关闭
}
}
  • newSingleThreadScheduledExecutor

创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。

1
2
3
4
5
6
//创建一个最小线程个数corePoolSize为1,最大为Integer.MAX_VALUE
//阻塞队列为DelayedWorkQueue的线程池。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

同上。demo不再赘述。

5.4 线程池实现原理–线程池状态

image

  • static final int RUNNING = 0;

当创建线程池后,初始时,线程池处于RUNNING状态;

  • static final int SHUTDOWN = 1;

如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

  • static final int STOP = 2;

如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

  • static final int TERMINATED = 3;

当线程池处于SHUTDOWNSTOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

6.5 线程池实现原理–任务的执行

corePoolSizemaximumPoolSize的关系举个简单的例子形象理解就是:

假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

然后就将任务也分配给这4个临时工人做;

如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

maximumPoolSize可以看作是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

注:execute()方法和submit()方法已经在前面讲过区别了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//
// Proceed in 3 steps:
//
// 1.
// 判断当前的线程数是否小于corePoolSize,如果是,使用入参任务通过addWord方法创建一个新的线程,
// 如果能完成新线程创建exexute方法结束,成功提交任务
// 2.
// 在第一步没有完成任务提交;状态为运行并且能够成功加入任务到工作队列后,再进行一次check,如果状态
// 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要
// reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;
// 3.
// 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池
// 已经达到饱和状态,所以reject这个任务
//
int c = ctl.get();
// 工作线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 直接启动新线程,true表示会再次检查workerCount是否小于corePoolSize
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果工作线程数大于等于核心线程数
// 线程的的状态为RUNNING并且队列notfull
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 移除成功,拒绝该非运行的任务
reject(command);
else if (workerCountOf(recheck) == 0)
// 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);
}
// 如果队列满了或者是非运行的任务都拒绝执行
else if (!addWorker(command, false))
reject(command);
}

对应的程序流程图为:

image

为了理解更加得透彻,用下图配合文字总结一下:

image

  • 1.如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
  • 2.如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue
  • 3.如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
  • 4.如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。