亲宝软件园·资讯

展开

C++11学习之多线程的支持详解

Shawn-Summer 人气:0

C++11中的多线程的支持

千禧年以后,主流的芯片厂商都开始生产多核处理器,所以并行编程越来越重要了。在C++98中根本没有自己的一套多线程编程库,它采用的是C99中的POSIX标准的pthread库中的互斥锁,来完成多线程编程。

首先来简单一个概念:原子操作,即多线程程序中"最小的且不可以并行化的操作"。通俗来说,如果对一个资源的操作是原子操作,就意味著一次性只有一个线程的一个原子操作可以对这个资源进行操作。在C99中,我们一般都是采用互斥锁来完成粗粒度的原子操作。

#include<pthread.h>
#include<iostream>
using namespace std;

static long long total =0;
pthread_mutex_t m=PTHREAD_MUTEX_INITIALIZER;//互斥锁

void * func(void *)
{
    long long i;
    for(i=0;i<100000000LL;i++)
    {
        pthread_mutex_lock(&m);
        total +=i;
        pthread_mutex_unlock(&m);
    }
}

int main()
{
    pthread_t thread1,thread2;
    if(pthread_create(&thread1,nullptr,&func,nullptr))
    {
        throw;
    }
    if(pthread_create(&thread2,nullptr,&func,nullptr))
    {
        throw;
    }
    pthread_join(thread1,nullptr);
    pthread_join(thread2,nullptr);
    cout<<total<<endl;//9999999900000000
}

可以看出来,上书代码中total +=i;就是原子操作。

1.C++11中的原子类型

我们发现,在C99中的互斥锁需要显式声明,要自己开关锁,为了简化代码,C++11中定义了原子类型。这些原子类型是一个class,它们的接口都是原子操作。如下所示:

#include<atomic>
#include<thread>
#include<iostream>
using namespace std;

atomic_llong total {0};//原子数据类型

void func(int)
{
    for(long long i=0;i<100000000LL;i++)
    {
        total+=i;
    }
}
int main()
{
    thread t1(func,0);
    thread t2(func,0);

    t1.join();
    t2.join();
    cout<<total<<endl;//9999999900000000
}

上述代码中total就是一个原子类对象,它的接口例如这里的重载operator+=()就是一个原子操作,所以我们不需要显式调用互斥锁了。

总共有多少原子类型呢?C++11的做法是,存在一个atomic类模板,我们可以通过这个类模板定义出想要的原子类型:

using atomic_llong = atomic<long long>;

所以我们想把什么类型搞成原子类型,只需要传入不同的模板实参就行了。

总之,C++11中原子操作就是atomic模板类的成员函数。

1.1 原子类型的接口

我们知道原子类型的接口就是原子操作,但是我们现在关注一下,它们有哪些接口?

原子类型属于资源类数据,多个线程只能访问单个预祝你类型的拷贝。所以C++11中的原子类型不支持移动语义和拷贝语义,原子类型的操作都是对那个唯一的一份资源操作的,原子类型没有拷贝构造,拷贝赋值,移动构造和移动赋值的。

atomic<float> af{1.2f};//正确
atomic<float> af1{af};//错误,原子类型不支持拷贝语义
float f=af;//正确,调用了原子类型的接口
af=0.0;//正确,调用了原子类型的接口

看一下上表中的一些原子类型的接口,load()是进行读取操作的,例如

atomic<int> a(2);
int b=a;
b=a.load();

上如代码中的,b=a就是等价于b=a.load(),实际上,atomic<int>中存在operator int()接口,这个接口中:

operator __int_type() const noexcept
{ return load(); }

store()接口是用来写数据的的:

atomic<int> a;
a=1;
a.store(1);

上述代码中a=1相当于a.load(1),atomic<int>中存在operator=(int)接口,它的实现如下:

__int_type operator=(__int_type __i) noexcept
{
store(__i);
return __i;
}

