c#多线程间同步 c#中多线程间的同步示例详解
化云随风 人气:0一、引入
先给出一个Num类的定义
internal class Num { public static int odd = 50000; public static int even = 10000; }
假设现在要求输出小于 odd 的所有奇数,输出小于 even 的所有偶数,不考虑多线程时可以写出如下的代码:(为了演示多线程时线程间的争用,先把值赋给了 num,实际上这个赋值操作毫无意义 )
//同步代码段 public long Sum() { Stopwatch sw = new Stopwatch(); sw.Start(); int num = 0; for (int i = 0; i <= Num.odd; i++) { num = i; if ((i & 1) == 1) { Console.WriteLine($"奇数:{num}"); } } for (int i = 0; i <= Num.even; i++) { num = i; if ((i & 1) == 0) { Thread.Sleep(10); Console.WriteLine($"偶数:{num}"); } } sw.Stop(); return sw.ElapsedMilliseconds; }
现在,因为耗时太长,引入多线程进行处理,修改为如下形式:
//NoLock Task private readonly object sync = new(); int num = 0; public int Sum() { Stopwatch sw = new Stopwatch(); sw.Start(); var ta =Task.Run(() => { for (int i = 0; i <= Num.odd; i++) { num = i; //判断条件之前赋值是为了提高触发几率 if((i & 1) == 1) { Console.WriteLine($"奇数:{num}"); } } }); var tb = Task.Run(() => { for (int i = 0; i <= Num.even; i++) { num = i; if ((i & 1) == 0) { Thread.Sleep(10); //在此处添加延时,在 tb 线程等待时,num.sum的值可能已经被 ta 修改为了其他值 Console.WriteLine($"偶数:{num}"); } } }); Task.WaitAll(ta, tb); //为了保证任务完成,获取执行时间 sw.Stop(); return sw.ElapsedMilliseconds; } }
上面这段代码中,我们期望线程ta会输出odd以内的奇数值,线程tb会输出even以内的偶数。但实际运行时会出现下图所示的情况。
如上,当程序涉及到多线程的时候,在 [各线程间共享的数据] 总会因为线程间的争用导致意料之外的情况,为此,各种语言也会提供协助线程间同步的特性,这里简单记录一下我对c#中机制的理解。
二、Lock
借用Lock的方法对示例进行修改,可以有两种方式,此处展示ta部分,tb 与ta做相同修改:
//方式一:在Task内全局Lock var ta = Task.Run(() => { lock (sync) { for (int i = 0; i <= 10000; i++) { num = i; //判断条件之前赋值是为了提高触发几率 if ((i & 1) == 1) { Console.WriteLine($"奇数:{num}"); } } } }); //方式二: 为每一次For循环Lock var ta = Task.Run(() => { //lock (sync) for (int i = 0; i <= Num.odd; i++) { lock(sync) { num = i; //判断条件之前赋值是为了提高触发几率 if ((i & 1) == 1) { Console.WriteLine($"奇数:{num}"); } } } });
上述的两种方式中,
- 方法一相当于对Task进行了锁定,同一时刻只能运行一个被锁定的代码段(即取决于当前对象实例中ta,tb谁先取得了使用权),这样多线程其实退化为了单线程处理。
- 方法二应该是更合理的使用方式,每一次循环时进行锁定,保证了每次赋值及使用时的独占性,也不影响另一个线程的循环操作。方式二仍然会存在一个线程等待的情况,只是会比第一种方式好一些。但是,对每一次循环进行Lock,性能是需要考虑的一个点。
说回Lock本身,网上有很多文章介绍,lock只是一个语法糖,编译器会将其转换为对 monitor 的调用。
IL代码如下图:
可以看到,编译器会帮我们构建try块,并在finally块调用Monitor.Exit方法。若要获取更精细的控制,可以自己调用Monitor进行使用。
三、Monitor
monitor与lock相比更为灵活,可以使用IsEntered(object)判断当前线程是否获取到了sync的锁定,可以使用 TryEnter()尝试获取排它锁,也可以调用重载方法指定等待时间。
需要指出的是,
1、所有等待获取锁定的线程会处于阻塞状态。
2、在等待获取锁定的线程上执行Thread.Interrupt会中断当前线程的等待并抛出ThreadInterruptedException的异常
3、monitor与lock锁定的对象sync必须为引用类型,查看反编译的代码会发现在每一次lock之前,会将sync赋值给一个object对象,如果sync为值类型,则会被装箱为一个新的对象。
4、以锁定For循环为例,在定义锁定对象时,可以定义为
public class LockFor { private readonly object sync = new(); .... }
或者定义为
public class LockForStaticSync { private static readonly object sync = new(); ... }
后者添加一个 static 修饰符将其定义静态只读对象。
我们知道,static 修饰的变量并不属于对象,而是归属于 class 本身,按照我的理解来说,如果在多个线程中都实例化了 LockForStaticSync 的对象,并同时调用一个 SUM 方法时,不论线程间是否会发生冲突,只能有一个线程取得sync的锁定继续执行,其他的线程会处于阻塞等待状态。
而如果是第一种定义方式,则多个线程的多个对象间,不会产生冲突,所有线程都可以执行。
此例的场景下,多实例调用时,方式一应该会有速度上的绝对优势。为此我删除了任务内打印及等待的部分,执行了如下内容进行验证:
private static void Invocation() { Stopwatch sw = new Stopwatch(); sw.Start(); var t1 = new TaskFactory().StartNew(() => new LockFor().Sum()); var t2 = new TaskFactory().StartNew(() => new LockFor().Sum()); var t3 = new TaskFactory().StartNew(() => new LockFor().Sum()); Task.WaitAll(t1, t2, t3); sw.Stop(); Console.WriteLine(sw.ElapsedMilliseconds); sw.Restart(); var t4 = new TaskFactory().StartNew(() => new LockForStaticSync().Sum()); var t5 = new TaskFactory().StartNew(() => new LockForStaticSync().Sum()); var t6 = new TaskFactory().StartNew(() => new LockForStaticSync().Sum()); Task.WaitAll(t4, t5, t6); sw.Stop(); Console.WriteLine(sw.ElapsedMilliseconds); }
但实际执行结果是出乎意料的,多次运行后,类变量锁的运行速度远超对象变量的方式,这是为什么呢?
思考之后,考虑到类变量省略了每次创建锁定对象的时间,而数量较少的循环次数可能无法弥补这个时间差,于是我逐渐调大Num类中定义的变量。随着循环次数的增加,也就是方法执行时间的增加,对象变量的优势逐渐显现
既然对象变量有速度上的优势,而使用过程中又不可避免的会出现多方调用的情况,那么是不是应该一直选择定义为对象变量呢?其实不然,比如静态类中需要的锁定对象,全局缓存字典,文件操作帮助类都应该是静态锁,更多场景,欢迎补充。
四、Interlocked
Interlocked类中的方法可以实现原子操作,通过操作系统及硬件CPU级别的控制,确保CPU在执行当前操作时不会被中断,这个类里面提供了一些简单的方法,如Add,Increment等。
五、Semaphore
信号量是一种计数的互斥锁定。什么意思呢?以Monitor来讲,从monitor.enetr开始,到monitor.exit为止,被包裹着的这一段代码,同一时刻只能由一个线程访问,而 Semaphore 可以定义同时访问某些资源的线程数量,即允许多线程同时访问被保护的代码。
Semaphore有三种签名的构造函数,其中 Semaphore(int initialCount, int maximumCount) 的参数指定最初释放的信号量可用数量与最大量,两者的差值归创建信号量的线程所有。
以下内容来自 MSDN
class SemaphoreTest { // A semaphore that simulates a limited resource pool. private static Semaphore _pool; // 协助设置线程休眠时间. private static int _padding; public static void Main() { // 这里是创建了最大可以有三个访问线程的信号量,但此时可用为0,需要当前线程释放以后才可用。 // 如果此处设置可用量非0,主线程释放时仍然传递了3,程序运行过程中会出现SemaphoreFullException的异常 _pool = new Semaphore(0, 3); for (int i = 1; i <= 5; i++) { Thread t = new Thread(new ParameterizedThreadStart(Worker)); t.Start(i); } // 主线程休眠,让其他线程运行到等待信号量的状态 Thread.Sleep(500); //主线程调用Release(3),将可用量设置为最大值,正在等待的线程会获得信号 Console.WriteLine("Main thread calls Release(3)."); _pool.Release(3); Console.WriteLine("Main thread exits."); Console.ReadLine(); } private static void Worker(object num) { // 阻塞当前线程,直到获取到信号量 Console.WriteLine("Thread {0} begins " +"and waits for the semaphore.", num); _pool.WaitOne(); // 每一个线程等待的时间间隔参数 int padding = Interlocked.Add(ref _padding, 100); Console.WriteLine("Thread {0} enters the semaphore.", num); //与padding共用,让输出更有顺序 Thread.Sleep(1000 + padding); Console.WriteLine("Thread {0} releases the semaphore.", num); Console.WriteLine("Thread {0} previous semaphore count: {1}",num, _pool.Release()); } }
目前为止,提到的内容均是在同一个进程内同步的方法,信号量作为一个系统级的存在,是可以帮助我们实现进程间同步的,只需要在创建 Seamphore 对象的实例时,为信号量指定名称即可。
另外特别需要注意的是,信号量是可重入的,简单说就是可以在一个线程内执行多次 WaitOne() 方法,多次调用时,如果处理不好,就可能出现意外情况,修改 Worker 为下方内容
private static void Worker(object num) { Console.WriteLine("Thread {0} begins " + "and waits for the semaphore.", num); _pool.WaitOne(); Console.WriteLine($"{num} getOne"); // A padding interval to make the output more orderly. int padding = Interlocked.Add(ref _padding, 100); _pool.WaitOne(); //此处添加一处调用 Console.WriteLine("Thread {0} enters the semaphore.", num); Thread.Sleep(1000 + padding); Console.WriteLine("Thread {0} releases the semaphore.", num); Console.WriteLine("Thread {0} previous semaphore count: {1}", num, _pool.Release()); //这里应该再调用一次_pool.Release(),或者在上一句传入参数,改为 _pool.Release(2) }
建议实际运行一下,查看运行的表现,运行过后会发现,任务线程全部阻塞在了第二处 WaitOne,导致程序无法向下执行。
因此,重入必须要谨慎,而且退出时必须要释放等量的锁定数值,如果执行了多余的Release(),最终程序在运行过程中会出现
六、Event
与信号量一样,事件也是一个系统范围内的资源同步方法。又分为ManualResetEvent,AutoResetEvent,CountdownEvent以及ManualResetEventSlim,在构建对象实例时若传入了name参数,代表这是一个可以跨进程的系统级同步事件。
以 ManualResetEvent 为例,该类有 signaled 和 nonsignaled 两种状态,这两种状态通过实例化对象时的 布尔类型 参数决定,TRUE 就是signaled, False相反
文档中常见的翻译是发出信号的状态和未发出信号的状态,微软官网的机翻是终止状态和非终止状态,还有一些释放线程之类的描述,直观上难以理解。其实就是改变状态而已,还不如英文的好理解。
ManualResetEvent 的基类 EventWaitHandle 中提供了Set() 和Reset() 方法,用于改变状态,Set 将事件修改为 signaled,Reset重置为 nonsignaled
这里说一下Set和Reset,这两个方法是改变了事件的状态,并不是一个瞬时性的动作,也就意味着在调用Set后,调用Reset之前,事件都处于 signaled 状态(AutoResetEvent会自动调用Reset重置事件状态)
WaitHandle 类中提供了众多等待信号的方法,EventWaitHandle 继承自WaitHandle, ManualResetEvent中也可以调用WaitOne等方法。
回头再看一下信号量 Semaphore 的示例,也调用 WaitOne 等待信号,因为它也继承自 Waithandler。
有了上面的基础,可以看一下下面 MSDN 的示例
private static ManualResetEvent mre = new ManualResetEvent(false); static void Main() { for (int i = 0; i <= 2; i++) { Thread t = new Thread(ThreadProc); t.Name = "Thread_" + i; t.Start(); } Thread.Sleep(500); Console.WriteLine(@"线程012都会处于等待状态,直至 mre修改状态,按 Enter继续"); Console.ReadLine(); mre.Set(); Thread.Sleep(500); Console.WriteLine(@"调用了mre.set后,事件处于 signaled 状态,下一个线程不会被阻塞"); Console.ReadLine(); for (int i = 3; i <= 4; i++) { Thread t = new Thread(ThreadProc); t.Name = "Thread_" + i; t.Start(); } Thread.Sleep(500); Console.WriteLine("调用mre.reset后,事件处于nonsignaled状态,此时会被阻塞"); Console.ReadLine(); mre.Reset(); Thread t5 = new Thread(ThreadProc); t5.Name = "Thread_5"; t5.Start(); Thread.Sleep(500); Console.WriteLine("\nPress Enter to call Set() and conclude the demo."); Console.ReadLine(); mre.Set(); Console.ReadLine(); } private static void ThreadProc() { string name = Thread.CurrentThread.Name; Console.WriteLine(name + " starts and calls mre.WaitOne()"); mre.WaitOne(); Console.WriteLine(name + " ends."); }
- AutoResetEvent,名字可以看出来这个类会自动调用 Reset方法,事实也是这样,这个类会在等待线程执行结束后将事件置为non-signaled
- ManualResetEventSlim 是 ManualResetEvent的轻量实现,他并不是继承自 EventWaitHandle基类,
- CountdownEvent 也不是继承自 EventWaitHandle基类,它会在初始化时得到一个数值,称为InitialCount,同时赋给CurrentCount , 每次调用Signal时,CurrentCount会减少相应的值,当调用后CurrentCount为0时,会发出信号,并将其设置为 IS_SET 状态。
七、Barrier
Barrier 是一个有意思的类,可以使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作。
Enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.
大白话来讲,借助于这个对象,可以管控多个并行任务间的合作关系。
Barrier的构造函数中可以传入并行任务的数量(participantCount),也可以通过 AddParticipant,RemoveParticipant 动态地调整参与者的数量。另外可以通过 CurrentPhaseNumber 得知当前是第几个参与者,通过 ParticipantsRemaining 知道还有几个参与者未到达任务点。
还可以传入一个可空委托,这个委托会在接收到所有参与者线程发出信号后执行。
就好像几个人一起玩游戏闯关,关卡boss需要所有人一起才能打败,照顾到每个人的游戏理解不同,规定每个人可以按照自己的安排推进游戏进度。
在这个比喻中,每个人都是单独的线程,可能有人很快就抵达了关卡,但是因为关卡的性质,他必须在这里等待其他人都到达后,大家一起打boss,打败boss之后存档,之后大家再各玩各的,直到下一个关卡BOSS。
Barrier就承担了boss的任务,他负责让所有线程抵达某一个预设的点后再一起放行。放行之后,会执行初始化对象时传入的委托,比如上方说的存档。只是我们可以通过委托,更灵活地指定要进行的操作。
而所谓的预设点,其实就是调用 SignalAndWait,发出信号并等待其他线程发出信号的代码
如果你暂时没有理解我上面的比喻,那就看一下下面的代码吧,代码是在MSDN拿来的,我自己加了几个console语句,可以运行看看效果
public static void Barriers() { int count = 0; //初始化 3 个参与者,传入委托 Barrier barrier = new Barrier(3, (b) => { Console.WriteLine("Post-Phase action: count={0}, phase={1},threadid={2}", count, b.CurrentPhaseNumber,Thread.CurrentThread.ManagedThreadId); if (b.CurrentPhaseNumber == 2) throw new Exception("D'oh!"); }); barrier.AddParticipants(2); barrier.RemoveParticipant(); //剩下4个 Console.WriteLine($"主Thread:{Thread.CurrentThread.ManagedThreadId}"); // This is the logic run by all participants Action action = () => { Console.WriteLine($"action1 Thread:{Thread.CurrentThread.ManagedThreadId}"); Interlocked.Increment(ref count); barrier.SignalAndWait(); // during the post-phase action, count should be 4 and phase should be 0 Console.WriteLine($"action2 Thread:{Thread.CurrentThread.ManagedThreadId}"); Interlocked.Increment(ref count); barrier.SignalAndWait(); // during the post-phase action, count should be 8 and phase should be 1 Console.WriteLine($"action3 Thread:{Thread.CurrentThread.ManagedThreadId}"); // 线程3会引发委托里抛出的异常,异常信息所有线程可见 Interlocked.Increment(ref count); try { barrier.SignalAndWait(); } catch (BarrierPostPhaseException bppe) { Console.WriteLine("Caught BarrierPostPhaseException: {0}", bppe.Message); } Console.WriteLine($"action4 Thread:{Thread.CurrentThread.ManagedThreadId}"); // The fourth time should be hunky-dory Interlocked.Increment(ref count); barrier.SignalAndWait(); // during the post-phase action, count should be 16 and phase should be 3 Console.WriteLine($"action5 Thread:{Thread.CurrentThread.ManagedThreadId}"); }; // 启动与 Barrier设置数量相同的任务,如果启动数目超过设置值,会引发如下异常 //"System.InvalidOperationException: The number of threads using the barrier exceeded the total number of registered participants." Parallel.Invoke(action, action, action, action); Console.WriteLine($"主Thread2:{Thread.CurrentThread.ManagedThreadId}"); // It's good form to Dispose() a barrier when you're done with it. barrier.Dispose(); }
执行效果如下:
可以在24行,27行打断点,借助上面的比喻,理解一下Barrier的用法。
最后要声明的是Barrier的public protected 成员是线程安全的,可以跨线程使用。但是Dispose是非线程安全的,意味着一旦调用,所有线程都会受到影响,应该在任务代码之外执行,另外既然有dispose的方法,就要注意使用完毕后调用该方法释放资源。
八、ReaderWriterLockSlim
读写锁,允许多个线程处于读取模式,允许一个线程处于具有独占锁定权限的写入模式,并且允许具有读取访问权限的一个线程处于可升级读取模式,在该模式下,线程可以升级到写入模式,而无需放弃对资源的读取访问权限。
读写锁具有三种模式,读锁,写锁,可升级的读锁
读锁可以通过 EnterReadLock 进入,通过 ExitReadLock 退出,写锁类似EnterWriteLock,ExitWriteLock
可升级的读锁是指可以直接由读转换为写模式的状态,EnterUpgradeableReadLock及ExitUpgradeableReadLock
与其他同步对象相同,读写锁需要正确的进行释放,不然会引发问题
读写锁初始化时,可以传入 LockRecursionPolicy 指定递归状态,默认的构造函数为NoRecursion,微软官网并不建议新手使用递归策略,因为这具有更高的复杂性,而且容易带来死锁的问题,我自己也没有用过递归策略。
与ReaderWriterLock相比,ReaderWriterLockSlim是被推荐使用的对象
对于 ReaderWriterLockSlim 的使用,我在园子里有个提问请教ReaderWriterLockSlim的问题,建议转过去看下,我这边就不放代码了,当时找到了另一篇文章解答了我的疑惑,地址也贴在这里 C# ReaderWriterLockSlim 实现 - dz45693.
这里面就有一些需要注意的点,这几个点全部是摘抄自上面那篇文章,请知悉:
- 对于同一把锁、多个线程可同时进入读模式。
- 对于同一把锁、同时只允许一个线程进入写模式。
- 对于同一把锁、同时只允许一个线程进入可升级的读模式。
- 通过默认构造函数创建的读写锁是不支持递归的,若想支持递归 可通过构造 ReaderWriterLockSlim(LockRecursionPolicy) 创建实例。
- 对于同一把锁、同一线程不可两次进入同一锁状态(开启递归后可以)
- 对于同一把锁、即便开启了递归、也不可以在进入读模式后再次进入写模式或者可升级的读模式(在这之前必须退出读模式)。
- 再次强调、不建议启用递归。
- 读写锁具有线程关联性,即两个线程间拥有的锁的状态相互独立不受影响、并且不能相互修改其锁的状态。
- 升级状态:在进入可升级的读模式 EnterUpgradeableReadLock后,可在恰当时间点通过EnterWriteLock进入写模式。
- 降级状态:可升级的读模式可以降级为读模式:即在进入可升级的读模式EnterUpgradeableReadLock后, 通过首先调用读取模式EnterReadLock方法,然后再调用 ExitUpgradeableReadLock 方法。
具体的代码示例请参考我的提问及 dz45693 的文章
九、Mutex
Mutex 同Event,Semaphore类似,可以跨进程同步内容,定义跨进程的Mutex只需要在初始化时为其指定名字即可;都继承自WaitHandle,所以也有waitone的方法可以调用。
使用上与 Monitor 类似,属于互斥锁,所以在任务的最后必须要调用 ReleaseMutex()。
Mutex 实现了IDispose接口,所以需要在finally块内调用 Dispose() 方法。
Mutex可以用来限定winform程序只能有一个实例运行,实例如下:
static void Main() { bool runone; //获取名为 single_test的互斥的初始所有权,runone指定是否成功 Mutex run = new Mutex(true, "single_test", out runone); if (runone) //true代表当前未创建改互斥 { run.ReleaseMutex(); Application.EnableVisualStyles(); Application.SetCompatibleTextRenderingDefault(false); FrmRemote frm = new FrmRemote(); int hdc = frm.Handle.ToInt32(); // write to ... Application.Run(frm); IntPtr a = new IntPtr(hdc); } else { MessageBox.Show("已经运行了一个实例了。"); } }
十、ThreadLocal ,AsyncLocal,Volatile
在定义一个类时,有时会定义全局变量,如果在编写类时未考虑在多线程中使用,那么在类中定义的全局变量很有可能会因为多线程调用引发异常,比如文章开头的引子中,将 num 定义为了类的全局变量,造成多线程调用时出现错误的情况,所以需要减少全局变量的使用。但是,有时候,我们又不得不借助全局变量帮助我们实现需求,这时候就可以考虑上面提到的这几个。
我们可能希望定义的变量对每个线程是唯一的,这时候就可以借助ThreadLocal,如果是使用了async,await的写法,因为在await之后执行线程会发生变化,这时候就可以使用AsyncLocal,只是需要注意一下变量在父子进程间的传递关系是怎么样的。
AsyncLocal变量可以在父子线程中传递,创建子线程时父线程会将自己的AsyncLocal类型的上下文变量赋值到子线程中,但是,当子线程改变线程上下文中AsnycLocal变量值后,父线程不会同步改变。也就是说AsnycLocal变量只会影响他的子线程,不会影响他的父级线程。ThreadLocal只是当前线程的上下文变量,不能在父子线程间同步。
具体可看 AsnycLocal与ThreadLocal
至于Volatile,在 c # 中,对 volatile 字段使用修饰符可保证对该字段的每个访问都是易失性内存操作。我们知道vs在release模式下编译时会对代码进行优化,优化的过程中可能会修改一些内容,volatile修饰的字段则不会被编译器进行优化。除此之外,多线程协作时,希望对某一个变量的修改可以立即反馈到其他线程中,这时候也可以借助 volatile,但 volatile 修饰符不能应用于数组元素。 Volatile.Read和 Volatile.Write 方法可用于数组元素。
volatile 值立马反馈到其他线程是因为处理被标记字段时,处理器不会使用缓存,而是每次都去内存里读取该字段。
至于处理器缓存之类的,如果有兴趣,可以自行了解。
AsyncLocal 和 volatile 我自己并没有实际用过,只是在网上看了一些内容,在官方文档看了一点内容,如有需要建议自行搜索。
十一、有意思的示例
记得之前看过一个例子,两个线程循环输出文字,不记得在哪看的了,试着写一下
public class Sample { //先执行的线程设置为 true ManualResetEvent even = new ManualResetEvent(true); ManualResetEvent odd = new ManualResetEvent(false); public void Sum() { var ta = Task.Run(() => PrintEven(even, odd)); var tb = Task.Run(() => PrintOdd (even,odd)); } //等待自己的信号,控制另一个线程的信号 public void PrintEven(EventWaitHandle evenHandle, EventWaitHandle oddHandle) { string design = "偶数"; for (int i = 0; i <= 20; i++) { evenHandle.WaitOne(); if ((i & 1) == 0) { Console.WriteLine($"{design}:{i}"); evenHandle.Reset(); oddHandle.Set(); } } } public void PrintOdd(EventWaitHandle evenHandle, EventWaitHandle oddHandle) { string design = "奇数"; for (int i = 0; i <= 20; i++) { oddHandle.WaitOne(); if ((i & 1) == 1) { Console.WriteLine($"{design}:{i}"); oddHandle.Reset(); evenHandle.Set(); } } } }
流水账似的写下了任务同步的实现方法,于我自己而言,对文章中提到的内容加深了很多理解,希望对读到文章的你也有帮助。
总结
加载全部内容