java ReentrantLock条件锁实现原理示例详解
小海编码日记 人气:0引言
在前两篇文章中,我们了解了ReentrantLock内部公平锁和非公平锁的实现原理,可以知道其底层基于AQS,使用双向链表实现,同时在线程间通信方式(2)中我们了解到ReentrantLock也是支持条件锁的,接下来我们来看下,其内部条件锁的实现原理。
条件锁的使用
public static void main(String[] args) { ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(new Runnable() { @Override public void run() { lock.lock(); System.out.println(Thread.currentThread().getName()+" enter lock first"); System.out.println(Thread.currentThread().getName()+" await start"); try { condition.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName()+" await end"); lock.unlock(); } }); executorService.execute(new Runnable() { @Override public void run() { lock.lock(); System.out.println(Thread.currentThread().getName()+" enter lock first"); System.out.println(Thread.currentThread().getName()+" start sleep"); try { Thread.sleep(20000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName()+" end sleep"); System.out.println(Thread.currentThread().getName()+" signalAll condition"); condition.signalAll(); System.out.println(Thread.currentThread().getName()+"signal end"); lock.unlock(); } }); }
如上代码所示,一般情况下我们通过
Condition condition = lock.newCondition();
创建条件对象,使用condition.await();
表示当前线程需要等待条件才能继续执行,当线程执行到此处时,会进入等待队列等待,直到有另一个线程通过condition.signalAll();
或condition.signal();
唤醒,此时表明当前线程执行条件已具备,此时当前线程继续执行,上述代码中,当前线程会转入AQS的同步等待队列中,去等待抢占lock锁,其运行结果如下图所示:
条件锁一般适用于线程需要具备一定条件后才能正确执行的情况。
ReentrantLock.newCondition()
上文看到Condition的创建和基本用法,接下来我们来看下Condition的实现原理,跟踪ReentrantLock的执行代码如下所示:
// ReentrantLock.java public Condition newCondition() { return sync.newCondition(); } // ReentrantLock内部类Sync中 final ConditionObject newCondition() { return new ConditionObject(); }
可以看到newCondition最终返回了一个ConditionObject类的对象,ConditionObject类代码如下所示:
// AQS中声明的ConditionObject public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject() { } private Node addConditionWaiter() { } private void doSignal(Node first) { ..... } private void doSignalAll(Node first) { ..... } private void unlinkCancelledWaiters() { ..... }
相信大家已经看出来了,很熟悉的Node链表有没有?其中firstWaiter指向链表首位,lastWaiter指向链表尾,在该链表内维护一个Node的双向链表,结合AQS中实现,我们可以猜测出,在condition.await的时候会以当前线程创建Node节点,随后以插入条件队列,随后当执行condition.signal/condition.signalAll时,唤醒在链表上的这些节点,具体实现是不是这样呢?我们继续看
Condition.await
ConditionObject实现的await方法如下所示:
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 以当前线程创建Node对象,并添加值队尾 Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; // 通过LockSupport阻塞线程 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
Condition.signal
ConditionObject中的signal函数实现如下所示:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 对队首节点唤醒 doSignal(first); }
private void doSignal(Node first) { do { // 重置firstWaiter并不断尝试唤醒首节点 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { // 尝试更新节点的waitStatus if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 当前线程可以正常执行了,将该节点移入同步等待队列中,尝试获取锁 Node p = enq(node); int ws = p.waitStatus; // 如果可以获取锁,则立即唤醒执行 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
Condition.signalAll的逻辑与signal基本一致,区别在于是将在该条件上等待的所有节点均移入同步等待队列中。
加载全部内容