C++线程间通信 C++线程间的互斥和通信场景分析
Redamanc 人气:0互斥锁(mutex)
为了更好地理解,互斥锁,我们可以首先来看这么一个应用场景:模拟车站卖票。
模拟车站卖票
场景说明:
Yang车站售卖从亚特兰蒂斯到古巴比伦的时光飞船票;因为机会难得,所以票数有限,一经发售,谢绝补票。
飞船票总数:100张;
售卖窗口:3个。
对于珍贵的飞船票来说,这个资源是互斥的,比如第100张票,只能卖给一个人,不可能同时卖给两个人。3个窗口都有权限去售卖飞船票(唯一合法途径)。
不加锁的结果
根据场景说明,我们可以很快地分析如下:
可以使用三个线程来模拟三个独立的窗口同时进行卖票;
定义一个全局变量,每当一个窗口卖出一张票,就对这个变量进行减减操作。
故写出如下代码:
#include <iostream> #include <thread> #include <list> using namespace std; int tickets = 100; // 车站剩余票数总数 void sellTickets(int win) { while (tickets > 0) { { if (tickets > 0) { cout << "窗口:" << win << " 卖出了第:" << tickets << "张票!" << endl; tickets--; } std::this_thread::sleep_for(std::chrono::microseconds(400)); } } } int main() { list<std::thread> tlist; for (int i = 1; i <= 3; ++i) { tlist.push_back(std::thread(sellTickets, i)); } for (std::thread& t : tlist) { t.join(); } cout << "所有窗口卖票结束!" << endl; return 0; }
运行结果如下:
通过运行,我们可以发现问题:
对于一张票来说,卖出去了多次!
这不白嫖吗???这合适吗?
原因也很简单,对于线程来说,谁先执行,谁后执行,完全是根据CPU的调度,根本不可能掌握清楚。
所以,这个代码是线程不安全
的!
那,怎么解决呢?
当然是:互斥锁了!
加锁后的结果
我们对上述代码做出如下修改:
#include <iostream> #include <thread> #include <list> #include <mutex> using namespace std; int tickets = 100; std::mutex mtx; void sellTickets(int win) { while (tickets > 0) { { lock_guard<std::mutex> lock(mtx); if (tickets > 0) { cout << "窗口:" << win << " 卖出了第:" << tickets << "张票!" << endl; tickets--; } std::this_thread::sleep_for(std::chrono::microseconds(400)); } } } int main() { list<std::thread> tlist; for (int i = 1; i <= 3; ++i) { tlist.push_back(std::thread(sellTickets, i)); } for (std::thread& t : tlist) { t.join(); } cout << "所有窗口卖票结束!" << endl; return 0; }
首先定义了一个全局的互斥锁std::mutex mtx
;接着在对票数tickets
进行减减操作时,定义了lock_guard
,这个就相当于智能指针scoped_ptr
一样,可以出了作用域自动释放锁资源。
运行结果如下:
我们可以看到这一次,就没问题了。
简单总结
互斥锁的使用可以有三种:
(首先都需要在全局定义互斥锁std::mutex mtx
)
- 首先可以直接在需要加锁和解锁的地方,手动进行:加锁
mtx.lock()
、解锁mtx.unlock()
; - 可以在需要加锁的地方定义保护锁:
lock_guard<std::mutex> lock(mtx)
,这个锁在定义的时候自动上锁,出了作用域自动解锁。(其实就是借助了智能指针
的思想,定义对象出调用构造函数底层调用lock()
,出了作用域调用析构函数底层调用unlock()
); - 可以在需要加锁的地方定义唯一锁:
unique_lock<std::mutex> lock(mtx)
,这个锁和保护锁类似,但是比保护锁更加好用。(可以类比智能指针中的scoped_ptr
和unique_ptr
的区别,二者都是将拷贝构造和赋值重载函数删除了,但是unique_ptr
和unique_lock
都定义了带有右值引用的拷贝构造和赋值)
条件变量(conditon_variable)
如果说,互斥锁
是为了解决线程间互斥
的问题,那么,条件变量就是为了解决线程间通信
的问题。
同样的,我们可以首先来看一个问题(模型):
生产者消费者线程模型
生产者消费者线程模型是一个很经典的线程模型;
首先会有两个线程,一个是生产者,一个是消费者,生产者只负责生产资源,消费者只负责消费资源。
产生问题
根据上述互斥锁的理解,我们可以写出如下代码:
#include <iostream> #include <thread> #include <mutex> #include <queue> using namespace std; std::mutex mtx; class Queue { public: void put(int num) { lock_guard<std::mutex> lock(mtx); que.push(num); cout << "生产者,生产了:" << num << "号产品" << endl; } void get() { lock_guard<std::mutex> lock(mtx); int val = que.front(); que.pop(); cout << "消费者,消费了:" << val << "号产品" << endl; } private: queue<int> que; }; void producer(Queue* que) { for (int i = 0; i < 10; ++i) { que->put(i); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } void consumer(Queue* que) { for (int i = 0; i < 10; ++i) { que->get(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } int main() { Queue que; std::thread t1(producer, &que); std::thread t2(consumer, &que); t1.join(); t2.join(); return 0; }
同样的,我们定义了两个线程:t1
、t2
分别作为生产者
和消费者
,并且定义了两个线程函数
:producer
和consumer
,这两个函数接受一个Queue*
的参数,并且通过这个指针调用put
和get
方法,这两个方法就是往资源队列里面执行入队和出队操作。
运行结果如下:
我们会发现,出错了。
多运行几次试试:
我们发现,每次运行的结果还都不一样,但是都会出现系统崩溃的问题。
仔细来看这个错误原因:
我们再想想这个代码的逻辑:
一个生产者只负责生产;
一个消费者只负责消费;
他们共同在队列里面存取资源;
存取资源操作本身是互斥的。
发现问题了吗?
这两个线程之间彼此的操作独立,换句话说,
没有通信!
生产者生产的时候,消费者不知道;
消费者消费的时候,生产者也不知道;
但是消费者是要从队列里面取资源
的,如果某一个时刻,队列里为空了,它就不能取了!
解决问题
分析完问题之后,我们知道了:
问题出在:没有通信上面。
那么如何解决通信
问题呢?
当然就是:条件变量
了!
我们做出如下代码的修改:
#include <iostream> #include <thread> #include <mutex> #include <queue> #include <condition_variable> using namespace std; std::mutex mtx; // 互斥锁,用于线程间互斥 std::condition_variable cv;// 条件变量,用于线程间通信 class Queue { public: void put(int num) { unique_lock<std::mutex> lck(mtx); while (!que.empty()) { cv.wait(lck); } que.push(num); cv.notify_all(); cout << "生产者,生产了:" << num << "号产品" << endl; } void get() { unique_lock<std::mutex> lck(mtx); while (que.empty()) { cv.wait(lck); } int val = que.front(); que.pop(); cv.notify_all(); cout << "消费者,消费了:" << val << "号产品" << endl; } private: queue<int> que; }; void producer(Queue* que) { for (int i = 0; i < 10; ++i) { que->put(i); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } void consumer(Queue* que) { for (int i = 0; i < 10; ++i) { que->get(); std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } int main() { Queue que; std::thread t1(producer, &que); std::thread t2(consumer, &que); t1.join(); t2.join(); return 0; }
这个时候我们再来看运行结果:
这个时候就是:
生产一个、消费一个。
原子类型(atomic)
我们前面遇到线程不安全的问题,主要是因为涉及++
、--
操作的时候,有可能被其他的线程干扰,所以使用了互斥锁
。
只允许得到锁
的线程进行操作;
其他没有得到锁
的线程只能眼巴巴的干看着。
但是,对于互斥锁来说,它是比较重的,它对于临界区代码做的事情比较复杂。
简单来说,如果只是为了++
、--
这样的简单操作互斥的话,使用互斥锁,就有点杀鸡用牛刀的意味了。
那么有没有比互斥锁
更加轻量的,并且能够解决问题的呢?
当然有,就是我们要说的原子类型
。
简单使用
我们可以简单设置一个场景:
定义十个线程,对一个公有的变量myCount
进行task
的操作,该操作是对变量进行100次的++
。
所以,如果顺利,我们会最终得到myCount = 1000
。
代码如下:
#include <iostream> #include <thread> #include <atomic> #include <list> volatile std::atomic_bool isReady = false; volatile std::atomic_int myCount = 0; void task() { while (!isReady) { // 线程让出当前的CPU时间片,等待下一次调度 std::this_thread::yield(); } for (int i = 0; i < 100; ++i) { myCount++; } } int main() { std::list<std::thread> tlist; for (int i = 0; i < 10; ++i) { tlist.push_back(std::thread(task)); } std::this_thread::sleep_for(std::chrono::milliseconds(200)); isReady = true; for (std::thread& it : tlist) { it.join(); } std::cout << "myCount:" << myCount << std::endl; return 0; }
运行结果如下:
改良车站卖票
对于原子类型来说,使用方法非常简单:
首先包含头文件:#include <atomic>
;
接着把需要原子操作的变量定义为对应的原子类型就好:
bool
-> atomic_bool
;
int
-> atomic_int
;
其他同理。
理解了这个以后,我们可以使用原子类型对我们的车站卖票进行改良:
#include <iostream> #include <thread> #include <list> #include <mutex> #include <atomic> using namespace std; std::atomic_int tickets = 100; // 车站剩余票数总数 void sellTickets(int win) { while (tickets > 0) { tickets--; cout << "窗口:" << win << " 卖出了第:" << tickets << "张票!" << endl; } } int main() { list<std::thread> tlist; for (int i = 1; i <= 3; ++i) { tlist.push_back(std::thread(sellTickets, i)); } for (std::thread& t : tlist) { t.join(); } cout << "所有窗口卖票结束!" << endl; return 0; }
可以看到,从代码长度来说就轻量
了很多!
运行结果如下:
虽然还有部分打印乱序的情况:
(毕竟线程的执行顺序谁也摸不清 😦 )
但是,代码的逻辑没有问题!
不会出现一张票被卖了多次的情况!
这个原子类型也被叫做:无锁类型,像是一些无锁队列
之类的实现,就是靠的这个东西。
加载全部内容