Java多线程 CountDownLatch Java多线程之同步工具类CountDownLatch
冬日毛毛雨 人气:0前言:
CountDownLatch
是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。
1 CountDownLatch主要方法
void await():如果当前count
大于0,当前线程将会wait,直到count等于0或者中断。 PS:当count
等于0的时候,再去调用await()
,
线程将不会阻塞,而是立即运行。后面可以通过源码分析得到。
boolean await(long timeout, TimeUnit unit):使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
void countDown(): 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
long getCount() :获得计数的数量
2 CountDownLatch使用例子
public class CountDownLatchTest { private static final int N = 4; public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(4); for(int i=0;i<N;i++) { new Thread(){ public void run() { try { System.out.println("子线程"+Thread.currentThread().getName()+"正在执行"); Thread.sleep(3000); System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕"); latch.countDown(); System.out.println("剩余计数"+latch.getCount()); } catch (InterruptedException e) { e.printStackTrace(); } }; }.start(); } try { System.out.println("等待"+N+"个子线程执行完毕..."); latch.await(); System.out.println(N+"个子线程已经执行完毕"); System.out.println("继续执行主线程"); } catch (InterruptedException e) { e.printStackTrace(); } } }
子线程Thread-1正在执行
子线程Thread-3正在执行
子线程Thread-2正在执行
等待4个子线程执行完毕...
子线程Thread-0正在执行
子线程Thread-3执行完毕
子线程Thread-2执行完毕
剩余计数2
子线程Thread-1执行完毕
剩余计数1
子线程Thread-0执行完毕
剩余计数3
剩余计数0
4个子线程已经执行完毕
继续执行主线程
3 CountDownLatch源码分析
CountDownLatch
是通过计数器的方式来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务之后,就会对计数器减1,当计数器的值为0时,表示所有线程完成了任务,此时等待在闭锁上的线程才继续执行,从而达到等待其他线程完成任务之后才继续执行的目的。
构造函数
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
通过传入一个数值来创建一个CountDownLatch
,数值表示线程可以从等待状态恢复,countDown
方法必须被调用的次数
countDown方法
public void countDown() { sync.releaseShared(1); }
线程调用此方法对count
进行减1。当count
本来就为0,此方法不做任何操作,当count
比0大,调用此方法进行减1,当new count
为0,释放所有等待当线程。
countDown方法的内部实现
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. */ public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//释放所有正在等待的线程节点 return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
await方法
(1)不带参数
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
调用此方法时,当count
为0,直接返回true
,当count
比0大,线程会一直等待,直到count
的值变为0,或者线程被中断(interepted,此时会抛出中断异常)。
(2)带参数
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
调用此方法时,当count
为0,直接返回true
,当count
比0大,线程会等待一段时间,等待时间内如果count
的值变为0,返回true
;当超出等待时间,返回false
;或者等待时间内线程被中断,此时会抛出中断异常。
await()方法的内部实现
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
具体如下:
- 1、检测中断标志位
- 2、调用
tryAcquireShared
方法来检查AQS标志位state
是否等于0,如果state
等于0,则说明不需要等待,立即返回,否则进行3 - 3、调用
doAcquireSharedInterruptibly
方法进入AQS同步队列进行等待,并不断的自旋检测是否需要唤醒
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /* 函数功能:根据AQS的状态位state来返回值, 如果为state=0,返回 1 如果state=1,则返回-1 */ protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {//如果大于零,则说明需要唤醒 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
4 CountDownLatch和CyclicBarrier区别
CountDownLatch
和CyclicBarrier
都能够实现线程之间的等待,只不过它们侧重点不同:
CountDownLatch
一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;CyclicBarrier
一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
CountDownLatch
是不能够重用的,而CyclicBarrier
是可以重用的。
加载全部内容