亲宝软件园·资讯

展开

透过 ReentrantLock 分析 AQS 的实现原理

huansky 人气:2

对于 Java 开发者来说,都会碰到多线程访问公共资源的情况,这时候,往往都是通过加锁来保证访问资源结果的正确性。在 java 中通常采用下面两种方式来解决加锁得问题:

  1. synchronized 关键字;

  2. Java.util.concurrent.locks 包中的 locks 包下面的锁(Lock 接口和 ReentrantLock 等实现类);

synchronized 是 java 底层支持的,而 concurrent 包则是 jdk 实现。关于 synchronized 的原理可以阅读 再有人问你synchronized是什么,就把这篇文章发给他。

Lock 接口

Lock 是一个接口,方法定义如下

// 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
void lock()

// 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常
void lockInterruptibly() 

// 非阻塞获取锁;尝试获取锁,如果成功返回true
boolean tryLock()

// 带有超时时间的获取锁方法
boolean tryLock(long timeout, TimeUnit timeUnit) 

// 释放锁
void unlock() 

Lock 的实现

实现 Lock 接口的类有很多,以下为几个常见的锁实现

  • ReentrantLock:表示重入锁,它是唯一一个实现了 Lock 接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数

  • ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock 接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是:读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。

  • StampedLock: stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程

AQS (AbstractQueuedSynchronizer) 

AQS 的全称为(AbstractQueuedSynchronizer),这个类也是在 java.util.concurrent.locks 下面。这是一个抽象类,采用设计模式中的模板模式来设计的,内部提供了一系列公共的方法,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。

可以这么说,只要搞懂了AQS,那么 J.U.C 中绝大部分的 API 都能轻松掌握。

下图是 AQS 的子类:

可以看到,AQS 还是有很多子类的。下面将详细讲解下 AQS。

AQS 原理概述

AQS 解决了多线程访问共享资源安全性的问题。其原理图可以表示如下:

AQS 利用了一个 volatile 类型的 int 变量 state 来表示同步状态,当其他线程访问带有锁的共享资源的时候,会被阻塞,然后会被放入 FIFO 的 CLH (Craig, Landin, and Hagersten)

队列中,等待在此被唤醒。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。由此确保了每个线程有序访问共享资源,避免出现数据不一致的情况。

AQS 框架图

下面通过一张架构图来整体了解一下 AQS 框架:

  • 上图中有颜色的为 Method,无颜色的为 Attribution。

  • 总的来说,AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外暴露的 API 到底层基础数据。

  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入 AQS 内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

CLH 队列

前面提到了 AQS 使用内置的 FIFO 队列来完成获取资源线程的排队工作。既然是队列,就是由很多节点(Node)组成的,下面来看下 Node 的数据构成。

// java.util.concurrent.locks.AbstractQueuedSynchronizer 
static final class Node { /** 共享模式 */ static final Node SHARED = new Node(); /** 独占模式 */ static final Node EXCLUSIVE = null; /** 取消等待,比如线程等待超时或者被中断 */ static final int CANCELLED = 1; /** 线程需要 unpark 操作来唤醒 */ static final int SIGNAL = -1; /** 线程处于 condition 等待 */ static final int CONDITION = -2; /** 共享模式下使用,表示下一次共享模式获取同步状态时会被无条件传播下去 */ static final int PROPAGATE = -3;      // 当前线程在队列中的等待状态 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; /** 获取同步状态的线程 */ volatile Thread thread; // 指向下一个处于 CONDITION 的节点 Node nextWaiter; // 如果是共享模式返回true final boolean isShared() { return nextWaiter == SHARED; } /** 返回前驱节点,没有就抛出NPE */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } /** Establishes initial head or SHARED marker. */ Node() {} /** 将线程构成一个 node 添加到队列中,通过调用 addWaiter 使用. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; U.putObject(this, THREAD, Thread.currentThread()); } /** 在 condition 队列使用,通过调用 addConditionWaiter 使用. */ Node(int waitStatus) {        // 通过 unsafe 类以及对应的 Node 属性在内存中的偏移量来修改对应实例的属性值。 U.putInt(this, WAITSTATUS, waitStatus); U.putObject(this, THREAD, Thread.currentThread()); } /** CASes waitStatus field. */ final boolean compareAndSetWaitStatus(int expect, int update) { return U.compareAndSwapInt(this, WAITSTATUS, expect, update); } /** CASes next field. */ final boolean compareAndSetNext(Node expect, Node update) { return U.compareAndSwapObject(this, NEXT, expect, update); } private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); private static final long NEXT; static final long PREV; private static final long THREAD; private static final long WAITSTATUS; static { try {          //获取 Node 的属性 next 在内存中的偏移量,下面同理 NEXT = U.objectFieldOffset (Node.class.getDeclaredField("next")); PREV = U.objectFieldOffset (Node.class.getDeclaredField("prev")); THREAD = U.objectFieldOffset (Node.class.getDeclaredField("thread")); WAITSTATUS = U.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); } catch (ReflectiveOperationException e) { throw new Error(e); } } }

对于 Node 类,可以发现其内部操作都是通过 Unsafe 类来保证是原子性操作。同时内部部分变量都是采用 volatile 来修饰,确保该变量对其他线程也是可见的。此外,还可以得出存在两种不同模式,一种是独占模式,一种是共享模式。

再看看 AQS 中两个跟 Node 类相关的属性:

