Java并发编程:线程池ThreadPoolExecutor
耶low 人气:0多线程的程序的确能发挥多核处理器的性能。虽然与进程相比,线程轻量化了很多,但是其创建和关闭同样需要花费时间。而且线程多了以后,也会抢占内存资源。如果不对线程加以管理的话,是一个非常大的隐患。而线程池的目的就是管理线程。当你需要一个线程时,你就可以拿一个空闲线程去执行任务,当任务执行完后,线程又会归还到线程池。这样就有效的避免了重复创建、关闭线程和线程数量过多带来的问题。
Java并发包提供的线程池
注:摘自《实战Java高并发程序设计》
如图是Java并发包下提供的线程池功能。其中ExecutorService接口提供一些操作线程池的方法。而Executors相当于一个线程池工厂类,它里面有几种现成的具备某种特定功能的线程池工厂方法。看到这些应该不陌生,举个我们平时最常使用的例子:
//创建一个大小为10的固定线程池 ExecutorService threadpool= Executors.newScheduledThreadPool(10);
下面简单介绍一下这些工厂方法:
newFixedThreadPool()方法:固定线程数量线程池。传入的数字就是线程的数量,如果有空闲线程就去执行任务,如果没有空闲线程就会把任务放到一个任务队列,等到有线程空闲时便去处理队列中的任务。
newSingleThreadExecutor()方法:只有一个线程的线程池。同样,超出的任务会被放到任务队列,等这个线程空闲时就会去按顺序处理。
newCachedThreadPool()方法:可以根据实际情况拓展的线程池。当没有空闲线程去执行新任务时,就会再创建新的线程去执行任务,执行完后新建的线程也会返回线程池进行复用。
newSingleThreadScheduledExecutor()方法:返回的是ScheduledExecutorService对象。ScheduledExecutorService是继承于ExecutorService的,有一些拓展方法,如指定执行时间。这个线程池大小为1,在指定时间执行任务。关于指定时间的几个方法:schedule()是在指定时间后执行一次任务。scheduleAtFixedRate()和方法scheduleWithFixedDelay()方法,两者都是周期性的执行任务,但是前者是以上一次任务开始为周期起点,后者是以上一次任务结束为周期起点。具体的参数大家可以在IDE里面查看。
newScheduledThreadPool()方法:和上面一个方法一样,但是可以指定线程池大小,其实上面那个方法也是调用这个方法的,只是传入的参数是1。
线程池核心类
上面简单的对Java并发包下线程池的结构和API进行简单的介绍,下面开始深入了解一下线程池。如果大家在IDE上追踪一下上面几个工厂方法就会发现,其中最后都会调用一个方法,通过上图其实也可以发现。那就是ThreadPoolExecutor的构造方法,工厂方法只是帮我们传入不同的参数,从而实现不同的效果,所以如果你想更自由的控制自己的线程池,推荐直接使用ThreadPoolExecutor创建线程池。下面给出这个构造函数的参数列表:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数从上到下,作用依次为:
1.指定线程池种线程的数量。
2.线程池种最大的线程数量,也就是最大能拓展到多少。
3.当线程数量超过corePoolSize,多余的空闲线程多久会被销毁。
4.keepAliveTime的单位。
5.任务队列,当空闲线程不够,也不能再新建线程时,新提交的任务就会被放到任务队列种。
6.线程工厂,用于创建线程,默认的即可。
7.拒绝策略。当任务太多,达到最大线程数量、任务队列也满了,该如何拒绝新提交的任务。
任务队列
任务队列是一个BlockingQueue接口,在ThreadPoolExecutor一共有如下几种实现类实现了BlockingQueue接口。
SynchronousQueue:直接提交队列。这种队列其实不会真正的去保存任务,每提交一个任务就直接让空闲线程执行,如果没有空闲线程就去新建,当达到最大线程数时,就会执行拒绝策略。所以使用这种任务队列时,一般会设置很大的maximumPoolSize,不然很容易就执行了拒绝策略。newCachedThreadPool线程池的corePoolSize为0,maximumPoolSize无限大,它用的就是直接提交队列。
ArrayBlockingQueue:有界任务队列,其构造函数必须带一个容量参数,表示任务队列的大小。当线程数量小于corePoolSize时,有任务进来优先创建线程。当线程数等于corePoolSize时,新任务就会进入任务队列,当任务队列满了,才会创建新线程,线程数达到maximumPoolSize时执行拒绝策略。
LinkedBlockingQueue:无界任务队列,通过它的名字也应该知道了,它是个链表,除非没有空间了,不然不会出现任务队列满了的情况,但是非常耗费系统资源。和有界任务队列一样,线程数若小于corePoolSize,新任务进来时没有空闲线程的话就会创建新线程,当达到corePoolSize时,就会进入任务队列。会发现没有maximumPoolSize什么事,newFixedThreadPool固定大小线程池就是用的这个任务队列,它的corePoolSize和maximumPoolSize相等。
PriorityBlockingQueue:优先任务队列,它是一个特殊的无界队列,因为它总能保证高优先级的任务先执行。
拒绝策略
JDK提供了四种拒绝策略。
AbortPolicy:直接抛出异常,阻止系统正常工作。
CallerRunsPolicy:如果线程池未关闭,则在调用者线程里面执行被丢弃的任务,这个策略不是真正的拒绝任务。比如我们在T1线程中提交的任务,那么该拒绝策略就会把多余的任务放到T1线程执行,会影响到提交者线程的性能。
DiscardOldestPolicy:该策略会丢弃一个最老的任务,也就是即将被执行的任务,然后再次尝试提交该任务。
DiscardPolicy:直接丢弃多余的任务,不做任何处理,如果允许丢弃任务,这个策略是最好的。
以上内置的拒绝策略都实现了RejectedExecutionHandler接口,所以上面的拒绝策略无法满足你的要求,可以自定义一个:继承RejectedExecutionHandler并实现rejectedExecution方法。
线程工厂
线程池中的线程是由ThreadFactory负责创建的,一般情况下默认就行,如果有一些其他的需求,比如自定义线程的名称、优先级等,我们也可以利用ThreadFactory接口来自定义自己的线程工厂:继承ThreadFactory并实现newThread方法。
线程池的拓展
在ThreadPoolExecutor中有三个扩展方法:分别会在任务执行前beforeExecute、执行完成afterExecute、线程池退出时执行terminated。
这几个方法在哪调用的?在ThreadPoolExecutor中有一个内部类:Worker,每个线程的任务其实都是由这个类里面的run方法执行的,贴一下这个类的源码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; //....省略 /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } //....省略 }
接着进入这个runWorker方法:
final void runWorker(Worker w) { //...省略 try { //任务执行前 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任务执行完 afterExecute(task, thrown); } } //....省略 }
还有一个线程池退出时执行的方法是在何处执行的?这个方法被调用的地方就不止一处了,像线程池的shutdown方法就会调用
public void shutdown() { //....省略。这个方法里面就会调用terminated tryTerminate(); }
ThreadPoolExecutor中这三个方法默认是没有任何内容的,所以我们要自定义它也很简单,直接重写它们就行了:
ExecutorService threadpool= new ThreadPoolExecutor(5,5,0L,TimeUnit.SECONDS,new LinkedBlockingDeque<>()){ @Override protected void beforeExecute(Thread t, Runnable r) { //执行任务前 } @Override protected void afterExecute(Runnable r, Throwable t) { //执行任务后 } @Override protected void terminated() { //线程退出 } };
加载全部内容