例如其他操作,比如exchange是做交换,compare_exchange_weak/strong()是比较并交换(CAS操作的),它们的实现会更复杂一些,还有一些符号的重载,这里就不一一介绍了,<<C++ Concurrency in Action>>的第5章和第7章会详细介绍这部分内容。

值得注意的是,这里有一个特殊的原子类型atomic_flag,这是一个原子类型是无锁的,也就是说线程对这种类型的数据的访问是无锁的,所以它就不需要接口:load和store,即多个线程可以同时操作这个资源。我们可以用它来实现自旋锁

1.2简单自旋锁的实现

互斥锁是说,当一个线程访问一个资源的时候,他会给进入临界区代码设置一把锁,出来的时候就把锁给打开,当其他进程要进入临界区的时候,就会实现看一下锁,如果锁是关的,那就阻塞自己,这样core就会去执行其他工作,导致上下文切换吗,所以它的效率会比较低。原子类型中is_lock_free()就是说明的这个原子类型的访问是否使用的互斥锁。

与互斥锁相反的是自旋锁,区别是,当其他进程要进入临界区的时候,如果锁是关的,它不会阻塞自己,而是不断查看锁是不是开的,这样就不会引发上下文切换,但是同样也会增加cpu利用率。

我们可以使用atomic_flag原子类型来实现自旋锁,因为在atomic_flag本身是无锁的,所以多个线程可以同时访问它,相当于同时访问这把自旋锁,实现如下:

#include<thread>
#include<atomic>
#include<iostream>
#include<unistd.h>
using namespace std;
atomic_flag lock=ATOMIC_FLAG_INIT;//获得自旋锁
void f(int n)
{
    while(lock.test_and_set())
    {//尝试获得原子锁
       cout<<"Waiting from thread "<<n<<endl;
    }
    cout<<"Thread "<<n<<" starts working"<<endl;
}
void g(int n)
{
    cout<<"Thread "<<n<<" is going to start"<<endl;
    lock.clear();//打开锁
    cout<<"Thread "<<n<<" starts working"<<endl;
}
int main()
{
    lock.test_and_set();//关上锁
    thread t1(f,1);
    thread t2(g,2);
    t1.join();
    usleep(100000);
    t2.join();
}

这里的test_and_set()是一个原子操作,它做的是,写入新值并返回旧值。在main()中,我们首先给这个lock变量,写入true值,即关上锁,然后再线程t1中,它不断尝试获得自旋锁,再线程t2中,clear()接口,相当于将lock变量值变成false,这时自旋锁就打开了,这样子,线程t1就可以执行剩下的代码了。

简单的我们可以将lock封装一下

void Lock(atomic_flag & lock)
{
    while(lock.test_and_set());
}
void Unlock(atomic_flag & lock)
{
    lock.clear();
}

上面操作中,我们就相当于完成了一把锁,可以用其实现互斥访问临界区的功能了。不过这个和C99中的pthread_mutex_lock()和pthread_mutex_unlock()不一样,C99中的这两个锁是互斥锁,而上面代码中实现的是自旋锁。

2.提高并行程度

#include <thread>
#include <atomic>
 
atomic<int> a;
atomic<int> b;
void threadHandle()
{
     int t = 1;
     a = t;
     b = 2; // b 的赋值不依赖 a
}

在上面代码中,对a和b的赋值语句实际上可以不管先后的,如果允许编译器或者硬件对其重排序或者并发执行,那就会提高并行程度。

在单线程程序中,我们根部不关心它们的执行顺序,反正结果都是一样的,但是多线程不一样,如果执行顺序不一样,结果就会不同。

#include <thread>
#include <atomic>
#include<iostream>
using namespace std;
atomic<int> a{0};
atomic<int> b{0};
void ValueSet(int )
{
     int t = 1;
     a = t;
     b = 2; // b 的赋值不依赖 a
}
int Observer(int)
{
    cout<<"("<<a<<","<<b<<")"<<endl;
}
int main()
{
    thread t1(ValueSet,0);
    thread t2(Observer,0);
    t1.join();
    t2.join();
    cout<<"Final: ("<<a<<","<<b<<")"<<endl;
}