  // java.util.concurrent.locks.AbstractQueuedSynchronizer 
// 头结点 private transient volatile Node head; // 尾节点 private transient volatile Node tail;

整个结构如下图所示:

入队操作

如上图了解了同步队列的结构, 我们在分析其入列操作在简单不过。无非就是将 tail(使用 CAS 保证原子操作)指向新节点,新节点的 prev 指向队列中最后一节点(旧的 tail 节点),原队列中最后一节点的 next 节点指向新节点以此来建立联系,来张图帮助大家理解。

  

出队操作

同步队列(CLH)遵循 FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点,这个过程非常简单。如下图

 

 

设置首节点是通过获取同步状态成功的线程来完成的(获取同步状态是通过 CAS 来完成),只能有一个线程能够获取到同步状态,因此设置头节点的操作并不需要 CAS 来保证,只需要将首节点设置为其原首节点的后继节点并断开原首节点的 next(等待 GC 回收)应用即可。

同步状态 state

在了解数据结构后,接下来了解一下 AQS 的同步状态 —— State。AQS 中维护了一个名为 state 的字段,意为同步状态,是由 Volatile 修饰的,用于展示当前临界资源的获锁情况。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;
  • 当state=0时,表示无锁状态

  • 当state>0时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放 5 次直到 state=0 其他线程才有资格获得锁

下面提供了几个访问这个字段的方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer 
// 获取State的值 protected final int getState() // 设置State的值 protected final void setState(int newState) // 使用CAS方式更新State protected final boolean compareAndSetState(int expect, int update)

这几个方法都是 Final 修饰的,说明子类中无法重写它们。我们可以通过修改 State 字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。

                  

对于我们自定义的同步工具,需要自定义获取同步状态和释放状态的方式,也就是 AQS 架构图中的第一层:API 层。

需要注意的是:不同的 AQS 实现,state 所表达的含义是不一样的。

清楚了 AQS 的基本架构以后,我们来分析一下 AQS 的实现原理,仍然以 ReentrantLock 为模型。

 ReentrantLock 实现原理分析

特性概览

ReentrantLock 意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁。为了帮助大家更好地理解 ReentrantLock 的特性,我们先将 ReentrantLock 跟常用的 Synchronized 进行比较,其特性如下(蓝色部分为本篇文章主要剖析的点):

下面通过伪代码,进行更加直观的比较:

// **************************Synchronized的使用方式**************************
// 1.用于代码块
synchronized (this) {}
// 2.用于对象
synchronized (object) {}
// 3.用于方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
    synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
    // 1.初始化选择公平锁、非公平锁
    ReentrantLock lock = new ReentrantLock(true);
    // 2.可用于代码块
    lock.lock();
    try {
        try {
            // 3.支持多种加锁方式,比较灵活; 具有可重入特性
            if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
        } finally {
            // 4.手动释放锁
            lock.unlock()
        }
    } finally {
        lock.unlock();
    }
}

