Java ThreadPoolExecutor
niuyongzhi 人气:01.线程池Executors的简单使用
1)创建一个线程的线程池。 Executors.newSingleThreadExecutor(); //创建的源码 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 2)创建固定大小的线程池,参数为int,是线程池核心线程和最大线程的数量 Executors.newFixedThreadPool(2); //创建的源码 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 3)创建一个线程数不设限的线程池, //创建的源码,核心线程是0,最大线程是Integer.MAX_VALUE Executors.newCachedThreadPool(); public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
使用方法,使用同步代码块,保证线程池实例是唯一的。
使用方法: private static ExecutorService sSingleThreadExecutor = null; // lazy, guarded by class public static ExecutorService singleThreadExecutor() { //当前的类对象为锁 synchronized (ThreadPool.class) { if (sSingleThreadExecutor == null) { sSingleThreadExecutor = Executors.newSingleThreadExecutor(); } return sSingleThreadExecutor; } }
通过以上三种方式,可以创建一个简单的线程池。
但是有弊端:
newSingleThreadExecutor和newFixedThreadPool,运行的请求队列是长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而造成oom。
而newCachedThreadPool允许的线程数量为最大值Integer.MAX_VALUE,也会造成oom。
2.通过ThreadPoolExecutor创建线程池
下面是OkHttp中Dispatcher.java线程池:
ExecutorService executorService; public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
OkHttp中ConnectionPool.java
private static final Executor executor = new ThreadPoolExecutor(0 , Integer.MAX_VALUE , 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
使用方式:
//call 实现 Runnable 接口。调用execute方法即可将入线程池,执行run方法中的代码。 executorService().execute(call);
3.ThreadPoolExecutor各个参数的含义
corePoolSize:核心线程数,即使是空闲线程也不会销毁。这样做的目的是为了降低执行任务时创建线程的时间和性能开销。
maximumPoolSize:最大线程数。当核心线程被用完时,会创建新的线程来执行任务,但是创建的数量不能超过这个最大值。
keepAliveTime:线程的存活时间。除核心线程外,其他线程一旦执行完任务,就会处于空闲状态,超过这个时间就会被销毁。
unit:keepAliveTime设置的时间单位。
workQueue:任务的阻塞队列。线程数量有限,当任务过多来不及执行时,就会加入到这个阻塞队列中,等到有空闲进程,
就会从这个队列取出任务去执行。队列都是先进先出的FIFO。
threadFactory:新线程产生的方式。
handler:拒绝策略,超过任务队列设置的最大值时。再有新的任务进来,就会执行这个拒绝策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { }
线程池的阻塞队列:
ArrayBlockingQueue:
是基于数组的任务队列。里面用一个数组来存放任务。当我们new的时候,需要指定数组大小。
还有两个int变量putIndex和takeIndex用来表示队列的头部和尾部在数组中的位置。
LinkedBlockingQueue:
是基于链表的,内部用一个单向链表来存放任务。创建时可以指定大小,如果不指定则是Integer.MAX_VALUE
PriorityBlockingQueue:
基于优先级的阻塞队列。
SynchronousQueue:
一种无缓冲的等待队列。有新任务进来直接交给线程执行。
OkHttp中使用的就是这种队列,他的最大线程数为Integer.MAX_VALUE。保证有任务进来就能马上执行。
RejectedExecutionHandler拒绝策略,这是一个接口。不同的实现执行不同的策略。
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } AbortPolicy:拒绝行为直接抛出异常 RejectedExecutionException public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } DiscardPolicy:保持静默,什么也不做。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } DiscardOldestPolicy:丢弃任务队里中最老的任务,尝试将新任务加入队列 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } CallerRunsPolicy:直接由提交任务这执行这个任务。 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } 如果在创建线程池的时候,不知道具体的拒绝策略。那么ThreadPoolExecutor默认的策略是AbortPolicy。 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
线程池可以执行两种类型的任务:Runable和Callable
class MyRunable implements Runnable{ @Override public void run() { } } class MyCallable implements Callable{ @Override public Object call() throws Exception { return null; } } Runnable 没有返回值,返回的是void,不允许抛出异常。 Callable 有返回值,返回的是Object,允许抛出异常。
4.线程池的源码分析
线程池的状态:
//运行状态,可以接受新任务,并且处理排队任务。 private static final int RUNNING = -1 << COUNT_BITS; //关闭状态,不再接受新任务,不过仍然会处理排队任务。 private static final int SHUTDOWN = 0 << COUNT_BITS; //停止状态,不再接受新任务,也不处理排队任务,同时中断处理中的任务 private static final int STOP = 1 << COUNT_BITS; //整理状态,当前所有任务终止,workerCount计数为0,线程切换为TIDYING状态,并且执行terminal()方法 private static final int TIDYING = 2 << COUNT_BITS; //终止状态,说明terminal()方法执行完成。 private static final int TERMINATED = 3 << COUNT_BITS;
ctlof是得到新的ctl值。通过ctl可以计算线程池的状态和数量
runStateOf 计算当前线程池的状态。
workerCountOf计算线程池的数量。
// ctlOf计算ctl的新值,也就是线程池状态和线程池中线程数量。 private static int ctlOf(int rs, int wc) { return rs | wc; } //获取ctl的高三位,也就是线程池的状态。 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取ctl的低29位,也就是线程池中的线程数。 private static int workerCountOf(int c) { return c & CAPACITY; } 其中runStateOf(int c)和workerCountOf(int c)的参数c就是通过ctlOf(int rs, int wc)获得的ctl值。
向线程池中添加一个任务:executorService().execute(call);
然后看看源码中是如何执行的,是如何添加任务的。
ctl 用来表示线程池的状态和线程数量, 在ThreadPoolExcutor中使用32位二进制数来表示线程池的状态和线程中线程数量。 其中前3位表示线程池的状态,后29位表示线程池中的线程数。 public void execute(Runnable command) { int c = ctl.get(); //如果工作线程数量小于核心线程数, //提交的任务会通过addWorker(command, true)创建一个新的核心线程来执行, 这个参数传的是true,表示去新增核心线程。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)){ //添加成功则return return; } //添加核心线程失败则重新获取线程池的状态和数量 c = ctl.get(); } //进入到下面说明当前工作线程大于或等于核心线程。 //如果线程池处于运行状态,则加入队列 if (isRunning(c) && workQueue.offer(command)) { //如果入队成功,则重新获取线程池的状态 int recheck = ctl.get(); //如果线程池不处于运行状态,则从队列中remove if (!isRunning(recheck) && remove(command)){ //成功删除,则执行拒绝策略 reject(command); }else if (workerCountOf(recheck) == 0){ //进入这个分支有两种情况1.线程池处于运行状态 2.线程从不处于运行状态,但是remove失败 则会判断workerCountOf如果工作线程为0,则会创建非核心线程去执行任务。 addWorker为null,和false。false表示非核心线程。null说明创建的线程去执行队列里的任务。 addWorker(null, false); } //进入到这个分支有两种情况1.线程池处于非运行状态2.运行状态但是入队失败了。 这时候创建非核心线程去执行任务 }else if (!addWorker(command, false)){ 如果创建非核心线程失败了,则执行拒绝策略。 reject(command); } }
通过以上源码分析,线程池的运行原理可以总结为一下几点:
1.通过execute方法提交任务时,运行线程小于corePoolSize时,则会创建新的核心线程来执行这个任务。
2.通过excute方法提交任务时,运行线程大于等于corePoolSize时,则会加入到队列中,等待线程调度执行。
3.通过excuete方法提交任务时,运行线程大于等于corePoolSize时,并且加入队列失败(队列满了),新提交的任务将会通过创建新的线程执行。
4.通过excute方法提交任务时,运行线程大于maximumPoolSize时,队列也满了,则会执行拒绝策略。
5.当线程池中的线程执行完任务处于空闲状态时,则会尝试从任务队列中取头结点任务执行。
接下来看addWorker如何添加任务。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池处于非运行状态,则不会创建线程。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())){ return false; } //如果线程池处于运行状态,则直接走下面的创建添加逻辑。 for (;;) { //获取工作线程数量 int wc = workerCountOf(c); //wc >= CAPACITY 工作线程大于最大容量 // wc >= (core ? corePoolSize : maximumPoolSize) 如果工作线程大于了核心线程或最大线程, //只要这两个条件有一个成立则return。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){ return false; } //创建线程数量+1,这里用到了CAS。关于CAS后面再写文章分析。 if (compareAndIncrementWorkerCount(c)){ break retry; } //如果CAS操作失败,线程数量没有加1,则重新获取线程的状态。 c = ctl.get(); // Re-read ctl //判断当前状态和之前状态,如果不同,说明线程池状态发生了变化。重新跳到retry的外层循环。 //如果相同,则说明线程池没有变化,继续进行内层循环。 if (runStateOf(c) != rs){ continue retry; } // else CAS failed due to workerCount change; retry inner loop } } //执行到这说明线程数量已经完成+1,接下来进行线程的创建。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //这个创建一个worker对象。在worker构造方法中,会利用ThreadPoolExecutor中传递过了的ThreadFactory创建一个Thread //默认是通过Executors.defaultThreadFactory(),创建一个线程。 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //拿到一个重入锁对象。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //拿到线程池的状态 int rs = runStateOf(ctl.get()); //如果线程池处于运行状态或者处于关闭状态并且firstTask == null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) { throw new IllegalThreadStateException(); } //添加到work集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize){ //更新一下最大线程数 largestPoolSize = s; } //标志位,添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加成功则启动线程 t.start(); //启动成功 workerStarted = true; } } } finally { //如果没有启动成功则从线程池中移除。 if (! workerStarted){ addWorkerFailed(w); } } return workerStarted; }
关键代码看看 w = new Worker(firstTask);
做了啥
Worker(Runnable firstTask) { setState(-1); //将传进来的任务赋值给成员变量 this.firstTask = firstTask; //创建一个线程,并把Worker本身当做Runnable传进了Thread中去。 this.thread = getThreadFactory().newThread(this); } public interface ThreadFactory { Thread newThread(Runnable r); }
注意newThread(this)。Worker把自己当做Runnable传到了线程中去。当调用t.start()方法时会调用Worker的run方法。
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果task不为null,则先执行当前任务 //如果task传进来是null则从队列中取任务,执行队列里的任务。 //getTask()就是从任务队列中提取在等待的队伍。 while (task != null || (task = getTask()) != null) { w.lock(); //(runStateAtLeast(ctl.get(), STOP) 线程池处于STOP,TIDYING,TERMINATED状态 处于这些状态的线程池是无法执行任务的。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()){ //中断线程 wt.interrupt(); } //执行到下面说明线程池处于RUNNING或SHUTDOWN状态 //由此也可以看出SHUTDOWN状态的线程池,是可以执行队列里的任务的,但是队列不在接收新的任务添加 try { beforeExecute(wt, task); Throwable thrown = null; try { //执行任务的 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask()从任务队列中,提取任务。
private Runnable getTask() { boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; try { //从任务队列中取出任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }
通过以上源码分析,可以总结一下几点。
addWorker(Runnable firstTask, boolean core)
1.如果firstTask为null,则会创建线程去执行队列里的任务。
2.如果不为null,则会去执行当前任务,然后再执行队列里的任务。
3.core 如果为true,则会创建核心线程,如果为false,则会创建非核心线程。
4.addWorker 会创建线程,启动线程,执行任务。
在创建线程之前会判断线程池的状态、以及核心线程或最大线程数。
如果创建成功启动线程的start方法,然后调用worker的runWorker()方法。
加载全部内容