上面代码中,Observer()中的输出结果,会和a和b的赋值顺序有关,它的输出结果肯可能是:(0,0) (1,0) (0,2) (1,2)。这就说明了,多线程程序中,如果执指令行顺序不一样,结果就会不同。

影响并行程度的两个关键因素是:编译器是否有权对指令进行重排序和硬件是否有权对汇编代码重排序。

C++11中,我们可以显式得告诉编译器和硬件它们的权限,进而提高并发程度。通俗来说,如果我们要求并行程度最高,那么我们就授权给编译器和硬件,允许它们重排序指令。

2.1 memory_order的参数

原子类型的成员函数中,大多数都可以接收一个类型为memory_order的参数,它就是可以告诉编译器和硬件,是否可以重排序。

typedef enum memory_order {
    memory_order_relaxed,    // 不对执行顺序做保证
    memory_order_acquire,    // 本线程中,所有后续的读操作必须在本条原子操作完成后执行
    memory_order_release,    // 本线程中,所有之前的写操作完成后才能执行本条原子操作
    memory_order_acq_rel,    // 同时包含 memory_order_acquire 和 memory_order_release
    memory_order_consume,    // 本线程中,所有后续的有关本原子类型的操作,必须在本条原子操作完成之后执行
    memory_order_seq_cst    // 全部存取都按顺序执行
    } memory_order;

在C++11中,memory_order的参数的默认值是memory_order_seq_cst,即不允许编译器和硬件进行重排序,这样一来,在上吗代码中的Observer()中输出结果就不可能是(0,2),因为对a的赋值语句是先于b的。这实际上就是:顺序一致性,准确来说就是在同一个线程中,原子操作的顺序和代码的顺序保持一致。

而如果我们改动一下代码:

#include <thread>
#include <atomic>
#include<iostream>
using namespace std;
atomic<int> a{0};
atomic<int> b{0};
void ValueSet(int )
{
     int t = 1;
     a.store(t,memory_order_relaxed);
     b.store(2,memory_order_relaxed); // b 的赋值不依赖 a
}
int Observer(int)
{
    cout<<"("<<a<<","<<b<<")"<<endl;
}
int main()
{
    thread t1(ValueSet,0);
    thread t2(Observer,0);
    t1.join();
    t2.join();
    
    
    cout<<"Final: ("<<a<<","<<b<<")"<<endl;
}

在上面代码中的Observer()中输出结果是有可能是:(0,2)的,因为这里的memory_order_relaxed不对原子操作的顺序有严格要求,就有可能发生b先被赋值了,而此时a还没被赋值的情况。

所以,为了进一步开发原子操作的并行程度,我们的目标是:保证程序既快又对。

2.2 release-acquire内存顺序

#include <thread>
#include <atomic>
#include<iostream>
using namespace std;
atomic<int> a;
atomic<int> b;
void Thread1(int )
{
     int t = 1;
     a.store(t,memory_order_relaxed);
     b.store(2,memory_order_release); // 本操作前的写操作必须先完成,即保证a的赋值快于b
}
void Thread2(int )
{
    while(b.load(memory_order_acquire)!=2);//必须等该原子操作完成后,才执行下面代码
    cout<<a.load(memory_order_relaxed)<<endl;//1

}
int main()
{
    thread t1(Thread1,0);
    thread t2(Thread2,0);
    t2.join();
    t1.join();
}

上面代码中,实际上也是实现了一种自旋锁的操作,我们保证了a.store快于b.store,而b.load又一定快于a.load。而且,对于b的store和load就实现了一种release-acquire内存顺序.

2.3 release-consume内存顺序

#include<thread>
#include<atomic>
#include<cassert>
#include<string>
using namespace std;

atomic<string*> ptr;
atomic<int> date;
void Producer()
{
    string *p=new string("hello");
    date.store(42,memory_order_relaxed);
    ptr.store(p,memory_order_release);//date赋值快于ptr
}
void Consumer()
{
    string *p2;
    while(!(p2=ptr.load(memory_order_consume)));
    assert(*p2=="hello");//一定成立
    assert(date.load(memory_order_relaxed)==42);//可能断言失败,因为这个指令可能在本线程中首先执行
}
int main()
{
    thread t1(Producer);
    thread t2(Consumer);
    t1.join();
    t2.join();
}