ReentrantLock 的时序图

调用 ReentrantLock 中的 lock() 方法,源码的调用过程采用时序图来展现:

从图上可以看出来,当锁获取失败时,会调用 addWaiter() 方法将当前线程封装成 Node 节点加入到 AQS 队列,基于这个思路,我们来分析 AQS 的源码实现。ReentrantLock 与 AQS 之间的关系

首先来看看 ReentrantLock 的构造方法,它的构造方法有两个,如下所示:

    // 默认是非公平锁    
    public ReentrantLock() {
        sync = new NonfairSync();
    }

   // true 是公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

可以发现,构造函数中引用了两个内部类,分别是 FairSync (公平锁) 和 NonfairSync (非公平锁)。并且都是 Sync 的子类。

// 非公平锁
static final class NonfairSync extends Sync {

}

// 公平锁
static final class FairSync extends Sync {

} 

从这里也可以发现 Sync 类的重要性,而前面的截图也说明了 Sync 又是 AbstractQueuedSynchronizer 的子类,到这里,他们之间的关系就浮出水面了:

对于 FairSync 与 NonfairSync :

  • 公平锁 表示所有线程严格按照 FIFO 来获取锁

  • 非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁

公平锁和非公平锁的实现上的差异,会在文章后面做一个解释,接下来的分析仍然以非公平锁作为主要分析逻辑。


 

Lock 方法

对于 ReentrantLock 默认是 NonfairSync,我们以这个为例了解其背后的原理。

    
  // java.util.concurrent.locks.ReentrantLock#NonfairSync
  static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ // Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android. // @ReservedStackAccess final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

看 lock 方法代码的含义:

  • 若通过 CAS 设置变量 State(同步状态)成功,也就是获取锁成功,则将当前线程设置为独占线程。

  • 若通过 CAS 设置变量 State(同步状态)失败,也就是获取锁失败,则进入 Acquire 方法进行后续处理。

compareAndSetState 的代码实现逻辑如下

  protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSwapInt(this, STATE, expect, update);
    }

这段代码其实逻辑很简单,就是通过 CAS 乐观锁的方式来做比较并替换。上面这段代码的意思是,如果当前内存中的 state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false。这个操作是原子的,不会出现线程安全问题。

lock 方法的第一步很好理解,但第二步获取锁失败后,后续的处理策略是怎么样的呢?这块可能会有以下思考:

  • 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:
  1. 将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就是 2 这种流程,也就是 AQS 框架的处理流程。

  2. 存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

  • 对于问题 1 的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

  • 处于排队等候机制中的线程,什么时候可以有机会获取锁呢?

  • 如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?

可以看一下 else 分支的逻辑,acquire 方法:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer 
   public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

Acquire 方法是 AQS 中的核心方法。这里它干了三件事情:

  • tryAcquire:会尝试再次通过 CAS 获取一次锁。

  • addWaiter:将当前线程加入上面锁的双向链表(等待队列)中

  • acquireQueued:通过自旋,判断当前队列节点是否可以获取锁


 

tryAcquire 方法

下面详细看下 NonfairSync 的 tryAcquire 方法,该方法会直接调用 nonfairTryAcquire 方法,代码如下:

     // java.util.concurrent.locks.ReentrantLock 
     final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();
       // c=0 说明此时没有获取没有线程占有锁 if (c == 0) {
          // CAS 操作去获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 已经获取锁了,可以继续重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

