亲宝软件园·资讯

展开

啃不透--线程池

2J 人气:0

引子   

  上一篇文章《火焰图--记一次cpu降温过程》提到接手了单车投放的应用,上周已经有大量流量切到新应用,然而心中还是惴惴不安的,投放动作业务逻辑复杂,使用了线程池异步处理。对于线程池认知只是停留在一个模糊的状态。这是一个什么状态呢:感觉什么都懂,就是说不出来(就像考试前啥都懂,考试时咬笔杆)。每次使用线程遇到问题都不能用已经有知识去思考,比如3个核心参数应该如何设置,阻塞队列应该用什么...通过搜索引擎查询后,又感觉自己懂了,实际上只是做了一次无用的循环,这次输入的内容,因为没有体系根基,很快又被新的内容冲刷得毫无痕迹。

  最近加入同事文博组织的虚拟PMO团队--Thor,大家在交流分享结构化思维时,方才意识到自己在学习线程池上花了很多时间,还是不能清楚的描述它,更本原因就是没有从宏观上认识,建立初始知识体系,没有这个基础,零散学习只是增加了很多无效时间。这也就是为啥有的人学习快,有些慢;有的人可以举一反三,有的确不能。

  一个良好的学习过程应该是:

  1,找到初始化知识体系。

      2,补充丰富知识体系。

      我们都知道系统学习可以建立初始化知识体系,比如阅读一本相关书籍,写一些文章。那么如何补充丰富呢,一句话:拿到锤子,看到什么都是钉子。那应该如何理解这句话呢:这篇文章就是这个的实践。

  

                                        文博分享的关于结构化思维的脑图

 

为什么要用线程池

  随着处理器的核心越来越多,利用多线程技术可以把计算逻辑拆分成多个片段,分配到多个核心上,可以显著减少处理时间,提高效率;复杂的业务逻辑,可以使用多线程并发处理,缩短响应时间,提高用户体验。java的线程机制是抢占式协作多线程, 调用机制会周期性的中断线程,将上下文切换到另一个进程,线程越多,竞争会越多,切换会更频繁。所以增加线程带来的性能增加不是线性的,这就是amdahl定律。

  再者,线程的创建与销毁,上下文切换都不是免费的。《并发编程实战》一书中对于线程创建与销毁开销说明:

Thread lifecycle overhead. Thread creation and teardown are not free. The actual overhead varies across platforms, but thread creation takes time, introducing latency into request processing, and requires some processing activity by the JVM and OS. If requests are frequent and lightweight, as in most server applications, creating a new thread for each request can consume significative computing resources.

  大意如下:“线程生命周期开销:创建和销毁都是有代价的。实际开销虽因平台有所不同,但是都要消耗时间,jvm和os 需要执行一些处理程序;在大数请求频繁的服务端应用中,如果为每个请求创建一个线程将消耗非常可观的计算机资源”。以上概念层的开销,那一个java线程的创建实际开销则是这样的:  

  • A large block of memory has to be allocated and initialized for the thread stack. 为线程栈分配内存
  • System calls need to be made to create / register the native thread with the host OS.  为os 创建和注册本地线程进行系统调用
  • Descriptors needs to be created, initialized and added to JVM internal data structures.  创建和初始化描述符,添加到jvm内部的数据结构。

  上下问切换(context switching)也是有开销的,需要分配内存存储当前状态,克隆系统调用等,具体可以参考文末参考资料[2]

 

    正是因为创建线程的代价是如此昂贵的(expensive),所以线程池出现了, 它以“池化”思想来管理资源,按需创建,分配,回收;并重复利用已有的线程资源。既然大家都用线程池,那么它的”真面目“是怎么样的呢-- 从源开开始。

 

