线程池原理详解
JAVA帮助开发者封装了一些现成的线程池调用,但是每种线程池都有自己的使用场景,如果不了解里面的原理,那么很容易掉进坑里,线程池原理也是面试的重灾区,因此本问将完整分析线程池的原理。
一、new thread弊端
从学习java多线程开始,我们就学习了用new thread
来创建线程。但是他有一定的弊端:
- 每次
new Thread
新建对象,性能差 - 线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能占用过多系统资源导致死机或OOM
- 缺少更多功能,如更多执行、定期执行、线程中断
二、线程池好处
- 重用存在的线程,减少对象创建、消亡的开销,性能佳
- 可有效控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
- 提供定时执行、定期执行、单线程、并发数控制等功能
三、线程池相关参数
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
:核心线程数量
默认情况下,在创建了线程池后,线程池中的线程数为0,
(除非调用prestartAllCoreThreads()
和prestartCoreThread()
方法,从方法名字可以看出,是预创建线程的意思,即在没有任务到来之前,就创建corePoolSize
个线程或1个线程)当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize
后,就会把到达的任务放到缓存队列当中;当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
maximumPoolSize
:线程最大线程数
线程池中的最大线程数,表示线程池中最多能创建多少个线程。
超过就执行
reject
策略:如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务
workQueue
:阻塞队列,存储等待执行的任务,很重要,会对线程池运行过程产生重大影响,一般有以下几种选择:
ArrayBlockingQueue
:是一个基于数组结构的有界阻塞队列,此队列按FIFO
(先进先出)原则对元素进行排序;
LinkedBlockingQueue
:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue
。静态工厂方法Executors.newFixedThreadPool()
使用了这个队列;
SynchronousQueue
:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool
使用了这个队列;
PriorityBlockingQueue
:一个具有优先级的无限阻塞队列;底层用DelayedWorkQueue
实现。
keepAliveTime
:线程没有任务执行时最多保持多久时间终止
当线程池中的线程数大于
corePoolSize
时,如果一个线程空闲的时间达到keepAliveTime
,则会终止,直到线程池中的线程数不超过corePoolSize
。(但是如果调用了allowCoreThreadTimeOut(boolean value)
方法,在线程池中的线程数不大于corePoolSize
时,keepAliveTime
参数也会起作用,直到线程池中的线程数为0;)
unit
:keepAliveTime
的时间单位threadFactory
:线程工厂,用来创建线程
threadFactory
用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
handler
:饱和策略
当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是
AbortPolicy
,表示无法处理新任务时抛出异常。
这些参数全部传给ThreadPoolExecutor
之后,ThreadPoolExecutor
就可以为我们提供一个线程池,我们可以对这个线程池提交以及终止线程任务。
四、饱和策略
当线程池中已经到了完全没有办法再接收新的线程进来的时候,就会启动饱和策略。
1 | java.util.concurrent.ThreadPoolExecutor.AbortPolicy |
AbortPolicy
:丢弃任务并抛出RejectedExecutionException
异常(默认)CallerRunsPolicy
:只用调用所在的线程运行任务DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)DiscardPolicy
:不处理,丢弃掉,不抛出异常。
五、线程池的源码解读
程序中要声明线程池,是这样写的:
1 | ExecutorService exec = Executors.newCachedThreadPool(); |
先来看看ExecutorService
其中的奥秘。
5.1 ExecutorService和Executor的关系
Executor
是一个顶层接口,在它里面只声明了一个方法execute(Runnable)
,返回值为void
,参数为Runnable
类型,从字面意思可以理解,就是用来执行传进去的任务的;
1 | public interface Executor { |
ExecutorService
接口继承了Executor
接口,并声明了一些方法:submit
、invokeAll
、invokeAny
以及shutDown
等;
1 | public interface ExecutorService extends Executor { |
可以看出,ExecutorService
具备管理执行器和任务生命周期的方法,提交任务机制更加完善。Executor
只是运行新任务的简单接口,目的是将任务提交和任务执行解耦。
5.2 ThreadPoolExecutor重要方法
我们知道,在执行Executors.newCachedThreadPool()
的时候,内部是调用ThreadPoolExecutor
的构造函数来生成Exceutors
对象,即生成了线程池,因为继承关系是:ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService extends Executor
。构建好之后,就可以构建工作线程去执行任务。其中,流程是这样的:
所以,用于execute()
或者submit()
的线程任务都是被封装成worker
去执行的。下面来看看execute()
和submit()
等核心方法。
在ThreadPoolExecutor
类中有几个非常重要的方法:
execute()
execute()
方法实际上是Executor
中声明的方法,在ThreadPoolExecutor
进行了具体的实现,这个方法是ThreadPoolExecutor
的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()
submit()
方法是在ExecutorService
中声明的方法,这个方法也是用来向线程池提交任务的,但是它和execute()
方法不同,它能够返回任务执行的结果,去看submit()
方法的实现,会发现它实际上还是调用的execute()
方法,只不过它利用了Future
来获取任务执行结果。
shutdown()
将线程池状态置为SHUTDOWN
,并不会立即停止:
停止接收外部
submit
的任务内部正在跑的任务和队列里等待的任务,会执行完等到第二步完成后,才真正停止
shutdownNow()
将线程池状态置为STOP
。企图立即停止,事实上不一定:
跟
shutdown()
一样,先停止接收外部提交的任务忽略队列里等待的任务尝试将正在跑的任务interrupt
中断返回未执行的任务列表它试图终止线程的方法是通过调用
Thread.interrupt()
方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有sleep
、wait
、Condition
、定时锁等应用,interrupt()
方法是无法中断当前的线程的。所以,ShutdownNow()
并不代表线程池就一定立即就能退出,它也可能必须要等待所有正在执行的任务都执行完成了才能退出。但是大多数时候是能立即退出的
awaitTermination(long timeOut, TimeUnit unit)
接收
timeout
和TimeUnit
两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService
是否已经关闭,若关闭则返回true
,否则返回false
。一般情况下会和shutdown
方法组合使用。
5.3 Executors生成线程池
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors
类里面提供了一些静态工厂,生成一些常用的线程池。这个就涉及上面我们反复提及的核心类:ThreadPoolExecutor
。
⭐其实都是通过调用ThreadPoolExecutor
来完成的,最后可以返回ExecutorService
对象,其实说白了都是Excutor
对象。
下面来分别看看比较常用的线程池。
newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
1 | //创建一个核心线程个数和最大线程个数都为1的线程池 |
demo:
1 | 4j |
运行结果:
1 | task:10,index:0 |
运行结果分析:单线程+有序。
newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
1 | //创建一个核心线程个数和最大线程个数都为nThreads的线程池 |
demo:
1 | 4j |
运行结果:
1 | task:11,index:1 |
结果分析:只创建了三个线程来执行。
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
1 | //创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE |
demo:
1 | 4j |
运行结果:
1 | task:10,index:0 |
结果分析:按需创建线程,几乎一次循环就创建了一个新的线程来执行。
newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
1 | //创建一个最小线程个数corePoolSize,最大为Integer.MAX_VALUE |
demo:
1 | //多长时间之后执行一次 |
newSingleThreadScheduledExecutor
创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。
1 | //创建一个最小线程个数corePoolSize为1,最大为Integer.MAX_VALUE |
同上。demo不再赘述。
5.4 线程池实现原理–线程池状态
static final int RUNNING = 0;
当创建线程池后,初始时,线程池处于RUNNING状态;
static final int SHUTDOWN = 1;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
static final int STOP = 2;
如果调用了
shutdownNow()
方法,则线程池处于STOP
状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
static final int TERMINATED = 3;
当线程池处于
SHUTDOWN
或STOP
状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED
状态。
6.5 线程池实现原理–任务的执行
corePoolSize
与maximumPoolSize
的关系举个简单的例子形象理解就是:
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize
就是10,而maximumPoolSize
就是14(10+4)。
maximumPoolSize
可以看作是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
在ThreadPoolExecutor
类中,最核心的任务提交方法是execute()
方法,虽然通过submit
也可以提交任务,但是实际上submit
方法里面最终调用的还是execute()
方法,所以我们只需要研究execute()
方法的实现原理即可:
注:execute()
方法和submit()
方法已经在前面讲过区别了。
1 | public void execute(Runnable command) { |
对应的程序流程图为:
为了理解更加得透彻,用下图配合文字总结一下:
- 1.如果当前运行的线程少于
corePoolSize
,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。 - 2.如果运行的线程等于或多于
corePoolSize
,则将任务加入BlockingQueue
。 - 3.如果无法将任务加入
BlockingQueue
(队列已满),则在非corePool
中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。 - 4.如果创建新线程将使当前运行的线程超出
maximumPoolSize
,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()
方法。
ThreadPoolExecutor
采取上述步骤的总体设计思路,是为了在执行execute()
方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor
完成预热之后(当前运行的线程数大于等于corePoolSize
),几乎所有的execute()
方法调用都是执行步骤2,而步骤2不需要获取全局锁。