上面的内存顺序也叫生产者-消费者顺序。

实际上,总共的内存模型就是4个:顺序一致性,松散的(relaxed),release-consume和release-acquire。

2.4 小结

实际上,对于并行编程来说,最根本的的在于,并行算法,而不是从硬件上搞内存模型优化啥的,如果你嫌麻烦的话,全部使用顺序一致性内存模型,对并行效率的影响也不是很大。

3.线程局部存储

线程拥有自己的栈空间,但是堆空间,静态数据区(文件data,bss段,全局/静态变量)是共享的。线程之间互相共享,静态数据当然是很好的,但是我们也需要线程自己的局部变量

#include<pthread.h>
#include<iostream>
using namespace std;

int thread_local errorCode=0;
void* MaySetErr(void *input)
{
    if(*(int*)input==1)
        errorCode=1;
    else if(*(int*)input==2)
        errorCode=2;
    else
        errorCode=0;
    cout<<errorCode<<endl;
}
int main()
{
    int input_a=1;
    int input_b=2;
    pthread_t thread1,thread2;
    pthread_create(&thread1,nullptr,&MaySetErr,&input_a);
    pthread_create(&thread2,nullptr,&MaySetErr,&input_b);
    pthread_join(thread1,nullptr);
    pthread_join(thread2,nullptr);
    cout<<errorCode<<endl;//0
}

上面代码中的errorCode是一个thread_local变量,它意味著它是一个线程内部的全局变量,线程开始时,他会被初始化,然后线程结束时,该值就不会有效。实际上两个进程中会有各自的errorCode,而main函数中也有自己的errorCode

4.快速退出

在C++98中,我们会见到3中终止函数:terminate,abort,exit。而在C++11中我们增加了quick_exit终止函数,这种终止函数主要用在线程种。

1.terminate函数,它是C++种的异常机制有关的,通常没有被捕获的异常就会调用terminate

2.abort函数是底层的终止函数,terminate就是调用它来终止进程的,但是abort调用时,不会调用任何析构函数,会引发内存泄漏啥的,但是一般来说,他会给符合POSIX的操作系统抛出一个信号,此时signal handler就会默认的释放进程种的所有资源,来避免内存泄漏。

3.exit函数是正常退出,他会调用析构函数,但是有时候析构函数狠复杂,那我们还不如直接调用absort函数,将释放资源的事情留给操作系统。

在多线程情况下,我们一般都是采用exit来退出的,但是这样容易卡死,当线程复杂的时候,exit这种正常退出方式,太过于保守了,但是abort这种退出方式又太激进了,所以有一种新的退出函数:quick_exit函数。

quick_exit

这个函数不执行析构函数,而使得程序终止,但是和abort不同的是,abort一般都是异常退出,而quick_exit是正常退出。

#include<cstdlib>
#include<iostream>
using namespace std;

struct A{~A(){cout<<"Destruct A."<<endl;}};
void closeDevice(){cout<<"device is closed."<<endl;}
int main()
{
    A a;
    at_quick_exit(closeDevice);
    quick_exit(0);
}
//样容易卡死,当线程复杂的时候,`exit`这种正常退出方式,太过于保守了,但是`abort`这种退出方式又太激进了,所以有一种新的退出函数:`quick_exit`函数。
//这个函数不执行析构函数,而使得程序终止,但是和`abort`不同的是,`abort`一般都是异常退出,而`quick_exit`是正常退出。
#include<cstdlib>
#include<iostream>
using namespace std;

struct A{~A(){cout<<"Destruct A."<<endl;}};
void closeDevice(){cout<<"device is closed."<<endl;}
int main()
{
    A a;
    at_quick_exit(closeDevice);
    quick_exit(0);
}

加载全部内容

相关教程
猜你喜欢
用户评论