Java ReentrantLock
会飞的汤姆猫 人气:0ReentrantLock 原理
概念
基于AQS实现的可重入锁实现类。
核心变量和构造器
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; public ReentrantLock() { // 默认为非公平锁。为何默认为非公平锁?因为通过大量测试下来,发现非公平锁的性能优于公平锁 sync = new NonfairSync(); } public ReentrantLock(boolean fair) { // 由fair变量来表明选择锁类型 sync = fair ? new FairSync() : new NonfairSync(); } abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); // 非公平锁标准获取锁方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 当执行到这里时,正好获取所得线程释放了锁,那么可以尝试抢锁 if (c == 0) { // 继续抢锁,不看有没有线程排队 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 当前线程就是持有锁的线程,表明锁重入 else if (current == getExclusiveOwnerThread()) { // 利用state整形变量进行次数记录 int nextc = c + acquires; // 如果超过了int表示范围,表明符号溢出,所以抛出异常0111 1111 + 1 = 1000 0000 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false 表明需要AQS来将当前线程放入阻塞队列,然后进行阻塞操作等待唤醒获取锁 return false; } // 公平锁和非公平锁公用方法,因为在释放锁的时候,并不区分是否公平 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 如果当前线程不是上锁的那个线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 不是重入锁,那么当前线程一定是释放锁了,然后我们把当前AQS用于保存当前锁对象的变量ExclusiveOwnerThread设置为null,表明释放锁成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 注意:此时state全局变量没有改变,也就意味着在setState之前,没有别的线程能够获取锁,这时保证了以上的操作原子性 setState(c); // 告诉AQS,我当前释放锁成功了,你可以去唤醒正在等待锁的线程了 return free; } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } } static final class NonfairSync extends Sync { // 由ReentrantLock调用获取锁 final void lock() { // 非公平锁,直接抢锁,不管有没有线程排队 if (compareAndSetState(0, 1)) // 上锁成功,那么标识当前线程为获取锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 抢锁失败,进入AQS的标准获取锁流程 acquire(1); } protected final boolean tryAcquire(int acquires) { // 使用父类提供的获取非公平锁的方法来获取锁 return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { // 由ReentrantLock调用 final void lock() { // 没有尝试抢锁,直接进入AQS标准获取锁流程 acquire(1); } // AQS调用,子类自己实现获取锁的流程 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 此时有可能正好获取锁的线程释放了锁,也有可能本身就没有线程获取锁 if (c == 0) { // 注意:这里和非公平锁的区别在于:hasQueuedPredecessors看看队列中是否有线程正在排队,没有的话再通过CAS抢锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 抢锁成功 setExclusiveOwnerThread(current); return true; } } // 当前线程就是获取锁的线程,那么这里是锁重入,和非公平锁操作一模一样 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false 表明需要AQS来将当前线程放入阻塞队列,然后进行阻塞操作等待唤醒获取锁 return false; } } }
核心方法
获取锁操作:
public void lock() { // 直接通过sync同步器上锁 sync.lock(); }
释放锁操作:
public void unlock() { sync.release(1); }
ReentrantReadWriteLock 原理
用例
将原来的锁,分割为两把锁:读锁、写锁。适用于读多写少的场景,读锁可以并发,写锁与其他锁互斥。写写互斥、写读互斥、读读兼容。
public class ThreadDemo { static volatile int a; public static void readA() { System.out.println(a); } public static void writeA() { a++; } public static void main(String[] args) { ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); Thread readThread1 = new Thread(() -> { readLock.lock(); try { readA(); } finally { readLock.unlock(); } }); Thread readThread2 = new Thread(() -> { readLock.lock(); try { readA(); } finally { readLock.unlock(); } }); Thread writeThread = new Thread(() -> { writeLock.lock(); try { writeA(); } finally { writeLock.unlock(); } }); readThread1.start(); readThread2.start(); writeThread.start(); } }
核心变量和构造器
该接口用于获取读锁和写锁对象
public interface ReadWriteLock { // 用于获取读锁 Lock readLock(); // 用于获取写锁 Lock writeLock(); }
readerLock和writerLock变量用于支撑以上描述的ReadWriteLock接口的读锁和写锁方法。通过构造方法得知,读写锁对象的创建和用例均依赖于公平锁或者非公平锁同步器。
public class ReentrantReadWriteLock implements ReadWriteLock { // 读锁对象 private final ReentrantReadWriteLock.ReadLock readerLock; // 写锁对象 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器 final Sync sync; // 默认构造器,创建了非公平锁 public ReentrantReadWriteLock() { this(false); } // 根据fair变量,来选择创建不同的锁:公平锁 FairSync 和非公平锁 NonfairSync public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); // 用同步器来创建读写锁对象 readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } }
Sync类
核心变量和构造器
我们说读锁可以多个线程同时持有,而写锁只允许一个线程持有,此时我们称 读锁-----共享锁 写锁------互斥锁(排他锁)。然后我们在AQS中了解到一个变量state,它是32位的值,那么我们这里将其切割为高16位和低16位。
abstract static class Sync extends AbstractQueuedSynchronizer { // 高16位用于表示读锁 static final int SHARED_SHIFT = 16; // 用于对高16位操作:加1 减1 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 最大读锁量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 用于获取低16位的值。例如 获取低八位:0000 0000 1111 1111 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 获取当前持有读锁的线程数量 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 获取当前持有写锁的线程数量 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 高16位为所有读锁获取,那么我想知道每个线程对于读锁重入的次数?采用ThreadLocal来进行统计,每个线程自己统计自己的 static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); } // 继承自ThreadLocal,重写了其中的initialValue方法,该方法将在线程第一次获取该变量时调用初始化HoldCounter计数器 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } // 创建ThreadLocal对象 private transient ThreadLocalHoldCounter readHolds; // 缓存最后一个线程获取的读锁数量 private transient HoldCounter cachedHoldCounter; // 保存获取到该锁的第一个读锁线程 private transient Thread firstReader = null; // 保存第一个该锁的第一个读锁线程获取到的读锁数量 private transient int firstReaderHoldCount; Sync() { // 构造器中初始化ThreadLocalHoldCounter ThreadLocal对象 readHolds = new ThreadLocalHoldCounter(); // 用于保证可见性,使用了state变量的volatile语义 setState(getState()); } }
tryAcquire获取写锁的流程
由AQS调用,用于子类实现自己的上锁逻辑,和原有获取互斥锁保持一致,
protected final boolean tryAcquire(int acquires) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取当前状态值和互斥锁的数量 int c = getState(); int w = exclusiveCount(c); // 状态值有效 if (c != 0) { // 有线程获取到了读锁或者当前线程不是持有互斥锁的线程 if (w == 0 || // 有线程获取到了读锁 current != getExclusiveOwnerThread()) // 有线程获取到了写锁 // 返回false 让AQS执行阻塞操作 return false; // 写锁重入,而又由于写锁的数量保存在低16位,所以直接加就行了 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } // 既没有读锁,也没有写锁 if (writerShouldBlock() || // 由子类实现判断当前线程是否应该获取写锁 !compareAndSetState(c, c + acquires)) // 通过CAS抢写锁 return false; // 获取写锁成功,那么将当前线程标识为获取互斥锁的线程对象 setExclusiveOwnerThread(current); return true; }
tryAcquireShared获取读锁的流程获取写锁的流程
protected final int tryAcquireShared(int unused) { // 获取到当前线程对象 Thread current = Thread.currentThread(); // 获取到当前状态值 int c = getState(); if (exclusiveCount(c) != 0 && // 有没有线程持有写锁 getExclusiveOwnerThread() != current) // 如果有线程获取到了互斥锁,那么进一步看看是不是当前线程 // 不是当前线程,那么直接返回-1,告诉AQS获取共享锁失败 return -1; // 获取到读锁的持有数量 int r = sharedCount(c); if (!readerShouldBlock() && // 让子类来判定当前获取读锁的线程是否应该被阻塞 r < MAX_COUNT && // 判断是否发生了溢出 compareAndSetState(c, c + SHARED_UNIT)) { // 直接CAS 增加state的高16位的读锁持有数量 // 增加高16位之前的计数为0,此时表明当前线程就是第一个获取读锁的线程 if (r == 0) { // 注意:持有两个变量来优化threadlocal firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 当前获取读锁的线程就是一个线程,那么此时表明:锁重入,直接++计数位即可 firstReaderHoldCount++; } else { // 当前线程不是第一个读线程,此时将其获取读锁的次数保存在ThreadLocal中 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } // 有很多同学走到这里,直接懵逼?不知道这是啥情况?经验:在看doug lea写的代码时,请注意:经常做优化,就是把一些常见的场景前置,保证性能 return fullTryAcquireShared(current); }
fullTryAcquireShared完全获取读锁流程
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 当前已经有线程获取到写锁且当前获取写锁的线程不是,当前线程 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 子类判断当前线程应该阻塞 if (firstReader == current) { // 当前线程就是第一个获取到读锁的线程 } else { // 获取到当前线程记录读锁重入次数的HoldCounter对象 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 当前读锁重入次数为0时,表明没有获取读锁,此时返回-1,阻塞当前线程 if (rh.count == 0) return -1; } } // 读锁获取次数溢出 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS增加读锁次数 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1; } } }
tryRelease释放写锁的流程
protected final boolean tryRelease(int releases) { // 没有获取写锁,为啥能释放写锁呢? if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 释放完毕后,写锁状态是否为0(锁重入),因为此时计算的不是当前state,是nextc boolean free = exclusiveCount(nextc) == 0; // 如果下一个状态值为0,此时表明当前线程完全释放了锁,也即锁重入为0,那么将当前线程对象从OwnerThread中移除 if (free) setExclusiveOwnerThread(null); // 此时设置全局state变量即可 setState(nextc); // 如果返回为true,那么由AQS完成后面线程的唤醒 return free; }
tryReleaseShared释放读锁的流程
释放时,需要考虑:重入多少次,就释放多少次。总结:先完成自己的释放,然后再完成共享的高16位的释放。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 当前线程是第一个获取到读锁的线程 if (firstReader == current) { // 当前重入次数为1,代表什么?代表可以直接释放,如果不是1,那么表明还持有多个读锁,也即重入多次,那么直接-- if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 当前线程已经释放完读锁,那么不需要在ThreadLocal里持有HoldCounter对象 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // CAS释放高16位计数 int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 释放完毕后是否为0,为无锁状态,此时需要干啥?由AQS来唤醒阻塞的线程 return nextc == 0; } }
readerShouldBlock和writerShouldBlock模板方法公平锁实现
判断条件只有一个:hasQueuedPredecessors()方法,就是看看AQS的阻塞队列里是否有其他线程正在等待,如果有排队去。
总结:有人在排队,那么不插队。w->r->r->r 此时来了个r:w->r->r->r->r, 此时来了个w:w->r->r->r->w。
static final class FairSync extends Sync { final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } // w->r->r r获取锁 w->r->r-r }
readerShouldBlock和writerShouldBlock模板方法非公平锁实现
写线程永远false,因为读写锁本身适用的是读多写少,此时不应该 让写线程饥饿,而且非公平,写锁永远不阻塞,让它抢,不管前面是否有人排队,先抢了再说。apparentlyFirstQueuedIsExclusive()第一个排队的是不是写线程。r(10),当前线程是第十一个,此时已经有一个写线程排队,r(10)->w,此时排队去。r(10)->w->r。
static final class NonfairSync extends Sync { final boolean writerShouldBlock() { return false; } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } // w->r->r r获取锁 r->r->r }
加载全部内容