ReentrantReadWriteLock
心城以北 人气:0一、读写锁简介
现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁(读多写少)。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发);但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥)。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。
针对这种场景,JAVA的并发包提供了读写锁 ReentrantReadWriteLock,它内部,维护了一对相关的锁,一个用于只读操作,称为读锁;一个用于写入操作,称为写锁,描述如下:线程进入读锁的前提条件:
- 没有其他线程的写锁
- 没有写请求或者有写请求,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
- 没有其他线程的读锁
- 没有其他线程的写锁
而读写锁有以下三个重要的特性:
- 公平选择性:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平。
- 可重入:读锁和写锁都支持线程重入。以读写线程为例:读线程获取读锁后,能够再次获取读锁。写线程在获取写锁之后能够再次获取写锁,同时也可以获取读锁。
- 锁降级:遵循获取写锁、再获取读锁最后释放写锁的次序,写锁能够降级成为读锁。
看了上面的描述大家可能有点晕,我就举一个之前开发订单的例子,辅助大家理解。 我们的订单有一个主单和子单的概念:主单编码为 orderCode
, 子单编码为 subOrderCode
对应关系是 1:N。 我在退款的时候,需要支持子单,主单退款。 子单退款,的维度是 subOrderCode
主单退款,的维度是 orderCode
可能出现并发的情况,我们可以对 orderCode
加一把读写锁
- 如果是主单退款的情况,是不是子单退款就是互斥的
- 如果是子单退款的情况,其实就可以并行的,但是子单是
subOrderCode
维度,还需要加一个subOrderCode
的互斥锁。
二、读写锁使用
如何同时存储读写锁,可以通过 state 的值进行存储,高 16 位表示读锁,低 16 位表示写锁。 比如: 0000 0000 0000 0000 (1<<16) 0000 0000 0000 0000 高 16 位不为0: 有读锁 c >>>16 低 16 位不为0: 有写锁 5
ReadWriteLock 接口
我们可以看到 ReentranReadWriteLock 有两把锁,一把读锁,一把写锁。
使用例子
缓存操作:
public class ReentrantReadWriteLockCacheTest { static Map<String, Object> map = new HashMap<String, Object>(); static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock r = rwl.readLock(); static Lock w = rwl.writeLock(); // 获取一个key对应的value public static final Object get(String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); } } // 设置key对应的value,并返回旧的value public static final Object put(String key, Object value) { w.lock(); try { return map.put(key, value); } finally { w.unlock(); } } // 清空所有的内容 public static final void clear() { w.lock(); try { map.clear(); } finally { w.unlock(); } } }
上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的 读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这 使得并发访问该方法时不会被阻塞。写操作put(String key,Object value)方法和clear()方法, 在更新 HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被 阻塞,而 只有写锁被释放之后,其他读写操作才能继续。Cache使用读写锁提升读操作的并发 性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。
三、锁的降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。锁降级可以帮助我们拿到当前线程修改后的结果而不被其他线程所破坏,防止更新丢失。
锁降级的使用示例
因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作。
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); private final Lock readLock = rwl.readLock(); private final Lock writeLock = rwl.writeLock(); private volatile boolean update = false; public void processData() { readLock.lock(); if (!update) { // 必须先释放读锁 readLock.unlock(); // 锁降级从写锁获取到开始 writeLock.lock(); try { if (!update) { // TODO 准备数据的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } // 锁降级完成,写锁降级为读锁 } try { //TODO 使用数据的流程(略) } finally { readLock.unlock(); } }
注意事项:
- 读锁不支持条件变量
- 重入时不升级不支持:持有读锁的情况下,去获取写锁,会导致永久等待
- 重入时支持降级:持有写锁的情况下可以去获取读锁
四、ReentranReadWriteLock 结构
方法结构设计
读写状态设计
五、源码分析
写锁的加锁
方法 tryAcquire
是写锁的加锁核心逻辑
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); // 获取写锁状态 int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire // 重入 setState(c + acquires); return true; } // 获取写锁 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // 设置写锁 owner setExclusiveOwnerThread(current); return true; }
写锁的释放
protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; }
读锁的获取
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 首次获取读锁 if (r == 0) { firstReader = current; // 第一个线程重入 firstReaderHoldCount = 1; } else if (firstReader == current) { // 重入 firstReaderHoldCount++; } else { // 后续线程,通过 ThreadLocal 获取重入次数 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
fullTryAcquireShared
方法如下:
final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = null; for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; // else we hold the exclusive lock; blocking here // would cause deadlock. } else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
读锁的释放
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
加载全部内容