简单来说上面的方法主要就是看能不能获取到锁,不能获取到就返回 false,然后就会调用 addWaiter 添加到等待队列中,具体代码如下:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer 
  private Node addWaiter(Node mode) { Node node = new Node(mode); // 死循环 for (;;) { Node oldTail = tail; if (oldTail != null) { // 通过unsafe 类来对 Node.prev 节点赋值 U.putObject(node, Node.PREV, oldTail); // 更新 tail 节点为 node,该操作对其他线程是可见的,确保每次只有一个线程可以更新成功 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } } // cas 设置 tail 节点 private final boolean compareAndSetTail(Node expect, Node update) { return U.compareAndSwapObject(this, TAIL, expect, update); } // 初始化 head 和 tail 节点 private final void initializeSyncQueue() { Node h; if (U.compareAndSwapObject(this, HEAD, null, (h = new Node()))) tail = h; }

addWaiter(Node node) 方法通过采用死循环方案,确保将该节点设置尾成尾节点。

  • 如果为尾节点不为空,需要将新节点添加到 oldTail 的 next 节点,同时将新节点的 prev 节点指向 oldTail;

  • 如果当前队列为空,需要进行初始化,此时 head 结点和 tail 节点都是 h =  new Node () 实例;此时 oldTail = h 不为空,node 的 prev 为 oldTail, oldTail 的 next 是 node。

这里代码很简单,但是却通过 CAS 操作保证了多个线程一起添加节点的时候,只有一个线程可以成功。

此外,入队操作还有个 enq 方法,这个方法和 addWaiter 一样的,就是返回值不一样,具体如下:

   // java.util.concurrent.locks.AbstractQueuedSynchronizer  
  private Node enq(Node node) { for (;;) { Node oldTail = tail; if (oldTail != null) { U.putObject(node, Node.PREV, oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); } } }

但请注意,初始化的头结点并不是当前线程节点,而是调用了无参构造函数的节点。如果经历了初始化或者并发导致队列中有元素,则与之前的方法相同。

acquireQueued

将添加到队列中的 Node 作为参数传入 acquireQueued 方法,这里面会做抢占锁的操作:

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
          // 获取前一个节点,为空,抛出 NPE final Node p = node.predecessor();
          // p==head 说明 node 是队列中的第一位,这时候还会再去获取一次锁 if (p == head && tryAcquire(arg)) {
            // 获取锁成功后,node 变成 head 节点,凡是 head 节点,其 thread 和 pre 都为空,next 保持不变。 setHead(node); p.next = null; // help GC
            // 注意这个中断记录是在获取锁之后才会被返回的,也就是说获取锁之后,才有资格处理中断 return interrupted; }
          // 获取锁失败,说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)
          // 或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
            // 说明在这个过程中发生过中断,需要补上 interrupted = true; } } catch (Throwable t) { cancelAcquire(node); throw t; } }

总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

下面来看获取失败后的处理,具体在看下面的代码: 

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     // 获取头结点的节点状态 int ws = pred.waitStatus;
     // 当前 prev node 的线程需要被 unpark 唤醒,也就是当前 node 可以接受 park 操作 if (ws == Node.SIGNAL)        // This node has already set status asking a release to signal it, so it can safely park. return true;
     // 前节点处于取消状态,跳过,获取再前一个的节点状态 if (ws > 0) { do {
         // 这里将取消状态的节点删除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0);
       // 同时设置下一个节点为 node pred.next = node; } else {
       // 设置前任节点等待状态为 SIGNAL pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 如果之前中断了,为 true,并清除中断标志 return Thread.interrupted(); }

如果 shouldParkAfterFailedAcquire 返回了true,则会执行:parkAndCheckInterrupt()方法,它是通过 LockSupport.park(this) 将当前线程挂起到 WATING 状态,它需要等待一个中断、unpark 方法来唤醒它,通过这样一种 FIFO 的机制的等待,来实现了 Lock 的操作。

LockSupport 类是 Java6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:

public native void unpark(Thread jthread);  
public native void park(boolean isAbsolute, long time);  

unpark 函数为线程提供“许可( permit )”,线程调用 park 函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。
permit相当于0/1的开关,默认是 0,调用一次 unpark 就加 1 变成了 1。调用一次 park 会消费 permit,又会变成 0,变成 0 不会影响原有线程的运行。 如果再调用一次 park 会阻塞,因为 permit 已经是 0 了。直到 permit 变成 1。这时调用 unpark 会把 permit 设置为 1 。每个线程都有一个相关的 permit,permit 最多只有一个,重复调用 unpark 不会累积。

这里需要说明的一点就是:acquireQueued 方法内部是一个死循环, shouldParkAfterFailedAcquire 和  parkAndCheckInterrupt 也都在这里面。这里对这个逻辑再整理下:

  1.  acquireQueued 本意是通过无限循环让队列中的第一个节点尝试去获取锁;当一个 node 被加入到队列中的时候,就会促发这个无限循环;

  2. 如果等待队列中的第一个节点获取到锁了,就会退出循环;

  3. 如果 node 是第一个加入等待队列的,此时 node 的 prev 节点是 head ( new Node() ),node 会先去获取锁,失败后,因为 prev 的 waitStatus = 0,这时候将其 waitStatus 设置为 -1,然后再次循环,再获取锁失败就会调用 parkAndCheckInterrupt 阻塞当前线程;

  4. shouldParkAfterFailedAcquire 过程中会将队列中处于 CANCELLED = 1 的节点删除。也就是说每添加一个节点,获取锁失败后,都可能会对队列做一遍整理;

  5. 被加入队列后的线程是不会响应中断的。当node 获取锁之后,如果线程在等待中被中断过,需要将这个中断补上,这样线程就可以响应中断操作,比如此时被取消了。

cancelAcquire 方法

如果在获取锁的过程中,发生了错误,就会响应  cancelAcquire(node) 方法。下面具体看下方法的源码,看看它做了啥:

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // 过滤掉那些被取消的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 获取过滤后的前驱节点的后继节点
        Node predNext = pred.next;

     // 把当前node的状态设置为CANCELLED 
        node.waitStatus = Node.CANCELLED;
      // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点 
   // 如果更新成功,将tail的后继节点设置为null,更新失败,说明 node 后面还有其他节点,node 不是尾接点 if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null); } else {        // 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
       // 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next);
          // 走到这里,已经把 node 从 next 队列里面删除了,但是保留了 prev 指针 } else {
          // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点 unparkSuccessor(node); }
       // 这里修改了 node 的next 指针,但是保证了 prev 指针的不变 node.next = node; // help GC } }

当前的流程:获取当前节点的前驱节点,如果前驱节点的状态是 CANCELLED,那就一直往前遍历,找到第一个 waitStatus <= 0 的节点,将找到的 Pred 节点和当前 Node 关联,将当前Node 设置为 CANCELLED。

根据当前节点的位置,考虑以下三种情况:

  1. 当前节点是尾节点。

  2. 当前节点是Head的后继节点。

  3. 当前节点不是Head的后继节点,也不是尾节点。

根据上述第二条,我们来分析每一种情况的流程。

当前节点是尾节点。

当前节点是 Head 的后继节点。

当前节点不是 Head 的后继节点,也不是尾节点。

通过上面的流程,我们对于 CANCELLED 节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对 Next 指针进行了操作,而没有对 Prev 指针进行操作呢?什么情况下会对 Prev 指针进行操作?

执行 cancelAcquire 的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过 Try 代码块中的 shouldParkAfterFailedAcquire 方法了),如果此时修改 Prev指针,有可能会导致 Prev 指向另一个已经移除队列的 Node,因此这块变化 Prev 指针不安全。 shouldParkAfterFailedAcquire 方法中,会执行下面的代码,其实就是在处理 Prev 指针。shouldParkAfterFailedAcquire 是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更 Prev 指针比较安全。

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

 

