AQS系列(三)- ReentrantReadWriteLock读写锁的加锁
张曾经 人气:5前言
前两篇我们讲述了ReentrantLock的加锁释放锁过程,相对而言比较简单,本篇进入深水区,看看ReentrantReadWriteLock-读写锁的加锁过程是如何实现的,继续拜读老Lea凌厉的代码风。
一、读写锁的类图
读锁就是共享锁,而写锁是独占锁。读锁与写锁之间的互斥关系为:读读可同时执行(有条件的);读写与写写均互斥执行。注意此处读读可并行我用了有条件的并行,后文会对此做介绍。
继续奉上一张丑陋的类图:
可以看到ReentrantReadWriteLock维护了五个内部类,ReentrantReadWriteLock中存放了Sync、ReadLock、WriteLock三个成员变量,如下截图所示:
而ReadLock和WriteLock中又存放了Sync变量,截图如下所示,这样一组合,有了四种锁,公平读锁、公平写锁、非公平读锁、非公平写锁。对于公平与非公平的实现区别,我们上一篇已经做过讲解,本文将着重关注读锁和写锁的实现区别。
二、加锁源码
在前文中我们知道,ReentrantLock中用state来判断当前锁是否被占用,而读写锁ReentrantReadWriteLock中由于同时存在两种锁,所以老Lea用state的高16位来存放读锁的占用状态以及重入次数,低16位存放写锁的占用状态和重入次数。
1、读锁加锁,即共享锁加锁
1 public void lock() { 2 sync.acquireShared(1); // 获取共享锁方法 3 }
上述lock方法中调用的获取共享锁方法是在AbstractQueuedSynchronizer中实现的,代码如下:
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3 doAcquireShared(arg); 4 }
可以看到获取共享锁分成了两步,第一步是尝试获取,如果获取不到再进入if里面执行doAcquireShared方法,下面分别追踪。
1)、tryAcquireShared方法
1 protected final int tryAcquireShared(int unused) { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 // 1.有写锁占用并且不是当前线程,则直接返回获取失败 5 if (exclusiveCount(c) != 0 && 6 getExclusiveOwnerThread() != current) 7 return -1; 8 // 执行到这里,有两种情况 没有写锁占用或者是当前线程 9 int r = sharedCount(c); // 获取读锁次数 10 // 2、不应该阻塞则获取锁 @此方法有点意思,需着重讲解,作用:判断读锁是否需要阻塞 11 if (!readerShouldBlock() && 12 r < MAX_COUNT && 13 compareAndSetState(c, c + SHARED_UNIT)) { 14 // 如果CAS成功,则将当前线程对应的计数+1 15 if (r == 0) { // 如果读锁持有数为0,则说明当前线程是第一个reader,分别给firstReader和firstReaderHoldCount初始化 16 firstReader = current; 17 firstReaderHoldCount = 1; 18 } else if (firstReader == current) { // 如果读锁持有数不为0且当前线程就是firstReader,那么直接给firstReaderHoldCount+1,表示读锁重入 19 firstReaderHoldCount++; 20 } else { // 其他情况,即当前线程不是firstReader且还有其他线程持有读锁,则要获取到当前线程对应的HoldCounter,然后给里面的计数+1 21 HoldCounter rh = cachedHoldCounter; 22 if (rh == null || rh.tid != getThreadId(current)) 23 cachedHoldCounter = rh = readHolds.get(); 24 else if (rh.count == 0) 25 readHolds.set(rh); 26 rh.count++; 27 } 28 return 1; 29 } 30 // 3、应该阻塞或者CAS失败则进入此方法获取锁 31 return fullTryAcquireShared(current); 32 }
结合上述代码中的注释,将逻辑分三部分,我们一步步分析此方法的逻辑。
首先第一步,判断如果有写锁并且当前线程不是写锁的线程,则直接退出获取读锁的尝试,因为读写是互斥的,退出此方法后就会进入doAcquireShared方法,后续逻辑见下面的2)。但此处还是要看一下写锁状态统计方法exclusiveCount和读锁状态统计方法sharedCount,方法源码如下截图所示:
可以看到,exclusiveCount方法是将c和独占掩码进行与操作,独占掩码EXCLUSIVE_MASK高16位均为0,低16位均为1,按位与计算之后就剩下c的低16位,这就是第二部分一开始说的低16位存放写锁重入次数;同理看sharedCount方法,将c有符号右移16位,这样移位之后低16位就是原来的高16位,即读锁的加锁次数。老Lea通过这两个方法实现了用一个int类型的state存放写锁读锁两个加锁次数的结果,是不是看起来就很高端!
然后看第二步,判断读不应该阻塞(即readerShouldBlock方法返回false)且读锁持有次数小于最大值且CAS成功,则进入方法中尝试获取读锁。先看看重点方法readerShouldBlock什么时候会返回false(不阻塞)什么时候返回true(阻塞)。此方法在非公平模式和公平模式中有不同的实现,公平模式代码:
1 final boolean readerShouldBlock() { 2 return hasQueuedPredecessors(); 3 }
看到了一个熟悉的身影,hashQueuedPredecessors方法,这不就是在ReentrantLock中公平锁加锁时的方法么?详细可看我的AQS系列(一)中的讲解,总结一下就是该方法判断队列前面是否有在排队的非当前线程,意思就是按排队顺序获取锁,不要争抢。
非公平模式代码:
1 final boolean readerShouldBlock() { 2 return apparentlyFirstQueuedIsExclusive(); 3 }
1 final boolean apparentlyFirstQueuedIsExclusive() { 2 Node h, s; 3 return (h = head) != null && 4 (s = h.next) != null && 5 !s.isShared() && 6 s.thread != null; 7 }
在后面的方法中,返回了一个四个条件组成的布尔值,逻辑为头节点不为空并且头节点后的第一个节点不为空并且这个节点是独占的并且线程不为空,此时返回true即当前这个读操作应该阻塞,不让它获取到锁。那么问题来了,为什么要有这个逻辑?此处是为了避免一种异常情况的发生,如果后面有一个排队的写锁在等待获取锁,而这时有一个读锁正在执行中,若在读锁执行完之前又来了一个读锁,因为读锁与读锁不阻塞所以后来的的读锁又获取到了锁,这时在队列第一个位置排队的写锁仍然在傻傻的等着,没办法,谁让你没关系。就这样,如果一直有读锁在当前正在执行的读锁执行完之前进来获取读锁,那么后面的写锁就会一直傻等在那,永远都没法获取锁。所以Lea就设计了这个方法来避免这种情况的发生,即如果判断队列第一位排队的是写锁,那么后面的读锁就先等一等,等这个写锁执行完了你们再执行。这也就是我在文章的开始讲的-读读同时执行是有条件的,这个条件就是指这里。
看第二步之前要先说说读锁的处理逻辑,因为是可重入的读锁,所以需要记录每个获取读锁线程的重入次数,即每个读的线程都有一个与其对应的重入次数。然后继续看第二步中读锁获取锁成功(即CAS成功)之后的逻辑:如果读锁持有数为0,则说明当前线程是第一个reader,分别给firstReader和firstReaderHoldCount初始化;如果读锁持有数不为0且当前线程就是firstReader,那么直接给firstReaderHoldCount+1,表示读锁重入;否则,即当前线程不是firstReader且还有其他线程持有读锁,则要获取到当前线程对应的HoldCounter,然后给里面的计数+1。
下面再一起看看【否则】中的逻辑,粘贴一下Sync中的部分代码
1 abstract static class Sync extends AbstractQueuedSynchronizer { 2 // ... 3 static final class HoldCounter { 4 int count = 0; 5 // Use id, not reference, to avoid garbage retention 6 final long tid = getThreadId(Thread.currentThread()); 7 } 8 9 static final class ThreadLocalHoldCounter 10 extends ThreadLocal<HoldCounter> { 11 public HoldCounter initialValue() { 12 return new HoldCounter(); 13 } 14 } 15 16 private transient ThreadLocalHoldCounter readHolds; 17 18 private transient HoldCounter cachedHoldCounter; 19 20 private transient Thread firstReader = null; 21 private transient int firstReaderHoldCount; 22 23 Sync() { 24 readHolds = new ThreadLocalHoldCounter(); 25 setState(getState()); // ensures visibility of readHolds 26 } 27 // ... 28 }
可以看到,Sync中缓存了一个HoldCounter,存放的是最近一次读锁记录。而如果当前线程不是最近一次记录的HoldCounter,则去readHolds中取,readHolds是ThreadLocalHoldCounter类型,在Sync的无参构造器中初始化,它与HoldCounter都是Sync的内部类,ThreadLocalHoldCounter就是一个ThreadLocal,内部维护了一个线程与HoldCounter的键值对map,一个线程对应一个HoldCounter。所以【否则】中的逻辑加注释如下所示:
1 HoldCounter rh = cachedHoldCounter; // 获取最近一次记录的HoldCounter,此缓存是为了提高效率,不用每次都去ThreadLocal中取 2 if (rh == null || rh.tid != getThreadId(current)) // 判断当前线程是不是最近一次记录的HoldCounter 3 cachedHoldCounter = rh = readHolds.get(); // 如果不是,则去Sync中的ThreadLocal中获取,然后再放在缓存中 4 else if (rh.count == 0) // 如果count计数为0,说明是第一次重入,则将HoldCounter加入ThreadLocal中 5 readHolds.set(rh); 6 rh.count++; // 当前线程重入次数+1
下面进入第三步,fullTryAcquireShared方法,进入此方法的前提条件是没有写锁且 (读应该阻塞或者读锁CAS失败)。看这个full方法的逻辑:
1 final int fullTryAcquireShared(Thread current) { 2 3 HoldCounter rh = null; 4 for (;;) { // 无限循环直到有确定的结果返回 5 int c = getState(); 6 if (exclusiveCount(c) != 0) { // 1、有独占锁且不是当前线程,直接返回读锁加锁失败 7 if (getExclusiveOwnerThread() != current) 8 return -1; 9 // else we hold the exclusive lock; blocking here 10 // would cause deadlock. 11 } else if (readerShouldBlock()) { // 2、判断读是否应该阻塞 12 // Make sure we're not acquiring read lock reentrantly 13 if (firstReader == current) { // 判断如果当前线程就是firstReader,那么什么都不做,进入3中尝试获取锁,why? 因为这说明当前线程之前就持有了锁还没释放,所以可以继续获取 14 // assert firstReaderHoldCount > 0; 15 } else { // 2.5 此处逻辑需要仔细研读,乍看时看的一头雾水 16 if (rh == null) { // 第一次进来时rh肯定==null 17 rh = cachedHoldCounter; 18 if (rh == null || rh.tid != getThreadId(current)) { 19 rh = readHolds.get(); 20 if (rh.count == 0) // 如果当前线程没获取到过读锁,则从本地线程变量中移除HoldCounter,因为下一步就要判定它获取锁失败先不让它获取了 21 readHolds.remove(); 22 } 23 }// 能走到这里,说明当前读锁应该阻塞且不是firstReader 24 if (rh.count == 0) // 再加上当前线程没获取到过读锁,则先不让它尝试获取锁了,直接返回获取失败 25 return -1; 26 } 27 } 28 if (sharedCount(c) == MAX_COUNT) 29 throw new Error("Maximum lock count exceeded"); 30 // 3、再次尝试获取锁 31 if (compareAndSetState(c, c + SHARED_UNIT)) { 32 if (sharedCount(c) == 0) { 33 firstReader = current; 34 firstReaderHoldCount = 1; 35 } else if (firstReader == current) { 36 firstReaderHoldCount++; 37 } else { 38 if (rh == null) 39 rh = cachedHoldCounter; 40 if (rh == null || rh.tid != getThreadId(current)) 41 rh = readHolds.get(); 42 else if (rh.count == 0) 43 readHolds.set(rh); 44 rh.count++; 45 cachedHoldCounter = rh; // cache for release 46 } 47 return 1; 48 } 49 } 50 }
详细看看注解以及源代码注释、代码逻辑,相信能理解这个过程。
2)、doAcquireShared方法
1 private void doAcquireShared(int arg) { 2 // 将当前读锁加到队列后面 3 final Node node = addWaiter(Node.SHARED); 4 boolean failed = true; 5 try { 6 boolean interrupted = false; 7 for (;;) { 8 // 得到前一个节点 9 final Node p = node.predecessor(); 10 if (p == head) { // 如果前一个节点是头节点,则尝试获取锁 11 int r = tryAcquireShared(arg); 12 if (r >= 0) { // 设置头节点并且激活后续的节点 13 setHeadAndPropagate(node, r); 14 p.next = null; // help GC 15 if (interrupted) 16 selfInterrupt(); 17 failed = false; 18 return; 19 } 20 }// 判断应该挂起则挂起线程 21 if (shouldParkAfterFailedAcquire(p, node) && 22 parkAndCheckInterrupt()) 23 interrupted = true; 24 } 25 } finally { 26 if (failed) 27 cancelAcquire(node); 28 } 29 }
该方法跟之前系列中ReentrantLock的加锁过程类似,在此就不做过多的解释了,总之还是通过park来挂起。
2、写锁加锁,即独占锁加锁
进入lock方法:
1 public void lock() { 2 sync.acquire(1); 3 }
熟悉的样子,继续 点进去:
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
还是原先的方法,但是各个方法的实现有区别了。先看第一个tryAcquire:
1 protected final boolean tryAcquire(int acquires) { 2 Thread current = Thread.currentThread(); 3 int c = getState(); 4 int w = exclusiveCount(c); 5 if (c != 0) { // 如果排它锁存在,则判断是不是当前线程,如果也不是当前线程,则直接返回获取失败 6 // (Note: if c != 0 and w == 0 then shared count != 0) 7 if (w == 0 || current != getExclusiveOwnerThread()) 8 return false; 9 if (w + exclusiveCount(acquires) > MAX_COUNT) 10 throw new Error("Maximum lock count exceeded"); 11 // Reentrant acquire 12 setState(c + acquires); 13 return true; 14 } // 判断读锁要不要阻塞,此处针对公平锁和非公平锁有不同的实现,对于非公平锁统一返回false表示不要阻塞,而公平锁则会查看前面还有没有锁来判断要不要阻塞 15 if (writerShouldBlock() || 16 !compareAndSetState(c, c + acquires)) 17 return false; 18 setExclusiveOwnerThread(current); 19 return true; 20 }
然后是addWaiter在队列末尾添加node节点排队,这个方法在AbstractQueuedSynchronizer中,同样是熟悉的方法了,此处略过不提。
最后是acquireQueued方法,如下所示,又是熟悉的代码,跟ReentrantLock中的加锁方法一毛一样,唯一的不同点是第7行调用的tryAcquire方法的实现,此处调的是ReentrantReadWriteLock类中Sync的方法,也就是上面的第一个方法。
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) { 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
写锁的加锁过程基本就这些了,相对来说比读锁加锁容易了很多,因为大多都跟ReentrantLock中的实现相仿。
后记
读写锁的加锁过程到此为止,最近每晚下班回来读一会,断断续续的四晚上才搞定,难受 ><
加载全部内容