C++ ThreadPool
喂喂喂–学编程 人气:0什么是线程
线程是进程中的⼀个执⾏单元,负责当前进程中程序的执⾏,⼀个进程中⾄少有⼀个线程。⼀个进程中是可以有多个线程的,这个应⽤程序也可以称之为多线程程序。多线程程序作为一种多任务、并发的工作方式
并发与并⾏
早期计算机的 CPU 都是单核的,一个 CPU 在同一时间只能执行一个进程/线程,当系统中有多个进程/线程等待执行时,CPU 只能执行完一个再执行下一个。为了提高 CPU 利用率,减少等待时间,人们提出了一种 CPU 并发工作的理论.
并发:指两个或多个事件在同⼀个时间段内发⽣,当系统中有多个进程/线程等待执行时,CPU只能执行完一个再执行下一个。
并⾏:指两个或多个事件在同⼀时刻发⽣(同时发⽣),多核 CPU 的每个核心都可以独立地执行一个任务,而且多个核心之间不会相互干扰。在不同核心上执行的多个任务,是真正地同时运行,这种状态就叫做并行。。
什么是线程池
顾名思义:线程池就是线程的池子,有很多线程,但是数量不会超过池子的限制。需要用到多执行流进行任务出路的时候,就从池子中取出一个线程去处理,线程池就类似于一个实现了消费者业务的生产者与消费者模型。
本质上:这就是一个基于生产者消费者模型来实现的线程池,那么同样遵守三种规则,生产者和生产者之间存在互斥,处理任务的线程之间存在互斥关系,生产者和消费者之间存在同步和互斥关系
线程池解决什么问题
线程池维护者多个线程,等待着分配可并发执行的任务,可以避免在短时间创建和销毁大量线程带来时间成本。
总结为三点:
1.避免线程因为不限制创建数量导致的资源耗尽风险
2.任务队列缓冲任务,支持忙线不均的作用
3.节省了大量频繁创建/销毁线程的时间成本
怎么用线程池
下面展示一些 threadpool
实现,源码来自openharmony。
/* * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef NETSTACK_THREAD_POOL #define NETSTACK_THREAD_POOL #include <atomic> #include <condition_variable> #include <queue> #include <thread> #include <vector> namespace OHOS::NetStack { template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool { public: /** * disallow default constructor */ ThreadPool() = delete; /** * disallow copy and move */ ThreadPool(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool &operator=(const ThreadPool &) = delete; /** * disallow copy and move */ ThreadPool(ThreadPool &&) = delete; /** * disallow copy and move */ ThreadPool &operator=(ThreadPool &&) = delete; /** * make DEFAULT_THREAD_NUM threads * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated */ explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true) { for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) { std::thread([this] { RunTask(); }).detach(); } } /** * if ~ThreadPool, terminate all thread */ ~ThreadPool() { // set needRun_ = false, and notify all the thread to wake and terminate needRun_ = false; while (runningNum_ > 0) { needRunCondition_.notify_all(); } } /** * push it to taskQueue_ and notify a thread to run it * @param task new task to Execute */ void Push(const Task &task) { PushTask(task); if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) { std::thread([this] { RunTask(); }).detach(); } needRunCondition_.notify_all(); } private: bool IsQueueEmpty() { std::lock_guard<std::mutex> guard(mutex_); return taskQueue_.empty(); } bool GetTask(Task &task) { std::lock_guard<std::mutex> guard(mutex_); // if taskQueue_ is empty, means timeout if (taskQueue_.empty()) { return false; } // if run to this line, means that taskQueue_ is not empty task = taskQueue_.top(); taskQueue_.pop(); return true; } void PushTask(const Task &task) { std::lock_guard<std::mutex> guard(mutex_); taskQueue_.push(task); } class NumWrapper { public: NumWrapper() = delete; explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num) { ++num_; } ~NumWrapper() { --num_; } private: std::atomic<uint32_t> &num_; }; void Sleep() { std::mutex needRunMutex; std::unique_lock<std::mutex> lock(needRunMutex); /** * if the thread is waiting, it is idle * if wake up, this thread is not idle: * 1 this thread should return * 2 this thread should run task * 3 this thread should go to next loop */ NumWrapper idleWrapper(idleThreadNum_); (void)idleWrapper; needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_), [this] { return !needRun_ || !IsQueueEmpty(); }); } void RunTask() { NumWrapper runningWrapper(runningNum_); (void)runningWrapper; while (needRun_) { Task task; if (GetTask(task)) { task.Execute(); continue; } Sleep(); if (!needRun_) { return; } if (GetTask(task)) { task.Execute(); continue; } if (runningNum_ > DEFAULT_THREAD_NUM) { return; } } } private: /** * other thread put a task to the taskQueue_ */ std::mutex mutex_; std::priority_queue<Task> taskQueue_; /** * 1 terminate the thread if it is idle for timeout_ seconds * 2 wait for the thread started util timeout_ * 3 wait for the thread notified util timeout_ * 4 wait for the thread terminated util timeout_ */ uint32_t timeout_; /** * if idleThreadNum_ is zero, make a new thread */ std::atomic<uint32_t> idleThreadNum_; /** * when ThreadPool object is deleted, wait until runningNum_ is zero. */ std::atomic<uint32_t> runningNum_; /** * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated */ std::atomic_bool needRun_; std::condition_variable needRunCondition_; }; } // namespace OHOS::NetStack #endif /* NETSTACK_THREAD_POOL */
这份源码的实现,没有使用一些较难理解的语法,基本上就是使用线程+优先级队列实现的。提前创建指定数目的线程,每次取一个任务并执行。任务队列负责存放线程需要处理的任务,工作线程负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费者的模型。
#include "doctest.h" DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN #include <stdexcept> DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END //#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN //#define DOCTEST_CONFIG_DISABLE #include <string> #include <iostream> #include "thread_pool.h" // // Created by Administrator on 2022/8/10. // class Task { public: Task() = default; explicit Task(std::string context){ mContext = context; } bool operator<(const Task &e) const{ return priority_ < e.priority_; } void Execute(){ std::lock_guard<std::mutex> guard(mutex_); std::cout << "task is execute,name is:"<<mContext<<std::endl; } public: uint32_t priority_; private: std::string mContext; static std::mutex mutex_; }; #define DEFAULT_THREAD_NUM 3 #define MAX_THREAD_NUM 6 #define TIME_OUT 500 std::mutex Task::mutex_; static int threadpoolTest(){ static OHOS_NetStack::ThreadPool<Task, DEFAULT_THREAD_NUM, MAX_THREAD_NUM> threadPool_(TIME_OUT); Task task1("name_1"); Task task2("name_2"); Task task3("name_3"); Task task4("name_4"); threadPool_.Push(task1); threadPool_.Push(task2); threadPool_.Push(task3); threadPool_.Push(task4); return 0; } TEST_CASE("threadPool simple use example, test by doctest unit tool") { threadpoolTest(); }
以上该版本thread_pool的简单使用示例,可以看到使用稍微麻烦了些。必须定义格式如下的task类,必须实现operator<和Execute()方法,不过整体实现还是很不错的,通俗易懂!
总结
线程池的应用场景:当有大量的数据请求,需要多执行流并发/并行处理时,可以采用线程池来处理任务,可避免大量线程频繁创建或销毁所带来的时间成本,也可避免在峰值压力下,系统资源耗尽的风险。
加载全部内容