Java AbstractQueuedSynchronizer
自动化BUG制造器 人气:0AbstractQueuedSynchronizer
AbstractQueuedSynchronizer 简称 AQS ,抽象队列同步器,用来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。这个类旨在为大多数依赖单个原子 int 值来表示同步状态的同步器提供基础的能力封装。 例如 ReentrantLock、Semaphore 和 FutureTask 等等都是基于 AQS 实现的,我们也可以继承 AQS 实现自定义同步器。
核心思想
网络上常见的解释是:
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
个人理解,可以把 AQS 当成一把锁,它内部通过一个队列记录了所有要使用锁的请求线程,并且管理锁自己当前的状态(锁定、空闲等状态)。相当于 AQS 就是共享资源本身,当有线程请求这个资源是,AQS 将请求资源的线程记录当前工作线程,并将自身设置为锁定状态。后续其他线程请求这个 AQS 时,将请求线程记录到等待队列中,其他线程此时未获取到锁,进入阻塞等待状态。
为什么需要 AQS
在深入 AQS 前,我们应该持有一个疑问是为什么需要 AQS ?synchronized 关键字和 CAS 原子类都提供了丰富的同步方案了。
但在实际的需求中,对同步的需求是各式各样的,比如,我们需要对一个锁加上超时时间,那么光凭 synchronized 关键字或是 CAS 就无法实现了,需要对其进行二次封装。而 JDK 中提供了丰富的同步方案,比如 ReentrantLock ,而 ReentrantLock 是就是基于 AQS 实现的。
用法
这部分内容来自 JDK 的注释
要将此类用作同步器的基础,请在适用时重新定义以下方法,方法是使用 getState、setState 和/或 compareAndSetState 检查和/或修改同步状态:
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
默认情况下,这些方法中的每一个都会引发 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该是短暂的而不是阻塞的。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为最终方法,因为它们不能独立变化。
您可能还会发现从 AbstractOwnableSynchronizer 继承的方法对于跟踪拥有独占同步器的线程很有用。 鼓励您使用它们——这使监视和诊断工具能够帮助用户确定哪些线程持有锁。
即使此类基于内部 FIFO 队列,它也不会自动执行 FIFO 采集策略。
独占同步的核心形式为:
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;
(共享模式类似,但可能涉及级联信号。)
因为在入队之前调用了获取中的检查,所以新获取的线程可能会抢在其他被阻塞和排队的线程之前。 但是,如果需要,您可以定义 tryAcquire 和/或 tryAcquireShared 以通过内部调用一个或多个检查方法来禁用插入,从而提供公平的 FIFO 获取顺序。 特别是,如果 hasQueuedPredecessors(一种专门为公平同步器使用的方法)返回 true,大多数公平同步器可以定义 tryAcquire 返回 false。 其他变化是可能的。
默认插入(也称为贪婪、放弃和避免护送)策略的吞吐量和可扩展性通常最高。 虽然这不能保证公平或无饥饿,但允许较早排队的线程在较晚的排队线程之前重新竞争,并且每次重新竞争都有无偏见的机会成功对抗传入线程。 此外,虽然获取不是通常意义上的“旋转”,但它们可能会在阻塞之前执行多次调用 tryAcquire 并穿插其他计算。 当独占同步只是短暂地保持时,这提供了自旋的大部分好处,而没有大部分责任。 如果需要,您可以通过预先调用获取具有“快速路径”检查的方法来增加这一点,可能会预先检查 hasContended 和/或 hasQueuedThreads 以仅在同步器可能不会被争用时才这样做。
此类通过将其使用范围专门用于可以依赖 int 状态、获取和释放参数以及内部 FIFO 等待队列的同步器,部分地为同步提供了高效且可扩展的基础。 如果这还不够,您可以使用原子类、您自己的自定义 java.util.Queue 类和 LockSupport 阻塞支持从较低级别构建同步器。
用法示例
这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。 虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。
它还支持条件并公开一些检测方法:
class Mutex implements Lock, java.io.Serializable { // Our internal helper class private static class Sync extends AbstractQueuedSynchronizer { // Acquires the lock if state is zero public boolean tryAcquire(int acquires) { assert acquires == 1; // Otherwise unused if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (!isHeldExclusively()) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } // Reports whether in locked state public boolean isLocked() { return getState() != 0; } public boolean isHeldExclusively() { // a data race, but safe due to out-of-thin-air guarantees return getExclusiveOwnerThread() == Thread.currentThread(); } // Provides a Condition public Condition newCondition() { return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isLocked(); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。 因为锁存器是非独占的,所以它使用共享的获取和释放方法。
class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }
AQS 底层原理
父类 AbstractOwnableSynchronizer
AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; // 设置当前持有锁的线程 protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。
CLH 队列
AQS 的等待队列是 CLH (Craig , Landin , and Hagersten) 锁定队列的变体,CLH 锁通常用于自旋锁。AQS 将每个请求共享资源的线程封装程一个 CLH 节点来实现的,这个节点的定义是:
/** CLH Nodes */ abstract static class Node { volatile Node prev; // initially attached via casTail volatile Node next; // visibly nonnull when signallable Thread waiter; // visibly nonnull when enqueued volatile int status; // written by owner, atomic bit ops by others // methods for atomic operations final boolean casPrev(Node c, Node v) { // for cleanQueue return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值 } final boolean casNext(Node c, Node v) { // for cleanQueue return U.weakCompareAndSetReference(this, NEXT, c, v); } final int getAndUnsetStatus(int v) { // for signalling return U.getAndBitwiseAndInt(this, STATUS, ~v); } final void setPrevRelaxed(Node p) { // for off-queue assignment U.putReference(this, PREV, p); } final void setStatusRelaxed(int s) { // for off-queue assignment U.putInt(this, STATUS, s); } final void clearStatus() { // for reducing unneeded signals U.putIntOpaque(this, STATUS, 0); } private static final long STATUS = U.objectFieldOffset(Node.class, "status"); private static final long NEXT = U.objectFieldOffset(Node.class, "next"); private static final long PREV = U.objectFieldOffset(Node.class, "prev"); }
CLH 的节点的数据结构是一个双向链表的节点,只不过每个操作都是经过 CAS 确保线程安全的。要加入 CLH 锁队列,您可以将其自动拼接为新的尾部;要出队,需要设置 head 字段,以便下一个符合条件的等待节点成为新的头节点:
+------+ prev +-------+ prev +------+ | | <---- | | <---- | | | head | next | first | next | tail | | | ----> | | ----> | | +------+ +-------+ +------+
Node 中的 status 字段表示当前节点代表的线程的状态。
status 存在三种状态:
static final int WAITING = 1; // must be 1 static final int CANCELLED = 0x80000000; // must be negative static final int COND = 2; // in a condition wait
- WAITING:表示等待状态,值为 1。
- CANCELLED:表示当前线程被取消,为 0x80000000。
- COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。
在上面的 COND 中,提到了一个条件等待队列的概念。
首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:
- ExclusiveNode
- SharedNode
- ConditionNode
前两者都是空实现:
static final class ExclusiveNode extends Node { } static final class SharedNode extends Node { }
而最后的 ConditionNode 多了些内容:
static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker { ConditionNode nextWaiter; // 检查线程是否中断或当前线程的状态已取消等待。 public final boolean isReleasable() { return status <= 1 || Thread.currentThread().isInterrupted(); } public final boolean block() { while (!isReleasable()) LockSupport.park(); return true; } }
ConditionNode 拓展了两个方法:
- 检查线程状态是否处于等待。
- 阻塞当前线程:当前线程正在等待执行,通过
LockSupport.park()
阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。
而到这一步,所有的信息都指向了一个相关的类 Condition 。
Condition
AQS 中的 Condition 的实现是内部类 ConditionObject :
public class ConditionObject implements Condition, java.io.Serializable
ConditionObject 实现了 Condition 接口和序列化接口,后者说明了该类型的对象可以进行序列化。而前者 Condition 接口,定义了一些行为能力:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
Condition 中定义的能力与 Java 的 Object 类中提供的同步相关方法(wait、notify 和 notifyAll) 代表的能力极为相似。前者提供了更丰富的等待方法。类比的角度来看,如果 Object 是配合 synchronized 关键字使用的,那么 Condition 就是用来配合基于 AQS 实现的锁来使用的接口。
可以将 Condition 的方法分为两组:等待和唤醒。
用于等待的方法
// 等待,当前线程在接到信号或被中断之前一直处于等待状态 void await() throws InterruptedException; // 等待,当前线程在接到信号之前一直处于等待状态,不响应中断 void awaitUninterruptibly(); //等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态 long awaitNanos(long nanosTimeout) throws InterruptedException; // 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。 // 此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0 boolean await(long time, TimeUnit unit) throws InterruptedException; // 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态 boolean awaitUntil(Date deadline) throws InterruptedException;
用于唤醒的方法
// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。 void signal(); // 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。 void signalAll();
ConditionObject
分析完 Condition ,继续来理解 ConditionObject。 ConditionObject 是 Condition 在 AQS 中的实现:
public class ConditionObject implements Condition, java.io.Serializable { /** condition 队列头节点 */ private transient ConditionNode firstWaiter; /** condition 队列尾节点 */ private transient ConditionNode lastWaiter; // ---- Signalling methods ---- // 移除一个或所有等待者并将其转移到同步队列。 private void doSignal(ConditionNode first, boolean all) public final void signal() public final void signalAll() // ---- Waiting methods ---- // 将节点添加到条件列表并释放锁定。 private int enableWait(ConditionNode node) // 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。 private boolean canReacquire(ConditionNode node) // 从条件队列中取消链接给定节点和其他非等待节点,除非已经取消链接。 private void unlinkCancelledWaiters(ConditionNode node) // 实现不可中断的条件等待 public final void awaitUninterruptibly() public final void await() public final long awaitNanos(long nanosTimeout) public final boolean awaitUntil(Date deadline) public final boolean await(long time, TimeUnit unit) // ---- support for instrumentation ---- // 如果此条件是由给定的同步对象创建的,则返回 true。 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) // 查询是否有线程在此条件下等待。 protected final boolean hasWaiters() // 返回在此条件下等待的线程数的估计值。 protected final int getWaitQueueLength() // 返回一个集合,其中包含可能正在等待此 Condition 的那些线程。 protected final Collection<Thread> getWaitingThreads() }
ConditionObject 实现了 Condition 能力的基础上,拓展了对 ConditionNode 相关的操作,方法通过其用途可以划分为三组:
- Signalling
- Waiting
- 其他方法
Signalling methods
public final void signal() { ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) doSignal(first, false); } public final void signalAll() { ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) doSignal(first, true); }
唤醒方法主要逻辑是通过 doSignal(ConditionNode first, boolean all)
实现的。doSignal
方法根据参数,进行一个 while 循环,
两个方法传递进来的都是头节点,也就是从 ConditionNode 双向链表的头节点开始遍历,如果第二个参数 all 设置为 false ,只执行一次遍历中逻辑。循环中的逻辑是:
// 最终都调用了这个方法 private void doSignal(ConditionNode first, boolean all) { while (first != null) { // 取出 first 的下一个节点,设置为 next ConditionNode next = first.nextWaiter; // 如果 first 是链表中唯一的一个节点,设置 lastWaiter 为 null if ((firstWaiter = next) == null) // lastWaiter = null; // 读取 first 的 status ,检查是否是 COND if ((first.getAndUnsetStatus(COND) & COND) != 0) { // first 处于 COND 状态,出队 enqueue(first); // 通过 all 来判断是否将等待的线程都进行唤醒逻辑。 if (!all) break; } first = next; // 循环指向下一个 } }
关键方法 enqueue(ConditionNode)
是 AQS 中的方法:
final void enqueue(Node node) { if (node != null) { for (;;) { // 获取尾节点 Node t = tail; // 避免不必要的内存屏障 node.setPrevRelaxed(t); if (t == null) // 空队列首先初始化一个头节点 tryInitializeHead(); else if (casTail(t, node)) { // 更新 tail 指针为 node (这里不是将 t = node) t.next = node; // 为节点 t 的 next 指针指向 node if (t.status < 0) // t 的状态 < 0 一般代表后续节点需要运行了 LockSupport.unpark(node.waiter); break; } } } }
可以看出 enqueue(ConditionNode)
中本质上是通过调用 LockSupport.unpark(node.waiter);
来唤醒线程的。
Waiting methods
对外提供的等待能力的方法包括:
// 实现不可中断的条件等待 public final void awaitUninterruptibly() public final void await() public final long awaitNanos(long nanosTimeout) public final boolean awaitUntil(Date deadline) public final boolean await(long time, TimeUnit unit)
它们内部都用到了公共的逻辑:
// 添加节点到 condition 列表并释放锁 private int enableWait(ConditionNode node) private boolean canReacquire(ConditionNode node) private void unlinkCancelledWaiters(ConditionNode node)
enableWait
private int enableWait(ConditionNode node) { if (isHeldExclusively()) { // 如果是当前线程持有锁资源 node.waiter = Thread.currentThread(); // 将节点的绑定的线程设置为当前线程 node.setStatusRelaxed(COND | WAITING); // 设置节点状态 ConditionNode last = lastWaiter; // 获取 尾节点 if (last == null) firstWaiter = node; // 如果列表为空, node 就是头节点 else last.nextWaiter = node; // 否则,将尾节点的下一个节点设置为 node lastWaiter = node; // 更新 lastWaiter 指针 int savedState = getState(); // 获取当前线程的同步状态 if (release(savedState)) // 在当前持有锁资源的线程尝试释放锁 return savedState; } node.status = CANCELLED; // 当前线程未持有锁资源,更新 node 的状态为 CANCELLED throw new IllegalMonitorStateException(); // 并抛出 IllegalMonitorStateException }
这个方法对传入的节点插入到等待队列的队尾,并根据当前线程的状态进行了检查。关键方法的 release(int)
:
public final boolean release(int arg) { if (tryRelease(arg)) { // 尝试释放锁资源 signalNext(head); // 释放成功,唤醒下一个等待中的线程 return true; } return false; }
唤醒给定节点的下一个节点(如果存在),通过调用 LockSupport.unpark(s.waiter)
唤醒节点对应的线程。
private static void signalNext(Node h) { Node s; if (h != null && (s = h.next) != null && s.status != 0) { s.getAndUnsetStatus(WAITING); LockSupport.unpark(s.waiter); } }
canReacquire
检查传入的 node 是否在链表中,且不为头节点:
// 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。 private boolean canReacquire(ConditionNode node) { // 检查传入的 node 是否在链表中,且不为头节点 return node != null && node.prev != null && isEnqueued(node); }
// in AQS final boolean isEnqueued(Node node) { // 从 Node 双向链表尾部开始遍历,是否存在 node for (Node t = tail; t != null; t = t.prev) if (t == node) return true; return false; }
unlinkCancelledWaiters
private void unlinkCancelledWaiters(ConditionNode node) { // node 为空 / node 不是队尾 / node 是最后一个节点 if (node == null || node.nextWaiter != null || node == lastWaiter) { ConditionNode w = firstWaiter, trail = null; // w = first , trail = null // /从链表头节点开始遍历 while (w != null) { ConditionNode next = w.nextWaiter; // 取出下一个节点 if ((w.status & COND) == 0) { // 当前节点的状态包含 COND w.nextWaiter = null; // 当前节点的 next 设置为 null if (trail == null) // 如果 trail 指针为空 firstWaiter = next; // firstWaiter 指向 next else trail.nextWaiter = next; // trail 指针不为空,尾指针的 next 指向当前节点的下一个节点 if (next == null) lastWaiter = trail; // 最后将 lastWaiter 设置为 trail (过滤后的 trail 链表插入到队尾) } else trail = w; // 头节点状态不是 COND,当前节点设置为 trail 指针。 w = next; // 下一个循环 } } }
这个方法遍历 ConditionNode 队列,过滤掉状态不包含 COND 的节点。
对外提供的等待方法
上面三个方法是内部处理逻辑。而对外暴露的是以下五个方法:
public final void awaitUninterruptibly() public final void await() public final long awaitNanos(long nanosTimeout) public final boolean awaitUntil(Date deadline) public final boolean await(long time, TimeUnit unit)
除了awaitUninterruptibly()
,其他方法所代表的能力和 Condition 接口中定义的所代表的能力基本一致。
awaitUninterruptibly
awaitUninterruptibly()
是用于实现不可中断的条件等待:
public final void awaitUninterruptibly() { ConditionNode node = new ConditionNode(); // 创建一个新的 node int savedState = enableWait(node); // 将这个新 node 插入,并返回 node 的状态 LockSupport.setCurrentBlocker(this); // 设置 blocker boolean interrupted = false, rejected = false; // flag:中断和拒绝 while (!canReacquire(node)) { // 当前线程关联的 node 不再等待队列 if (Thread.interrupted()) // 尝试中断线程 interrupted = true; else if ((node.status & COND) != 0) { // 中断线程不成功的情况下,如果 node 状态包含 COND // 尝试阻塞线程 try { if (rejected) node.block(); // 实际上也是 LockSupport.park else ForkJoinPool.managedBlock(node); } catch (RejectedExecutionException ex) { rejected = true; // 拒绝执行 } catch (InterruptedException ie) { interrupted = true; // 中断 } } else Thread.onSpinWait(); // 当前线程无法继续执行 } // 不是队列中的唯一节点时执行下面逻辑 LockSupport.setCurrentBlocker(null); node.clearStatus(); // 清除 node 的 status acquire(node, savedState, false, false, false, 0L); // 【*】重点方法 if (interrupted) Thread.currentThread().interrupt(); }
在这个方法中,首先讲解两个方法:
Thread.onSpinWait()
表示调用者暂时无法继续,直到其他活动发生一个或多个动作。 通过在自旋等待循环构造的每次迭代中调用此方法,调用线程向运行时指示它正忙于等待。 运行时可能会采取措施来提高调用自旋等待循环构造的性能。ForkJoinPool.managedBlock(node)
则是通过 Blocker 来检查线程的运行状态,然后尝试阻塞线程。
最后是最关键的方法 acquire
,它的详细逻辑放到最后讲解, 这个方法的作用就是,当前线程进入等待后,需要将关联的线程开启一个自旋,挂起后能够持续去尝试获取锁资源。
await
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ConditionNode node = new ConditionNode(); int savedState = enableWait(node); LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false, cancelled = false, rejected = false; while (!canReacquire(node)) { if (interrupted |= Thread.interrupted()) { if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; // else interrupted after signal } else if ((node.status & COND) != 0) { try { if (rejected) node.block(); else ForkJoinPool.managedBlock(node); } catch (RejectedExecutionException ex) { rejected = true; } catch (InterruptedException ie) { interrupted = true; } } else Thread.onSpinWait(); // awoke while enqueuing } LockSupport.setCurrentBlocker(null); node.clearStatus(); acquire(node, savedState, false, false, false, 0L); if (interrupted) { if (cancelled) { unlinkCancelledWaiters(node); throw new InterruptedException(); } Thread.currentThread().interrupt(); } }
await()
方法相较于 awaitUninterruptibly()
,while 逻辑基本一致,最后多了一步 cancelled 状态检查,如果 cancelled = true ,调用 unlinkCancelledWaiters(node)
,去清理等待队列。
awaitNanos
awaitNanos(long)
在 await()
之上多了对超时时间的计算和处理逻辑:
public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); ConditionNode node = new ConditionNode(); int savedState = enableWait(node); long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout; long deadline = System.nanoTime() + nanos; boolean cancelled = false, interrupted = false; while (!canReacquire(node)) { if ((interrupted |= Thread.interrupted()) || (nanos = deadline - System.nanoTime()) <= 0L) { // 多了一个超时条件 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; } else LockSupport.parkNanos(this, nanos); } node.clearStatus(); acquire(node, savedState, false, false, false, 0L); if (cancelled) { unlinkCancelledWaiters(node); if (interrupted) throw new InterruptedException(); } else if (interrupted) Thread.currentThread().interrupt(); long remaining = deadline - System.nanoTime(); // avoid overflow return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE; }
awaitUntil
awaitUntil(Date)
和 awaitNanos(long)
同理,只是将超时计算改成了日期计算:
long abstime = deadline.getTime(); // ... boolean cancelled = false, interrupted = false; while (!canReacquire(node)) { if ((interrupted |= Thread.interrupted()) || System.currentTimeMillis() >= abstime) { // 时间检查 if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; } else LockSupport.parkUntil(this, abstime); }
await(long, TimeUnit)
await(long, TimeUnit)
则是逻辑更加与 awaitNanos(long)
相似了, 只是多了一步计算 awaitNanos(long nanosTimeout)
中的参数 nanosTimeout 的操作:
long nanosTimeout = unit.toNanos(time);
acquire 方法
在 wait 方法组中,最终都会调用到这个逻辑:
final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // 在取消第一个线程时重试 boolean interrupted = false, first = false; Node pred = null; // 入队时节点的前一个指针 /* * 反复执行: * 检查当前节点是否是 first * 若是, 确保 head 稳定,否则确保有效的 prev * 如果节点是第一个或尚未入队,尝试获取 * 否则,如果节点尚未创建,则创建这个它 * 否则,如果节点尚未入队,尝试入队一次 * 否则,如果通过 park 唤醒,重试,最多 postSpins 次 * 否则,如果 WAITING 状态未设置,设置并重试 * 否则,park 并且清除 WAITING 状态, 检查取消逻辑 */ for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // allocate; retry before enqueue if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // try to enqueue node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits Thread.onSpinWait(); } else if (node.status == 0) { node.status = WAITING; // enable signal and recheck } else { long nanos; spins = postSpins = (byte)((postSpins << 1) | 1); if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
这个方法会在 Node 关联的线程让出锁资源后,开启一个死循环尝试通过 tryAcquire
尝试获取锁资源,最后如果超时或尝试次数超出限制,会通过 LockSupport.park
阻塞自身。
加载全部内容