java 线程数量
Java个体户 人气:0线程池配置
线程池配置,假设是:
1.最小数量是5
2.阻塞队列容量是10
3.最大数量是20
线程池里的业务线程数量小于最小数量(5)
第一个请求
第一个请求进来的时候,这个时候,线程池没有线程,就创建新的工作线程(即Worker线程)。
然后,这个工作线程去处理当前请求的业务线程。
第二个请求
第二个请求进来的时候,这个时候,线程池已经有了一个工作线程。
但是,要注意,这个时候是不会复用线程池里已有的工作线程的。而是创建新的工作线程。
因为,线程池里根本没有复用线程的概念。
说白了,无论线程池里已有的这个工作线程是否在处理业务线程,即不管它空闲与否,其实都会创建新的工作线程。
第三个请求
同上,仍然创建新的工作线程。
。。。
第五个请求
同上。仍然创建新的工作线程。
注意,现在,线程池有几个工作线程?5个。
即,每个请求进来,都创建一个新的工作线程。
小于阻塞队列容量(10)
第六个请求
第六个请求进来的时候,提交到阻塞队列。
然后,再慢慢消费。
具体来说,是由线程池里的工作线程来慢慢消费。
具体消费的源码,参考:复用线程小节。
第七个请求
同上,也是先添加到阻塞队列。
。。。
第15个请求
同上,也是先添加到阻塞队列。
小于最大数量(20)
第16个请求
先来看,正常情况下,阻塞队列还没塞满(生产环境的容量一般是1000),就会被快速处理掉。
然后,当新的请求进来的时候,继续丢到阻塞队列里面去。
这个是和上面讲的一样。
但是,我们为了方便理解,现在假设之前的15个请求是同时到达,即
- 前面5个请求
创建5个新的请求。
- 后面10个请求
全部丢到阻塞队列。
这个时候,阻塞队列已经满了。接着,第16个请求进来了,怎么办?
继续创建新的工作线程。
。。。
第35个请求
同上,继续创建新的工作线程。
注意,这个时候,线程池里的工作线程的数量是多少?20。
因为
- 前面5个请求,创建了5个新的工作线程。
- 最后面的15个请求(第16到第35),创建了15个新的工作线程。
所以,总共,创建了20个新的工作线程。线程池,总共有20个工作线程。
拒绝策略
第36个请求
假设前面的请求都没有处理完,这个时候,来了第36个请求,怎么办?
只能采取拒绝策略。
具体采用哪个拒绝策略?比如说,一般情况下,都是采用丢弃。
复用线程
前面说了,线程池里没有复用线程的概念。
那到底是怎么回事呢?既然不能复用线程,那搞个线程池有个几把用?
具体是这样子,虽然,线程池里的工作线程不能被复用,仅仅是指类似数据库连接池里的连接的那种复用,即
- 用的时候,从连接池取
- 用完了,归还到连接池
线程池里的对象复用,是基于循环,而不是用完之后再还回去。
什么意思呢?就是工作线程,不断的从阻塞队列里取业务线程,然后执行业务线程。
伪代码
工作线程{ run(){ while(){ 1.从阻塞队列,取业务线程 2.执行业务线程; } } }
所以,线程池和连接池的区别在于,线程池的对象是线程,可以不断的循环读业务线程。而连接池的对象,是用完了归还到连接池里去。
jdk源码-java.util.concurrent.ThreadPoolExecutor#runWorker
/** * 核心步骤 * 1.从阻塞队列,读业务线程 * 2.执行业务线程 * * --- * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //从阻塞队列里获取业务线程 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //执行业务线程 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
加载全部内容