源码分析

  java为多线程编程提供了良好的,考究并且一致的编程模型,让我们只需关注问题本身,而ThreadPoolExecutor类就是java为我们提供的线程池模型,其继承体系如下图,顶层接口定义了统一的行为,并将任务提交与任务执行的策略解藕开来;而AbstractExecutorService 抽象任务执行流程并串连起来;如此,子类只用关注某个具体方法了。

                  

 

   一般而言 ThreadPoolExecutor.execute(Runnable()) 是我们使用线程池的入口

 

public void execute(Runnable command) {
if (command == null) // 三种情况: int c = ctl.get(); //1,线程数 少于 核心线程 直接创建线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //线程数数超过 核心线程,但是blockqueue 未满,enqueue. if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // queue 已经满,直接创建线程(超过max reject) else if (!addWorker(command, false)) reject(command); }

  execute方法的三个分支,决定了线程池中线程的创建执行策略(面试中经常碰到的场景就是:添加了多个任务时,创建了多少个线程):

  1,线程数 少于 核心线程 直接创建线程
  2,线程数数超过 核心线程,但是blockqueue 未满,enqueue.
  3, queue 已经满,直接创建线程(超过max reject)
  
  下图展示了线程的创建过程

  

   上面的代码中的判断条件中有两个:workerCountOf(c) -- 获取当前线程数; isRunning(c)  -- 线程池是否是运行状态。这两个方法的参数都是一个int类型,那么一个int是如何能同时表示两个类型呢。一个int 4个字节,32位,这里就是用指定位数(3位)来表示状态,剩下的29位表示线程数,下图展示了这个关系。jdk中还有一些其他类也同步用了这样方法,比如:ReentrantReadWriteLock,高16位表示共享锁的数量,低16位表示互斥锁的数量。

 

  

  

// CAPACITY= 00011111111111111111111111111111(29个1)
// 获取当前线程数 // 线程池的最大数就是2^29-1 private static int workerCountOf(int c) { return c & CAPACITY; }

 

  线程池做为一个对象,有自己的状态机,其状态变化是有内部事件驱动的。下图展示了每个状态以及对应值(状态值是3位二进制),及对应的行为。这里有个插曲:以前面试被问到线程池shutwon和stop的差别。当时认识不清说得特别含糊,其实从这两个状态的英文单词的含义就可以看出7,8分了。 showdown 牛津翻译为:the act of closing a factory or business or stopping a large machine from working, either temporarily or permanently。体现的是进行时,closing,stopping;stop 意思比较多,但都是表示的一个意思:end /  not continue。大师的变量名命名那真是相当精确的,要不怎么都提倡程序员学好英语呢。

  

   看完了线程池的调度入口,了解了线程池的状态机,我们继续来看下方法 addWorker(Runnable firstTask, boolean core),前文说到线程池的把任务的提交和执行解藕,那就是如何串连的呢,addWorker方法就很好的完成的这个串连。这个方法主要分两个部分:

  1,根据线程池状态及线程数判断是返回还是继续。其中第一个 if 条件尤为复杂,已经有注释。

  2,创建工作进程对象 Worker w ,并执行其持有的线程对象thread 的start 方法。顺利让解藕的执行部分开始工作。

  这里的代码逻辑不复杂,有一个标记还是有意思的: retry:(标记,可以写成任意如:abc:) / continue retry ;(跳出当前循环) /break retry; (跳出外层循环)。 以后跳出双重循环是不是也可以这样写?

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
       
        // 如果 是shutdown 以上, 后在有三个条件 都满足才可以接续执行
        1, shutdown 执行原有任务,可能加新任务。
        2, firstTask 必须为空。
        3, queue 不能为空(有任务才能接续执行。)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 

   接下来任务的执行就交给了工作线程 Worker w 了,这是一个内部类实现了接口 Runnable,构造函数中对的 属性thread初始化传是this,  如此 addWorker 方法中的 t.start(); 就顺利调用了Worker的run 方法了,而run方法又调用 runWorker。所以真正执行任务的最终方法在这里 -- runWorker。

Worker 
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 注意:这里,这个线程 传的runnable 是this, 也就是 worker本身, 所以start()后进入runnable状态,等到获取时间片后,就执行 run方法。
    this.thread = getThreadFactory().newThread(this);
}
}
/** Delegates main run loop to outer runWorker  */
public void run() {
runWorker(this);
}
 

  我们继续来读最关键的方法runWorker,我删除了一些判断以及异常处理的代码,让我们可以清晰看到处理逻辑:获取任务,执行,回收线程。获取任务有两种情况:1,线程数小于核心数和队列满了但线程未到最大线程数时直接传入了任务;2,从阻塞获取任务,getTask()方法完成了这一任务

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();try {
                    Throwable thrown = null;
                    try {
                        task.run();
                    }
            afterExecute(task, thrown);

} finally { task = null;
            // 统计完成任务数
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
//回收工作线程,尝试更新状态。 processWorkerExit(w, completedAbruptly); } }

  

   ThreadPoolExecutor 中定义了 HashSet<Worker>worker 工作线程队列,BlockingQueue<Runnable>workQueue 任务队列 来实现了工作线程和任务管理与解藕。到里线程的任务添加流程和执行过程就分析完了,当然中间抛弃了大量细节,比如锁的使用,比如状态机的变化等等。还是如前文所说,先建立初始化知识体系,后面再研究细节补充体系,每次的投入都是在强化它,再也不是无效时间了。简版调用时序图如下:

  

 

线程池监控

   文章开头提到流量增大,心中不安,很大一部分原因,就是因为无法监控到线上线程池的状态,比如阻塞队列中的任务数,活跃线程数,线程池大小等等。当然这也是原于早前的无知,平时我们写代码主要分成两部分:功能性代码,实现业务功能;运维性代码,监控程序状态,分析问题。大师的代码也不例外,只是优雅很多。ThreadPoolExecutor 中有提供了相关运维代码,并在runWorker 中使用模板方法设计模式,为我们获取线程池状态等信息提供接口了,比如:beforeExecute(wt, task);  afterExecute(task, thrown);   ThreadPoolExecutor中这两个方法都是空实现,我们可以继承,并重写完成状态的获取。获取线程池运维状态提代了如下方法下图。

  

  参考了一位网友的代码(忘记出处了),继承ThreadPoolExecutor ,重写afterExecute,打印线程池相关信息

@Slf4j
public class ThreadPoolMonitor  extends ThreadPoolExecutor {

    private String poolName;

    /**
     * 调用线程池的构造方法,并记录线程池名
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param poolName        线程池名称
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                new HamThreadFactory(poolName));
        this.poolName = poolName;
    }

    /**
     * 任务执行之后,将相关状态记录日志。
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
        // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
        // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
        log.info("{}-pool-monitor: " +
                        " PoolSize: {}, CorePoolSize: {}, Active: {}, " +
                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                        "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                this.poolName, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

}
View Code

 

 结语

  最近学习一直停留在输入(看)层面,所看内容无法转化成自己的知识体系,因而很多东西都无法深入,我们当然知道原因,但是总是说忙得没时间整理。入职哈啰后看到很多优秀的人都是每天大量记录心得感想。等到文博和我们分享他的读书笔记时,从震撼到忏愧。知识只有经历了 输入-消化-输出 才会最终成为我们拥有的。为此文博还在Thor团队发起对赌打卡 -- 哈哈哈,我们当然要应战。

  文章到这里就结束了,因为个人经验还有很多不足,文章中的分析也比较粗浅,甚至有错误的地方,希望大家可以拍砖,狠狠的拍。

 

   成为一名优秀的程序员!

   参考资料:

  [1] https://intellipaat.com/community/36170/why-is-creating-a-thread-said-to-be-expensive

  [2] https://eli.thegreenplace.net/2018/measuring-context-switching-and-memory-overheads-for-linux-threads/

  [3] https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww

  [4] 《java并发编程》

 

加载全部内容

相关教程
猜你喜欢
用户评论