亲宝软件园·资讯

展开

java ReentrantLock条件锁实现原理示例详解

小海编码日记 人气:0

引言

在前两篇文章中,我们了解了ReentrantLock内部公平锁和非公平锁的实现原理,可以知道其底层基于AQS,使用双向链表实现,同时在线程间通信方式(2)中我们了解到ReentrantLock也是支持条件锁的,接下来我们来看下,其内部条件锁的实现原理。

条件锁的使用

 public static void main(String[] args) {
     ReentrantLock lock = new ReentrantLock();
     Condition condition = lock.newCondition();
     ExecutorService executorService = Executors.newCachedThreadPool();
     executorService.execute(new Runnable() {
         @Override
         public void run() {
             lock.lock();
             System.out.println(Thread.currentThread().getName()+" enter lock first");
             System.out.println(Thread.currentThread().getName()+" await start");
             try {
                 condition.await();
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
             System.out.println(Thread.currentThread().getName()+" await end");
             lock.unlock();
         }
     });
     executorService.execute(new Runnable() {
         @Override
         public void run() {
             lock.lock();
             System.out.println(Thread.currentThread().getName()+" enter lock first");
             System.out.println(Thread.currentThread().getName()+" start sleep");
             try {
                 Thread.sleep(20000);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
             System.out.println(Thread.currentThread().getName()+" end sleep");
             System.out.println(Thread.currentThread().getName()+" signalAll condition");
             condition.signalAll();
             System.out.println(Thread.currentThread().getName()+"signal end");
             lock.unlock();
         }
     });
 }

如上代码所示,一般情况下我们通过

 Condition condition = lock.newCondition();

创建条件对象,使用condition.await();表示当前线程需要等待条件才能继续执行,当线程执行到此处时,会进入等待队列等待,直到有另一个线程通过condition.signalAll();condition.signal();唤醒,此时表明当前线程执行条件已具备,此时当前线程继续执行,上述代码中,当前线程会转入AQS的同步等待队列中,去等待抢占lock锁,其运行结果如下图所示:

条件锁一般适用于线程需要具备一定条件后才能正确执行的情况。

ReentrantLock.newCondition()

上文看到Condition的创建和基本用法,接下来我们来看下Condition的实现原理,跟踪ReentrantLock的执行代码如下所示:

 // ReentrantLock.java 
 public Condition newCondition() {
     return sync.newCondition();
 }
 ​
 // ReentrantLock内部类Sync中
 final ConditionObject newCondition() {
     return new ConditionObject();
 }

可以看到newCondition最终返回了一个ConditionObject类的对象,ConditionObject类代码如下所示:

 // AQS中声明的ConditionObject
 public class ConditionObject implements Condition, java.io.Serializable {
     private static final long serialVersionUID = 1173984872572414699L;
     private transient Node firstWaiter;
     private transient Node lastWaiter;
     public ConditionObject() { }
     private Node addConditionWaiter() {
     }
     private void doSignal(Node first) {
       .....
     }
     private void doSignalAll(Node first) {
       .....
     }
     private void unlinkCancelledWaiters() {
       .....
     }

相信大家已经看出来了,很熟悉的Node链表有没有?其中firstWaiter指向链表首位,lastWaiter指向链表尾,在该链表内维护一个Node的双向链表,结合AQS中实现,我们可以猜测出,在condition.await的时候会以当前线程创建Node节点,随后以插入条件队列,随后当执行condition.signal/condition.signalAll时,唤醒在链表上的这些节点,具体实现是不是这样呢?我们继续看

Condition.await

ConditionObject实现的await方法如下所示:

 private Node addConditionWaiter() {
     Node t = lastWaiter;
     // If lastWaiter is cancelled, clean out.
     if (t != null && t.waitStatus != Node.CONDITION) {
         unlinkCancelledWaiters();
         t = lastWaiter;
     }
     Node node = new Node(Thread.currentThread(), Node.CONDITION);
     if (t == null)
         firstWaiter = node;
     else
         t.nextWaiter = node;
     lastWaiter = node;
     return node;
 }
 public final void await() throws InterruptedException {
     if (Thread.interrupted())
         throw new InterruptedException();
     // 以当前线程创建Node对象,并添加值队尾
     Node node = addConditionWaiter();
     int savedState = fullyRelease(node);
     int interruptMode = 0;
     // 通过LockSupport阻塞线程
     while (!isOnSyncQueue(node)) {
         LockSupport.park(this);
         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
             break;
     }
     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
         interruptMode = REINTERRUPT;
     if (node.nextWaiter != null) // clean up if cancelled
         unlinkCancelledWaiters();
     if (interruptMode != 0)
         reportInterruptAfterWait(interruptMode);
 }

Condition.signal

ConditionObject中的signal函数实现如下所示:

 public final void signal() {
     if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
     Node first = firstWaiter;
     if (first != null)
         // 对队首节点唤醒
         doSignal(first);
 }
 private void doSignal(Node first) {
     do {
         // 重置firstWaiter并不断尝试唤醒首节点
         if ( (firstWaiter = first.nextWaiter) == null)
             lastWaiter = null;
         first.nextWaiter = null;
     } while (!transferForSignal(first) &&
              (first = firstWaiter) != null);
 }
 final boolean transferForSignal(Node node) {
     // 尝试更新节点的waitStatus
     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
         return false;
     // 当前线程可以正常执行了,将该节点移入同步等待队列中,尝试获取锁
     Node p = enq(node);
     int ws = p.waitStatus;
     // 如果可以获取锁,则立即唤醒执行
     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
         LockSupport.unpark(node.thread);
     return true;
 }

Condition.signalAll的逻辑与signal基本一致,区别在于是将在该条件上等待的所有节点均移入同步等待队列中。

加载全部内容

相关教程
猜你喜欢
用户评论