JUC并发编程基石AQS之主流程源码解析
LuxBai 人气:0
## 前言
由于AQS的源码太过凝练,而且有很多分支比如取消排队、等待条件等,如果把所有的分支在一篇文章的写完可能会看懵,所以这篇文章主要是从正常流程先走一遍,重点不在取消排队等分支,之后会专门写一篇取消排队和等待条件的分支逻辑。读源码千万别在每个代码分支中来回游走,先按一个正常的分支把流程看明白,之后再去重点关注其他分支,各个击破。我相信看完正常流程,你再去分析其他分支会更加得心应手。本篇将主要方法名都做了目录索引,查看时可通过目录快速跳到指定方法的逻辑。
## 执行流程
AQS的执行流程大体为当线程获取锁失败时,会加入到等待队列中,在**等待队列**中的线程会按照从头至尾的顺序依次再去尝试获取锁执行。
当线程获取锁后如果还需要等待特定的条件才能执行,那么线程就加入到**条件队列**排队,当等待的条件到来时**再从条件队列中按照从头至尾的顺序加入到等待队列**中,然后再按照等待队列的执行流程去获取锁。所以AQS最核心的数据结构其实就两个队列,等待队列和条件队列,然后再加上一个获取锁的同步状态。
## AQS数据结构
AQS最核心的数据结构就三个
- **等待队列**
源码中head和tail为等待队列的头尾节点,在通过前后指向则构成了等待队列,为双向链表,学名为CLH队列。
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210456579-856565466.png)
- **条件队列**
ConditionObject中的firstWaiter和lastWaiter为等待队列的头尾节点,然后通过next指向构成了条件队列,是个单向链表。
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210520472-754565803.png)
- **同步状态**
state为同步状态,通过CAS操作来实现获取锁的操作。
```java
public abstract class AbstractQueuedSynchronizer{
/**
* 等待队列的头节点
*/
private transient volatile Node head;
/**
* 等待队列的尾节点
*/
private transient volatile Node tail;
/**
* 同步状态
*/
private volatile int state;
public class ConditionObject implements Condition, java.io.Serializable {
/** 条件队列的头节点 */
private transient Node firstWaiter;
/** 条件队列的尾节点 */
private transient Node lastWaiter;
}
}
```
### Node节点
两个队列中的节点都是通过AQS中内部类Node来实现的。主要字段:
- waitStatus
当前节点的状态,具体看源码列出的注释。很重要,之后会在源码中讲解。
- Node prev
等待队列节点指向的前置节点
- Node next
待队列节点指向的后置节点
- Node nextWaiter
条件队列中节点指向的后置节点
- Thread thread
当前节点持有的线程
```java
static final class Node {
/** */
static final Node SHARED = new Node();
/** */
static final Node EXCLUSIVE = null;
/** 标明当前节点线程取消排队 */
static final int CANCELLED = 1;
/** 标明该节点的后置节点需要自己去唤醒 */
static final int SIGNAL = -1;
/** 标明当前节点在等待某个条件,此时节点在条件队列中 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* 等待状态,值对于上面的四个常量
*/
volatile int waitStatus;
/**
* 等待队列节点指向的前置节点
*/
volatile Node prev;
/**
* 等待队列节点指向的后置节点
*/
volatile Node next;
/**
* 当前节点持有的线程
*/
volatile Thread thread;
/**
* 条件队列中节点指向的后置节点
*/
Node nextWaiter;
```
## 加锁
上面说明的数据结构我们先大致有个印象,现在通过加锁来一步步说明下具体的流程,上篇文章**JUC并发编程基石AQS之结构篇**,我们知道了AQS加锁代码执行的是acquire方法,那么我们从这个方法说起,从源码中看出执行流程为:tryAcquire——>addWaiter——>acquireQueued
tryAcquire为自己实现的具体加锁逻辑,当加锁失败时返回false,则会执行addWaiter,将线程加入到等待队列中,Node.EXCLUSIVE为独占锁的模式,即同时只能有一个线程获取锁去执行。
**例子说明**
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210553150-182392148.png)
首先假设有四个线程t0-t4调用tryAcquire获取锁,t0线程为天选之子获取到了锁,则t1-t4线程接着去执行addWaiter。
### acquire
```java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
```
### addWaiter分支1
addWaiter方法,首先会初始化一个node节点,将当前线程设置到node节点中。然后判断head和tail节点是否为空,head和tail节点是懒加载的,当AQS初始化时为null,则第一次进来时if (pred != null) 条件不成立,执行enq方法。
**例子说明**
假如t1和t2线程同时执行到该方法,head节点未初始化则执行enq。
```java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
```
### enq
此时可能多个线程会同时调用enq方法,所以该方法中也使用CAS操作。for (;;)是个死循环,首先会CAS操作初始化head节点,且**head节点是个空节点,没有设置线程。**然后第二次循环时通过CAS操作将该节点设置我尾部节点,并将前置节点指向head,之后会跳出循环,返回生成的Node节点到addWaiter,从源码可以看到addWaiter方法后面没有逻辑,之后会调用acquireQueued。
**例子说明**
t1和t2线程同时执行,t1线程上天眷顾CAS成功,则流程为
- 初始化head
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210609555-882274264.png)
- t1线程的node节点加入等待队列
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210651530-1199757265.png)
- t2线程执行,node节点加入等待队列
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210702551-1406037531.png)
```java
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
```
### addWaiter分支2
现在在来说t3和t4,t3和t4线程这时终于获取到了cpu的执行权,此时head节点已经初始化,则进入条件中的代码,其实也是通过CAS操作将节点加入到等待队列尾部,之后会调用acquireQueued。
**例子说明**
假如t3线程先CAS成功,之后t4成功,此时的数据结构为
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210723518-381228880.png)
```java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
```
### acquireQueued
这个方法有两个逻辑,首先如果该节点的前置节点是head会走第一个if,再次去尝试获取锁???
获取锁成功,则将头节点设置为自己,并返回到acquire方法,此时acquire方法执行完,代表获取锁成功,线程可以执行自己的逻辑了。这里有下面几个注意点
- p.next = null; // help GC 设置旧的head节点的后置节点为null
- setHead方法 将t1节点设置为头节点,因为头节点是个空节点,所以设置t1线程节点线程为null,设置t1前置节点为null,此时旧的head节点已经没有任何指向和关联,可以被gc回收,所以上面那一步会写个help GC 的注释。
**例子说明**
现在t1线程的前置节点为头结点,如果t1执行tryAcquire成功则结果为
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210742531-1679283002.png)
当获取锁失败或者前置节点不是头节点都会走第二个if逻辑,首先会判断当前线程是否需要挂起,如果需要则执行线程挂起。
```java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
```
### shouldParkAfterFailedAcquire
判断线程是否需要挂起,首先需要注意的是这个方法的参数是当前节点的前置节点。当线程需要挂起的时候,它需要把身后事安排明白,挂起后让谁来把我唤醒。这个方法就主要做这个操作。我们再来看Node节点中的waitStatus状态,这个状态有一个Node.SIGNAL=-1,**代表了当前节点需要将后置节点唤醒**。这个理解可能有点绕。首先我们要理解一点,如果我需要被唤醒,那么我就要设置我们的前置节点的状态为Node.SIGNAL,这样当我的前置节点发现waitStatus=Node.SIGNAL时,它才知道,我执行完后需要去唤醒后置节点让后置节点去执行。所以这个方法是**当前节点去设置自己的前置节点的状态为Node.SIGNAL**。
waitStatus初始化后是0,
第一次进入该方法,发现自己的前置节点不是Node.SIGNAL,需要先设置为Node.SIGNAL状态
第二次进入时发现前置节点已经是Node.SIGNAL状态,那么我就可以安心的挂起了,有人会唤醒我的。
所以这个方法其实是两个逻辑,先设置前置节点状态,再判断是否可以挂起。因为前面acquireQueued方法中for (;;) 也是个循环,所以会重复进入。
```java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
```
### parkAndCheckInterrupt
将自己的前置节点设置为可唤醒的状态后进入该方法,线程挂起。
**例子说明**
此时t2-t4线程都执行到了此方法,则t2-t4线程都已经挂起不再执行,并且**head-t3**节点的waitStatus都为Node.SIGNAL,因为t4没有后置节点。
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210756816-1954238693.png)
```java
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
```
## 解锁
### release
解锁方法的入口是AQS的release方法,首先会调用tryRelease方法,这个是AQS实现类自己实现的方法,去CAS改变state状态,如果解锁成功,则会进入if里的代码,获取head节点,判断waitStatus!=0,如果等于0代表没有后置节点需要去唤醒。之后调用unparkSuccessor方法。
```java
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
```
waitStatus>0时,代表为CANCELLED = 1状态,即线程取消排队,这个以后会细讲。先将头结点的waitStatus状态设为初始值0,之后查看后置节点的状态,如果>0代表后置节点取消了排队,不需要唤醒。但是当前节点需要去唤醒后续的节点让后续节点再去执行,所以会从尾结点开始寻找找到离当前线程最近的一个且waitStatus<0的去唤醒。之后会调用LockSupport.unpark(s.thread);取消后续节点的挂起,让后续节点继续执行。
### unparkSuccessor
```java
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
```
**例子说明**
此时等待队列的数据,当t0线程执行完成后执行解锁操作,此时所有等待的线程都没有取消等待。
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210820697-368819274.png)
则t0线程会唤醒t1线程
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210835093-895930851.png)
如果t1和t3线程取消的排队时,t0线程会唤醒t2,**从后往前找离head最近的一个没有取消派对的节点**。
![](https://img2020.cnblogs.com/blog/1496984/202004/1496984-20200425210848702-114717817.png)
线程执行到parkAndCheckInterrupt方法时被挂起,当被头节点唤醒后会继续执行,设置interrupted=true,表示被中断,会继续执行for循环逻辑,到现在一个正常的获取锁失败——>加入等待队列——>挂起——>被唤醒继续执行的流程已经整体走了一遍。
本篇文章都是自己根据源码写出的阅读心得,可能有的地方没有揣摩到Doug Lea大神的意图,如果有理解不对的地方欢迎一起探讨。
**如有不实,还望指正**
加载全部内容