Java线程池必知必会
ForeverKobe 人气:01、线程数使用开发规约
阿里巴巴开发手册中关于线程和线程池的使用有如下三条强制规约
【强制】创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
正例:自定义线程工厂,并且根据外部特征进行分组,比如,来自同一机房的调用,把机房编号赋值给whatFeatureOfGroup
public class UserThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(1); /** * 定义线程组名称,在利用 jstack 来排查问题时,非常有帮助 */ UserThreadFactory(String whatFeatureOfGroup) { namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-"; } @Override public Thread newThread(Runnable task) { String name = namePrefix + nextId.getAndIncrement(); Thread thread = new Thread(null, task, name, 0); System.out.println(thread.getName()); return thread; } }
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。
如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这
样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
2、 ThreadPoolExecutor源码
1. 构造函数
UML图:
ThreadPoolExecutor的构造函数共有四个,但最终调用的都是同一个:
2.核心参数
corePoolSize => 线程池核心线程数量
maximumPoolSize => 线程池最大数量
keepAliveTime => 线程池的工作线程空闲后,保持存活的时间。如果任务多而且任务的执行时间比较短,可以调大keepAliveTime,提高线程的利用率。
unit => 时间单位
workQueue => 线程池所使用的缓冲队列,队列类型有:
ArrayBlockingQueue,基于数组结构的有界阻塞队列,按FIFO(先进先出)原则对任务进行排序。使用该队列,线程池中能创建的最大线程数为maximumPoolSize
LinkedBlockingQueue,基于链表结构的无界阻塞队列,按FIFO(先进先出)原则对任务进行排序,吞吐量高于ArrayBlockingQueue。使用该队列,线程池中能创建的最大线程数为corePoolSize。静态工厂方法 Executor.newFixedThreadPool()使用了这个队列。
SynchronousQueue,一个不存储元素的阻塞队列。添加任务的操作必须等到另一个线程的移除操作,否则添加操作一直处于阻塞状态。静态工厂方法 Executor.newCachedThreadPool()使用了这个队列。
PriorityBlokingQueue:一个支持优先级的无界阻塞队列。使用该队列,线程池中能创建的最大线程数为corePoolSize。
threadFactory => 线程池创建线程使用的工厂
handler => 线程池对拒绝任务的处理策略,主要有4种类型的拒绝策略:
AbortPolicy:无法处理新任务时,直接抛出异常,这是默认策略。
CallerRunsPolicy:用调用者所在的线程来执行任务。
DiscardOldestPolicy:丢弃阻塞队列中最靠前的一个任务,并执行当前任务。
DiscardPolicy:直接丢弃任务。
3.execute()方法
如果当前运行的线程少于corePoolSize,则创建新的工作线程来执行任务(执行这一步骤需要获取全局锁)。
如果当前运行的线程大于或等于corePoolSize,而且BlockingQueue未满,则将任务加入到BlockingQueue中。
如果BlockingQueue已满,而且当前运行的线程小于maximumPoolSize,则创建新的工作线程来执行任务(执行这一步骤需要获取全局锁)。
如果当前运行的线程大于或等于maximumPoolSize,任务将被拒绝,并调用RejectExecutionHandler.rejectExecution()方法。即调用饱和策略对任务进行处理。
3、线程池的工作流程
执行逻辑说明:
判断核心线程数是否已满,核心线程数大小和corePoolSize参数有关,未满则创建线程执行任务
若核心线程池已满,判断队列是否满,队列是否满和workQueue参数有关,若未满则加入队列中
若队列已满,判断线程池是否已满,线程池是否已满和maximumPoolSize参数有关,若未满创建线程执行任务
若线程池已满,则采用拒绝策略处理无法执执行的任务,拒绝策略和handler参数有关
4、Executors创建返回ThreadPoolExecutor对象(不推荐)
Executors创建返回ThreadPoolExecutor对象的方法共有三种:
1. Executors#newCachedThreadPool => 创建可缓存的线程池
corePoolSize => 0,核心线程池的数量为0
maximumPoolSize => Integer.MAX_VALUE,可以认为最大线程数是无限的
keepAliveTime => 60L
unit => 秒
workQueue => SynchronousQueue
弊端:maximumPoolSize => Integer.MAX_VALUE可能会导致OOM
2. Executors#newSingleThreadExecutor => 创建单线程的线程池
SingleThreadExecutor是单线程线程池,只有一个核心线程:
corePoolSize => 1,核心线程池的数量为1
maximumPoolSize => 1,只可以创建一个非核心线程
keepAliveTime => 0L
unit => 毫秒
workQueue => LinkedBlockingQueue
弊端:LinkedBlockingQueue是长度为Integer.MAX_VALUE的队列,可以认为是无界队列,因此往队列中可以插入无限多的任务,在资源有限的时候容易引起OOM异常
3. Executors#newFixedThreadPool => 创建固定长度的线程池
corePoolSize => 1,核心线程池的数量为1
maximumPoolSize => 1,只可以创建一个非核心线程
keepAliveTime => 0L
unit => 毫秒
workQueue => LinkedBlockingQueue
它和SingleThreadExecutor类似,唯一的区别就是核心线程数不同,并且由于使用的是LinkedBlockingQueue,在资源有限的时候容易引起OOM异常
5、线程池的合理配置
从以下几个角度分析任务的特性:
任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
任务的优先级:高、中、低。
任务的执行时间:长、中、短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理。可以通过 Runtime.getRuntime().availableProcessors()
方法获得当前设备的 CPU 个数。
CPU 密集型任务:配置尽可能小的线程,如配置 cpu核心数+1 个线程的线程池。
IO 密集型任务 :由于线程并不是一直在执行任务,则配置尽可能多的线程,如2 ∗ Ncpu。
混合型任务:如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务。只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率;如果这两个任务执行时间相差太大,则没必要进行分解。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理,它可以让优先级高的任务先得到执行。但是,如果一直有高优先级的任务加入到阻塞队列中,那么低优先级的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,线程数应该设置得较大,这样才能更好的利用 CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力。可以根据需要设大一点,比如几千。使用无界队列,线程池的队列就会越来越大,有可能会撑满内存,导致整个系统不可用。
处理拒绝策略有以下几种比较推荐:
在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可如果任务不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒绝策略将任务丢弃也是可以的如果使用Executors的静态方法创建ThreadPoolExecutor对象,可以通过使用Semaphore对任务的执行进行限流也可以避免出现OOM异常。
6、拒绝策略
有以下几种比较推荐:
在程序中捕获RejectedExecutionException异常,在捕获异常中对任务进行处理。针对默认拒绝策略
使用CallerRunsPolicy拒绝策略,该策略会将任务交给调用execute的线程执行【一般为主线程】,此时主线程将在一段时间内不能提交任何任务,从而使工作线程处理正在执行的任务。此时提交的线程将被保存在TCP队列中,TCP队列满将会影响客户端,这是一种平缓的性能降低
自定义拒绝策略,只需要实现RejectedExecutionHandler接口即可
如果任务不是特别重要,使用DiscardPolicy和DiscardOldestPolicy拒绝策略将任务丢弃也是可以的如果使用Executors的静态方法创建ThreadPoolExecutor对象,可以通过使用Semaphore对任务的执行进行限流也可以避免出现OOM异常。
参考文章:8大拒绝策略
7、线程池的五种运行状态
线程状态:
不同于线程状态,线程池也有如下几种 状态:
• RUNNING :该状态的线程池既能接受新提交的任务,又能处理阻塞队列中任务。
• SHUTDOWN:该状态的线程池不能接收新提交的任务,但是能处理阻塞队列中的任务。(政府服务大厅不在允许群众拿号了,处理完手头的和排队的政务就下班)
处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。
注意:finalize() 方法在执行过程中也会隐式调用shutdown()方法。
• STOP:该状态的线程池不接受新提交的任务,也不处理在阻塞队列中的任务,还会中断正在执行的任务。(政府服务大厅不再进行服务了,拿号、排队、以及手头工作都停止了。)
在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用shutdownNow() 方法会使线程池进入到该状态;
• TIDYING:如果所有的任务都已终止,workerCount (有效线程数)=0。
线程池进入该状态后会调用 terminated() 钩子方法进入TERMINATED 状态。
• TERMINATED:在terminated()钩子方法执行完后进入该状态,默认terminated()钩子方法中什么也没有做。
【参考文章】
【1】《JAVA并发编程艺术》
【2】tech.meituan.com/2020/04/02/…
总结
加载全部内容