unparkSuccessor

下面看下 unparkSuccessor 的逻辑:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
       // 将传入的参数node的等待状态变为 0
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
       // 从后往前寻找那些没有被取消的线程
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 unparkSuccessor 的作用如下:

  • 如果其下一个节点为空,或者其等待状态是取消状态,那么就从后往前找,找到一个等待状态 <=0 的,然后将其唤醒;

  • 如果下一个节点不为空,且等待状态 <=0,将其唤醒。

这个方法的找到一个需要唤醒的节点,看下后面怎么处理:

  // LockSupport 
  public static void unpark(Thread thread) {
        if (thread != null)
            U.unpark(thread);
    }

发现也是通过 unSafe 类来处理的。这里调用了 unpark 方法,那肯定有地方调用了 park 方法,这个是在  parkAndCheckInterrupt 里调用的。


 FairSync lock 公平锁

到这里,NonfairSync lock 的逻辑就讲完了 。那 FairSync lock 是如何保证公平的呢?且看代码:

// java.util.concurrent.locks.ReentrantLock    
   static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 加锁 final void lock() { acquire(1); }    // protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();
       // 当前没有线程获取锁 if (c == 0) {
          // 当前线程处于 head 之后,或者队列为空,就会去调用 CAS 获取锁,否则是没有机会获取锁的 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 当前线程就是独占线程,可重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

