多线程之美7一ReentrantReadWriteLock源码分析
夕阳下飞奔的猪 人气:3目录
前言
在多线程环境下,为了保证线程安全, 我们通常会对共享资源加锁操作,我们常用Synchronized关键字或者ReentrantLock 来实现,这两者加锁方式都是排他锁,即同一时刻最多允许一个线程操作,然而大多数场景中对共享资源读多于写,那么存在线程安全问题的是写操作(修改,添加,删除),我们是否应该考虑将读和写两个分开,只要运用合理,并发性能是不是可以提高,吞吐量增大呢? ReentrantReadWriteLock已经为我们实现了这种机制,我们一起来看它是怎样实现的吧!
1、读写锁的一些概念
在查看可重入读写锁的源码前,有几个概念需要先知道,对于后面理解源码很有帮助。
1、ReentrantReadWriteLock 内部 Sync类依然是继承AQS实现的,因此同步状态字段 state,依然表示对锁资源的占用情况。那么如何实现一个 int类型的state 同时来表示读写锁两种状态的占用情况呢? 这里实现非常巧妙,将4个字节的int类型, 32位拆分为2部分,高16位表示读锁的占用情况,低16位表示写锁的占用情况,这样读写锁互不影响,相互独立;也因此读写锁的最大值是2^16-1 = 65535,不能超过16位,下面源码有体现。
state值表示如图所示:
2、读锁是共享锁,只要不超过最大值,可多个线程同时获取; 写锁是排他锁,同一时刻最多允许一个线程获取。
写锁与其他锁都互斥,含写写互斥,写读互斥,读写互斥。
3、state可同时表示读写锁的状态,state的高16位表示获取读锁的线程数,读锁支持可重入,即一个线程也可多次获取读锁,怎么维护每个读锁线程的重入次数的? 每个线程有一个计数器 HoldCounter,用ThreadLocal来存放每个线程的计数器;state的低16位表示写锁的同步状态,因为写锁是排他锁,这里就不能表示获取写锁的线程数了,只能表示写锁的重入次数,获取写锁的线程可多次重复获取写锁(支持重入)。
读锁的计数器的实现原理如下:
可见ThreadLocalHoldCounter继承 ThreadLocal,每个获取读锁的线程是通过其本地变量来存储自己的计数器,来统计获取读锁的重入次数。ThreadLocal原理解析
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
//重写了ThreadLocal的initialValue方法
public HoldCounter initialValue() {
return new HoldCounter();
}
}
4、state的高16位需要记录获取读锁的线程数,每增加一个线程获取读锁,在state的高16执行加1操作,即state+2^16,写锁增加重入次数,直接 state+1即可。
5、锁降级:获取写锁的线程,可以再次获取到读锁,即写锁降级为读锁。
读锁可以升级为写锁吗? 不可以,因为存在线程安全问题,试想获取读锁的线程有多个,其中一个线程升级为写锁,对临界区资源进行操作,比如修改了某个值,对其他已经获取读锁的线程不可见,出现线程安全问题。
代码演示:
1、读写状态
AQS(AbstractQueuedSynchronizer的简称)中同步状态字段 private volatile int state, int类型,4个字节,32位,拆分为高16位表示读状态,低16位表示写状态,如下定义了一些常量,实现获取读写锁的数量。
ReentrantReadWriteLock部分代码如下:
//分隔位数,16位
static final int SHARED_SHIFT = 16;
//读锁加1的数量,1左位移16位, (16)0x10000 = (2)1000000000000000= (10) 65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//读写锁的最大数量, (16)0xFFFFFFFF =(2)1111111111111111 =(10)65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//写锁的掩码,用于计算写锁重入次数时,将state的高16全部置为0, 等于(2)1111111111111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//获取读锁数,表示当前有多少个线程获取到读锁
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//获取写锁重入次数(不等于0表示有线程持有独占锁,大于1,表示写锁有重入)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
分别看一下获取读写锁数量的方法。
获取占用读锁的线程数,代码如下:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
传入的c为 state,state 无符号右移16位,抹去低16位值,左边补0
示例图如下:
获取写锁的值的方法
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
与运算,将高16全部置为0,低16值代表写锁的值,&运算,相同为1,不同为0,得到低16位写锁值。
示例图如下:
2、三个锁概念
- int c =getState() ,获取state的值,代表同步锁状态,该值包含读写两个锁的同步状态
- int w = exclusiveCount(c); w代表写锁的同步状态,通过c获取到写锁的状态值
- int r = sharedCount(c); r 代表读锁的同步状态,通过c获取到读锁的状态值
以下分析三种情况下state,r, w 的值及代表的含义:
- 1、一个线程获取到写锁:
state =1, w =1, r =0
获取写锁加1操作就比较简单了,因为写锁是独占锁,与正常的ReentrantLock获取锁实现一样,占用state的低16位表示,不用看state的高16,左边补16位0。获取写锁一次,直接 c+1;
- 2、一个线程获取到读锁:
state =65536, w= 0, r=1
c初始为0 ,获取读锁,则读锁数量+1,执行 c + SHARED_UNIT, SHARED_UNIT = (2)1000000000000000 = (10)65536,括号内表示进制,SHARED_UNIT是每次读锁加1的数值。
如下图所示: 在获取读锁数量 r时,将state的低16位抹去,r=1,而state此时的值= 2^16 =65536,state的实际值可能会很大,但其实分别拆分读写锁的值不一定大,只是读锁值表示在高位,会造成state值很大。
- 3、一个线程获取到写锁,又获取到读锁情况(锁降级):
state = 65537,w=1, r=1
state二进制表示: 00000000 00000001 00000000 00000001
锁降级代码演示如下:
package readwritelock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author zdd
* 2019/12/30 上午
* Description: 锁降级测试
*/
public class ReadWriteLockTest {
static Integer shareVar = 0;
public static void main(String[] args) {
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
//1,首先获取写锁
rw.writeLock().lock();
//2.修改共享变量值
shareVar = 10 ;
//3.再获取读锁
rw.readLock().lock();
System.out.println("读取变量值 shareVar:"+ shareVar);
//4.释放写锁
rw.writeLock().unlock();
//5.释放读锁
rw.readLock().unlock();
}
}
2、类结构和构造方法
ReentrantReadWriteLock 类中有ReadLock和WriteLock,分别对应读锁和写锁,而读写锁又分为公平方式和非公平方式获取锁。
简略类图结构如下:
构造方法如下:根据传入参数设置公平或者非公平获取锁方式,默认是非公平方式
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
3、写锁
由于写锁是独占锁,由于写锁是独占锁,获取写锁的方式在AQS中已经说过了,详见AQS源代码分析, 只是每个子类的尝试获取锁方式不同,所以ReentrantReadWriteLock类获取写锁过程就看一下尝试获取锁方法的源码。
3.1、尝试获取锁
tryAcquire(int acquires),获取锁失败则加入同步队列中等待获取锁,源代码如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//1,获取同步状态state的值,注意该值可表示读写锁的同步状态
int c = getState();
//2,获取写锁状态,低16位的值
int w = exclusiveCount(c);
//3,如果同步锁状态不为0,有线程已经获取到了锁
if (c != 0) {
//4,w==0则表示写锁为0,那么一定有线程获取了读锁,需要等待,读写互斥
//current != getExclusiveOwnerThread() 当前线程不等于已经获取到写锁的线程,则也需等待其释放,写写互斥
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//5,此时再次获取锁,判断锁重入次数是否超过最大限定次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//更新写锁重入次数
setState(c + acquires);
return true;
}
//6,代码执行这,一定是c==0,同步锁空闲情况
//writerShouldBlock该方法是基于公平锁和非公平锁2种方式的体现
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//获取到锁,设置独占锁为当前写锁线程
setExclusiveOwnerThread(current);
return true;
}
写锁是否应该阻塞等待
- 1、 非公平锁方式
final boolean writerShouldBlock() {
//直接返回false
return false; // writers can always barge
}
- 2、公平锁方式
需要判断同步队列中是否还有其他线程在挂起等待,如存在应该按照入队顺序获取锁
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
//1.获取同步队列头,尾节点
Node t = tail;
Node h = head;
Node s;
// h !=t 同步队列不为空
// 队列中还有其他线程在等待锁,则返回true
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
3.2、释放写锁
unlock方法释放锁
public void unlock() {
sync.release(1);
}
可见,调用内部类Sync的release方法,Sync继承AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
//1,释放锁成功
Node h = head;
if (h != null && h.waitStatus != 0)
//2.唤醒同步队列中等待线程
unparkSuccessor(h);
return true;
}
return false;
}
核心在尝试释放锁方法上,看看写锁的释放锁方法tryRelease
protected final boolean tryRelease(int releases) {
//1,判断当前线程是否持有当前锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2,同步状态 - 需要释放的写锁同步值
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
//3,free ==true,完全释放写锁,将当前获取独占锁线程置空
setExclusiveOwnerThread(null);
//4,更新state值
setState(nextc);
return free;
}
注: 在释放写锁占用次数时, state的高16的读锁有值也不影响,减去releases,首先减去的state低位的数,而且在释放写锁时,state的低16位的值一定>=1,不存在减少读锁的值情况。
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
也可改写为如下面代码
//1,获取state值
int c = getState();
//2,获取写锁的值
int w= exclusiveCount(c);
int remain = w- releases;
boolean free = remain== 0;
4、读锁
4.1、获取读锁
读锁调用lock方法加锁,实际调用Sync的acquireShared方法
public void lock() {
sync.acquireShared(1);
}
走进acquireShared,获取共享锁方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
尝试获取锁tryAcquireShared,如果返回值<0, 表示获取读锁失败
主要执行步骤:
1、首先判断是否存在其他线程在占用写锁,有需要挂起等待;
2、在不用阻塞等待,且读锁值没有超过最大值,cas更新成功了state的值,可以获取到读锁,还会做以下事:
a. 第一个获取读锁的,直接记录线程对象和其重入获取读锁的次数
b. 非第一个获取读锁的,则获取缓存计数器(cachedHoldCounter),其记录上一次获取读锁的线程,如果是同一个线程,则直接更新其计数器的重入次数,如果缓存计数器为空或缓存计数器的线程不是当前获取读锁的线程,则从当前线程本地变量中获取自己的计数器,更新计数器的值
protected final int tryAcquireShared(int unused) {
//1,获取当前线程对象
Thread current = Thread.currentThread();
//2,获取同步锁的值
int c = getState();
/*3,exclusiveCount(c) != 0 计算写锁的同步状态,不等于0,说明有写锁已经获取到同步锁,
*需要判断当前线程是否等于获取写锁线程,
*是,可以允许再次获取读锁,这里涉及到锁降级问题,写锁可以降为读锁
*否则不让获取,写读互斥
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//4,获取读锁同步状态
int r = sharedCount(c);
/**
*此处3个判断条件
* 1.是否应该阻塞等待,这里也是基于公平锁和非公平获取锁实现
* 2.读锁同步状态值是超过最大值,即限制获取读锁的最大线程数
* 3.cas更新读锁同步状态是否成功
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//可以获取到读锁
//r==0表示是第一个获取读锁的线程
if (r == 0) {
firstReader = current;
//记录第一个线程读锁的重入次数
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//是第一个获取读锁线程,锁重入,锁重入次数+1
firstReaderHoldCount++;
} else {
// 已有其他线程获取到读锁
/*
*1,获取缓存记录的计数器,计数器是用来统计每一个获取读锁线程的重入次数的,
*由每个线程的ThreadLocal,即线程内的副本存储,相互独立;
*此处也不是放入缓存,在有多个线程同时获取读锁情况,
*用一个变量记录上一个获取读锁的线程的计数器,可能考虑多次获取读锁线程大概率是同一个线程情况,
*这样做是可提高执行效率
*/
HoldCounter rh = cachedHoldCounter;
// rh==null,第一个获取读锁,rh没有值
// 或者计数器存储的上一次线程的id与当前线程不等, 即不是相同一个线程,
//那么就获取当前线程内部的计数器,并赋值给cachedHoldCounter变量,这样可以让下一次获取读锁线程获取比较了
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
/*进入该条件,我理解是在线程获取读锁再释放后,同一线程再次获取读锁情况,
* 缓存计数器会记录上一个线程计数器,因为线程释放读锁后,count=0,
* 这里重新将计数器放入线程内部中,
* 因为线程在使用完线程内部变量后会防止内存泄漏,会执行remove,释放本地存储的计数器。
*/
readHolds.set(rh);
//计数器+1
rh.count++;
}
return 1;
}
//上面3个条件没有同时满足,没有成功获取到读锁,开始无限循环尝试去获取读锁
return fullTryAcquireShared(current);
}
无限循环尝试获取共享锁 fullTryAcquireShared方法
主要执行步骤:
1、 如果有其他线程获取到了写锁,写读互斥,应该去挂起等待;
2、如果可以获取读锁,判断是否应该阻塞等待,在公平获取锁方式中,同步队列中有其他线程在等待,则应该去排队按照FIFO顺序获取锁,非公平获取锁方式,可以直接去竞争获取锁。
3、可以获取锁,则尝试cas更新state的值,更新成功,获取到锁。
final int fullTryAcquireShared(Thread current){
HoldCounter rh = null;
//无限循环
for (;;) {
//获取同步锁状态
int c = getState();
//判断写锁值不为0,且不是当前线程,不可获取读锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
//没有线程获取到写锁情况,公平获取锁情况,
//同步队列中有其他线程等待锁,该方法主要是在需要排队等待,计数器重入次数==0情况,清除计数器
if (firstReader == current) {
//此处firstReader !=null, 则第1个获取读锁的线程还没释放锁,可允许该线程继续重入获取锁
//计数器count一定>0
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
//清除计数器
readHolds.remove();
}
}
// 为什么rh.count == 0就不让线程获取到锁了,基于公平获取锁方式,去同步队列中等待
if (rh.count == 0)
return -1;
}
}
//获取读锁线程超过最大限制值 65535
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// cas执行读锁值+1
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
//1,第一个获取读锁
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//2,第一个获取读锁重入
firstReaderHoldCount++;
} else {
//3,非第一个线程获取读锁,存在多个线程获取读锁
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
//缓存计数器变量记录此次获取读锁线程的计数器
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
tryAcquireShared 返回< 0, 获取锁失败,执行 doAcquireShared
在获取读锁失败后,执行以下步骤:
1、将节点加入同步队列中
2、如果前置节点是头节点,将再次尝试获取锁,如果成功,设置当前节点为head节点,并根据tryAcquireShared方法的返回值r判断是否需要继续唤醒后继节点,如果 r大于0,需要继续唤醒后继节点,r=0不需要唤醒后继节点。
3、如果前置节点不是头节点,则在队列中找到安全位置,设置前置节点 ws=SIGNAL, 挂起等待。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果前继节点是头节点,再次尝试获取共享锁
int r = tryAcquireShared(arg);
//r>=0,表示获取到锁,
//r=0,表示不需要唤醒后继节点
//r>0,需要继续唤醒后继节点
if (r >= 0) {
//该方法实现2个步骤
//1,设置当前节点为头节点
//2,r>0情况会继续唤醒后继节点
setHeadAndPropagate(node, r);
//旧的头节点移出队列
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate 该方法是与独占锁获取锁的区别之处,获取到锁后,设置为头结点还需要继续传播下去。
private void setHeadAndPropagate(Node node, int propagate) {
//记录是的旧的头节点
Node h = head; // Record old head for check
//设置当前获取到锁节点为头节点
setHead(node);
//propagate >0,表示还需要继续唤醒后继节点
//旧的头节点和新头节点为空,或者ws<0,满足条件之一,尝试去唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//后继节点为空或者是共享节点(获取读锁的线程)
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared 方法较难理解,在释放锁中也有调用,留着后面一起分析。
4.2、释放读锁
public void unlock() {
sync.releaseShared(1);
}
AQS中释放共锁方法releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
看一下读写锁具体实现tryReleaseShared 的方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//1,更新或者移出线程内部计数器的值
if (firstReader == current) {
//当前线程是第一个获取读锁的线程
if (firstReaderHoldCount == 1)
//直接置空
firstReader = null;
else
//该线程获取读锁重入多次,计数器-1
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
//非第一个获取读锁线程,避免ThreadLocal内存泄漏,移出计数器
readHolds.remove();
if (count <= 0)
//此处是调用释放锁次数比获取锁次数还多情况,直接抛异常
throw unmatchedUnlockException();
}
--rh.count;
}
//2,循环cas更新同步锁的值
for (;;) {
int c = getState();
//读锁同步状态-1
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
//返回完全释放读锁,读锁值是否==0,完全释放,等待写锁线程可获取
return nextc == 0;
}
}
tryReleaseShared 返回true情况,表示完全释放读锁,执行doReleaseShared,那就需要唤醒同步队列中等待的其他线程
在读写锁中存在几种情况
情况一、如果当前获取锁的线程占用的是写锁,则后来无论是获取读锁还写锁的线程都会被阻塞在同步队列中,
同步队列是FIFO队列,在占用写锁的释放后,node1获取读锁,因读锁是共享的,继续唤醒后一个共享节点。
如上图,在node1获取到读锁时,会调用doReleaseShared方法,继续唤醒下一个共享节点node2,可以持续将唤醒动作传递下去,如果node2后面还存在几个等待获取读锁的线程,这些线程是由谁唤醒的?是其前置节点,还是第一个获取读锁的节点? 应该是第1个获取锁的节点,这里即node1, 由下代码可见,在无限循环中,只有头节点没有变化时,即再没其他节点获取到锁后,才会跳出循环。
private void doReleaseShared() {
for (;;) {
//获取同步队列中头节点
Node h = head;
//同步队列中节点不为空,且节点数至少2个
if (h != null && h != tail) {
int ws = h.waitStatus;
//1,表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继节点
unparkSuccessor(h);
}
//2,后继节点暂时不需要唤醒,设置节点 ws = -3, 确保后面可以继续传递下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果头节点发生变化,表示已经有其他线程获取到锁了,需要重新循环,确保可以将唤醒动作传递下去。
if (h == head) // loop if head changed
break;
}
}
5、思考
1、在非公平获取锁方式下,是否存在等待获取写锁的线程始终获取不到锁,每次都被后来获取读锁的线程抢先,造成饥饿现象?
存在这种情况,从获取读锁源码中看出,如果第一个线程获取到读锁正在执行情况下,第二个等待获取写锁的线程在同步队列中挂起等待,在第一个线程没有释放读锁情况下,又陆续来了线程获取读锁,因为读锁是共享的,线程都可以获取到读锁,始终是在读锁没有释放完毕加入获取读锁的线程,那么等待获取写锁的线程是始终拿不到写锁,导致饥饿。为什么默认还是非公平模式?因为减少线程的上下文切换,保证更大的吞吐量。
6、总结
1、读写锁可支持公平和非公平两种方式获取锁。
2、支持锁降级,写锁可降级为读锁,但读锁不可升级为写锁。
3、大多数场景是读多于写的,所以ReentrantReadWriteLock 比 ReentrantLock(排他锁)有更好的并发性能和吞吐量。
4、读写锁中读锁和写锁都支持锁重入。
5、在获取Condition对象实现阻塞唤醒机制,ReentrantReadWriteLock.WriteLock 重写了 newCondition方法,ReadLock不支持,即读锁不支持与Condition配合使用,使用阻塞唤醒机制。
加载全部内容