亲宝软件园·资讯

展开

AQS同步组件CyclicBarrier

共饮一杯无 人气:0

CyclicBarrier原理

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

当某个线程调用了await方法之后,就会进入等待状态,并将计数器+1,直到所有线程调用await方法使计数器为CyclicBarrier设置的值,才可以继续执行,由于计数器可以重复使用,所以我们又叫它循环屏障。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

源码分析

    /**
     * 创建一个新的CyclicBarrier当给定数量的参与方(线程)等待它时,它将触发,
     * 并且在障碍触发时不执行预定义的操作。
     *
     * @param  在barrier被触发之前必须调用await()的线程数
     * @throws IllegalArgumentException 如果parties小于1抛出异常
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

/**
     * 
     * 当前线程调用await方法的线程告知CyclicBarrier已经到达屏障,然后当前线程被阻塞
     *
     * @return 当前线程的到达索引,其中索引为- 1表示第一个到达的,0表示最后一个到达的
     * @throws InterruptedException 如果当前线程在等待时被中断
     * @throws BrokenBarrierException 如果另一个线程在当前线程等待时被中断或超时,
     * 或者屏障被重置,或者在调用await方法时屏障被破坏,或者屏障操作(如果存在)由于异常而失败
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

使用案例

await()

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    /**
     * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready {}", threadNum,barrier.getNumberWaiting());
        //每次调用await方法后计数器+1,当前线程被阻塞
        barrier.await();
        log.info("{} continue", threadNum);
    }

输出结果:

16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0
16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1
16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2
16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue
16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue
16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue
16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue
16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue
16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0
16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1
16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2
16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4
16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue
16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue
16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue
16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue
16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0
16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1
16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2
16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue
16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue
16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue
16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue
16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue

通过输出结果可以知道,每次屏障会阻塞5个线程,5个线程执行后计数器达到预设值,继续执行后续操作。

await(long timeout, TimeUnit unit)

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    /**
     * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready{}", threadNum,barrier.getNumberWaiting());
        //每次调用await方法后计数器+1,当前线程被阻塞
        //等待2s.为了使在发生异常的时候,不影响其他线程,一定要catch
        //由于设置了超时时间后阻塞的线程可能会被中断,抛出BarrierException异常,如果想继续往下执行,需要加上try-catch
        try {
            barrier.await(2, TimeUnit.SECONDS);
        }catch (Exception e){
            //查看执行异常的线程
            log.info("线程{} 执行异常,阻塞被中断?{}",threadNum,barrier.isBroken());
        }
        log.info("{} continue", threadNum);
    }

输出结果:

17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0
17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1
17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2
17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 线程0 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 线程2 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程1 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程3 执行异常,阻塞被中断?true
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程4 执行异常,阻塞被中断?true
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程5 执行异常,阻塞被中断?true
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程6 执行异常,阻塞被中断?true
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程7 执行异常,阻塞被中断?true
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程8 执行异常,阻塞被中断?true
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程9 执行异常,阻塞被中断?true
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue

CyclicBarrier(int parties, Runnable barrierAction)

 /**
     * 线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

输出结果:

17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready
17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready
17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready
17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue
17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue
17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue
17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue
17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue
17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready
17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready
17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready
17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue
17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue
17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue
17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue
17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue

CyclicBarrier和CountDownLatch的区别

加载全部内容

相关教程
猜你喜欢
用户评论