可见对于公平锁,新加入的节点有以下几种操作:

  1. node 能获取锁的情况有两种:1 是当前没有线程持有锁,并且队列为空,或者 node 是 head 的下一个节点;2 是 node 本身持有锁,可重入。

  2. 在情况 1 后的 node,都将会被加入到队列中去;

这里就可以看出来,公平锁完全是按照先来后到的顺序进行排列等候的,不会给你机会去通过 CAS 操作获取锁的。对于非公平锁,每个线程去获取锁的时候都有机会去尝试获取锁的,成功锁就是你的,不成功就加入到队列中去。


 

unLock 方法 

讲完了 lock 方法以后,接下去讲 unLock 方法了。来看下 unlock 的逻辑:

  public void unlock() {
        sync.release(1);
    }

   // 释放锁
   public final boolean release(int arg) {
     // true 表示成功释放,就会唤醒下一个线程 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }   protected final boolean tryRelease(int releases) { int c = getState() - releases;
    // 确保是当前线程,非当前线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); }
    // 更新同步状态 setState(c); return free; }

unlock 的逻辑比较好理解,就是释放锁,更新同步状态,然后唤醒下一个等待线程。

其中 tryRelease 动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是 1 ),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。

在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock() 的次数与 lock() 的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true。

hasQueuedPredecessors 是公平锁加锁时判断等待队列中是否存在有效节点的方法。如果返回 False,说明当前线程可以争取共享资源;如果返回 True,说明队列中存在有效节点,当前线程必须加入到等待队列中。

// java.util.concurrent.locks.ReentrantLock

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

看到这里,我们理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());为什么要判断的头结点的下一个节点?第一个节点储存的数据是什么?

双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位,这个可以从列表的第一次初始化也可以看出来。

真正的第一个有数据的节点,是在第二个节点开始的。当h != t时: 如果(s = h.next) == null,等待队列正在有线程进行初始化,但只是进行到了Tail指向Head,没有将Head指向Tail,此时队列中有元素,需要返回 True。

  • 如果(s = h.next) != null,说明此时队列中至少有一个有效节点。

  • 如果此时s.thread == Thread.currentThread(),说明等待队列的第一个有效节点中的线程与当前线程相同,那么当前线程是可以获取资源的;

  • 如果s.thread != Thread.currentThread(),说明等待队列的第一个有效节点线程与当前线程不同,当前线程必须加入进等待队列。

对于 unparkSuccessor 逻辑前面讲过了,就是唤醒下一个节点去获取锁。当然在唤醒过程中,对于非公平锁,其他线程是有机会去抢占的

到这里,就把加锁和解锁的逻辑都讲完了。


 

Lock 和 unLock 总结

以非公平锁为例,这里主要阐述一下非公平锁与 AQS 之间方法的关联之处,具体每一处核心方法的作用都已经在上文阐述清楚了。

为了帮助大家理解 ReentrantLock 和 AQS 之间方法的交互过程,以非公平锁为例,将加锁和解锁的交互流程单独拎出来强调一下,以便于对后续内容的理解。

加锁:

  • 通过 ReentrantLock 的加锁方法 Lock 进行加锁操作。

  • 会调用到内部类 Sync 的 Lock 方法,由于 Sync#lock 是抽象方法,根据 ReentrantLock 初始化选择的公平锁和非公平锁,执行相关内部类的 Lock 方法,本质上都会执行 AQS 的 Acquire 方法。

  • AQS 的 Acquire 方法会执行 tryAcquire 方法,但是由于 tryAcquire 需要自定义同步器实现,因此执行了 ReentrantLock 中的 tryAcquire 方法,由于 ReentrantLock 是通过公平锁和非公平锁内部类实现的 tryAcquire 方法,因此会根据锁类型不同,执行不同的 tryAcquire。

  • tryAcquire 是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟 ReentrantLock 自定义同步器无关。

解锁:

  • 通过 ReentrantLock 的解锁方法 Unlock 进行解锁。

  • Unlock 会调用内部类 Sync 的 Release 方法,该方法继承于 AQS。

  • Release 中会调用 tryRelease 方法,tryRelease 需要自定义同步器实现,tryRelease 只在 ReentrantLock 中的 Sync 实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。

  • 释放成功后,所有处理由 AQS 框架完成,与自定义同步器无关。

通过上面的描述,大概可以总结出ReentrantLock加锁解锁时API层核心方法的映射关系。

 

到这里,基本就讲完了。


 

关于 Lock 及 AQS 的一些补充:

1、 Lock 的操作不仅仅局限于 lock()/unlock(),因为这样线程可能进入 WAITING 状态,这个时候如果没有 unpark() 就没法唤醒它,可能会一直“睡”下去,可以尝试用 tryLock()、tryLock(long , TimeUnit) 来做一些尝试加锁或超时来满足某些特定场景的需要。例如有些时候发现尝试加锁无法加上,先释放已经成功对其它对象添加的锁,过一小会再来尝试,这样在某些场合下可以避免“死锁”哦。

看下相关代码:

    // ReentrantLock
    public boolean tryLock() {
     // 调用的是非公平锁来抢占锁 return sync.nonfairTryAcquire(1); } // ReentrantLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
     // 超过一定时间后再去获取锁 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }// AQS
   // 拿不到锁时,等一段时间再拿不到就退出 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
     // 时间 <=0 直接返回 if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout;
     // 将当前线程加入到队列中 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor();
          // 这里如果当前线程是第一个有效节点,直接尝试去获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) {
            // 时间到了之后,就退出等待队列 cancelAcquire(node); return false; }
          // 需要等待,并且时长大于 1000L if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
            // 阻塞一定时间,再去获取锁 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }

2、 lockInterruptibly() 它允许抛出 InterruptException 异常,也就是当外部发起了中断操作,程序内部有可能会抛出这种异常,但是并不是绝对会抛出异常的。

    // ReentrantLock     
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // AQS
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
     // 如果发生了中断,就抛出中断异常 if (Thread.interrupted()) throw new InterruptedException();
     // if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 可中断的 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor();
          // 再次看能不能获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; }
          // park 前发现中断了,抛出中断错误 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }

可以发现,基本上可以中断的点,都会去判断线程是否有中断标志,有的话,直接抛出中断异常,但是在加入队列过程,和获取锁的过程是不响应中断的,只有之前之后会做中断判断。

3、 newCondition() 操作,是返回一个 Condition 的对象,Condition 只是一个接口,它要求实现 await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll() 方法,AbstractQueuedSynchronizer 中有一个内部类叫做 ConditionObject 实现了这个接口,它也是一个类似于队列的实现,具体可以参考源码。大多数情况下可以直接使用,当然觉得自己比较牛逼的话也可以参考源码自己来实现。

4、 在 AQS 的 Node 中有每个 Node 自己的状态(waitStatus),我们这里归纳一下,分别包含:

  • SIGNAL 从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用 unpark() 方法才能激活自己,值为:-1

  • CANCELLED 当 AQS 发起取消或 fullyRelease() 时,会是这个状态。值为 1,也是几个状态中唯一一个大于 0 的状态,所以前面判定状态大于 0 就基本等价于是 CANCELLED 的意思。

  • CONDITION 线程基于 Condition 对象发生了等待,进入了相应的队列,自然也需要 Condition 对象来激活,值为 -2。

  • PROPAGATE 读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个 doReleaseShared() 动作,内部也是一个循环,当判定后续的节点状态为 0 时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。

  • 状态 0 初始化状态,也代表正在尝试去获取临界资源的线程所对应的 Node 的状态。 


 

总结

本文基于 ReentrantLock 非公平锁的独占锁源码来分析了 AQS 的内部实现原理。在获得同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。

 

参考文章

从ReentrantLock的实现看AQS的原理及应用

AQS的原理浅析

J.U.C|同步队列(CLH)

深入分析AQS实现原理

 

加载全部内容

相关教程
猜你喜欢
用户评论