亲宝软件园·资讯

展开

Java信号量Semaphore

刘Java 人气:0

1.Semaphore的概述

public class Semaphore extends Object implements Serializable

Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为一种多线程并发控制工具来使用。

Semaphore可以控制同时访问共享资源的线程个数,线程通过 acquire方法获取一个信号量,信号量减一,如果没有就等待;通过release方法释放一个信号量,信号量加一。它通过控制信号量的总数量,以及每个线程所需获取的信号量数量,进而控制多个线程对共享资源访问的并发度,以保证合理的使用共享资源。相比synchronized和独占锁一次只能允许一个线程访问共享资源,功能更加强大,有点类似于共享锁!

2.Semaphore的原理

2.1 基本结构

根据uml类图,可以很明显的看出来Semaphore和CountDownLatch一样都是直接使用AQS实现的。区别就是Semaphore还分别实现了公平模式FairSync和非公平模式NonfairSync两个内部类。

实际上公平与非公平只是在获取信号量的时候得到体现,它们的释放信号量的方法都是一样的,这就类似于ReentrantLock:公平与非公平只是在获取锁的时候得到体现,它们的释放锁的方法都是一样的!或许这里有人在想,信号量是不是可以看作锁资源呢?某些时候这么看是没问题的,比如都是获取了只有获取了“信号量”或者“锁”才能访问共享资源,但是它们又有区别,锁资源会和线程绑定,而信号量则不会和线程绑定。

在构造器部分,如同CountDownLatch 构造函数传递的初始化计数个数count被赋给了AQS 的state 状态变量一样,Semaphore的信号量个数permits同样赋给了AQS 的state 值。

在创建Semaphore时可以使用一个fair变量指定是否使用公平策略,默认是非公平的模式。公平模式会确保所有等待的获取信号量的线程按照先进先出的顺序获取信号量,而非公平模式则没有这个保证。非公平模式的吞吐量比公平模式的吞吐量要高,而公平模式则可以避免线程饥饿。

/**
 * 保存某个AQS子类实例
 */
private final Sync sync;

/**
 * 创建具有给定的信号量数和非公平的公平模式的 Semaphore。
 *
 * @param permits 初始的可用信号量数目。此值可能为负数,在这种情况下,必须在授予任何获取信号量前进行释放信号量。
 */
public Semaphore(int permits) {
    //默认初始化NonfairSync实例
    sync = new NonfairSync(permits);
}

/**
 * 创建具有给定的信号量数和给定的公平设置的 Semaphore。
 *
 * @param permits 初始的可用信号量数目。此值可能为负数,在这种情况下,必须在授予任何获取信号量前进行释放信号量。
 * @param fair    如果此信号量保证在争用时按先进先出的顺序授予信号量,则为 true;否则为 false。
 */
public Semaphore(int permits, boolean fair) {
    //根据fair参数选择初始化一个公平FairSync类或者非公平NonfairSync类的实例
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}


/**
 * 非公平模式的实现
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    //…………其他方法后面再讲

}

/**
 * 公平模式的实现
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    //…………其他方法后面再讲

}

/**
 * 信号量的同步实现。 使用 AQS 的state状态表示信号量。子分类为公平和非公平模式。
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    /**
     * 构造器
     *
     * @param permits 初始的可用信号量数目。
     */
    Sync(int permits) {
        //被设置为state值
        setState(permits);
    }

    //…………其他方法后面再讲
}

2.2 可中断获取信号量

public void acquire()

可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程或者线程被中断。获取一个信号量就立即返回,将可用的信号量数减 1。 如果调用此方法时已被中断或者等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。

public void acquire(int permits)

可中断的获取permits 个信号量。

内部调用AQS的acquireSharedInterruptibly方法,这实际上就是共享式可中断获取资源的模版方法,因此Semaphore和CountDownLatch一样都是基于共享资源模式。

/**
 * Semaphore的acquire方法
 * 从信号量获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒或者线程被中断。
 *
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public void acquire() throws InterruptedException {
    //内部调用AQS的acquireSharedInterruptibly方法
    //这实际上就是共享式可中断获取资源模版方法
    sync.acquireSharedInterruptibly(1);
}

/**
 * 从信号量获取permits个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒或者线程被中断。
 *
 * @param permits 需要获取的信号量数量
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //参数就是permits
    sync.acquireSharedInterruptibly(permits);
}

/**
 1. AQS的acquireSharedInterruptibly方法
 2. 共享式可中断获取信号量资源的模版方法
 3.  4. @param arg 需要获取的信号量资源数量
 5. @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //最开始就检查一次,如果当前线程是被中断状态,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //调用tryAcquireShared尝试获取共享信号量资源,这个方法是子类自己重写的
    //如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
    //Semaphore的FairSync和NonfairSync对tryAcquireShared分别做出了公平和不公平的实现
    if (tryAcquireShared(arg) < 0)
        //获取不到就执行doAcquireSharedInterruptibly方法
        doAcquireSharedInterruptibly(arg);
}

在获取共享信号量资源的时候,Semaphore还实现了公平模式和非公平模式!它们的实现实际上和lock锁的实现中锁资源的公平、非公平获取非常类似!

2.1.1 公平模式

公平模式调用FairSync的tryAcquireShared方法!

如果我们学习了AQS、ReentrantLock、ReadWriteLock的源码,我们第一个就会发现hasQueuedPredecessors方法,这个方法是AQS为实现公平模式的预定义的方法,AQS帮我们实现好了,该方法用于查询是否有任何线程等待获取信号量资源的时间超过当前线程。

大概步骤为:

原理还是很简单的,就是判断目前的信号量资源数量—state的值,是否满足要获取的信号量资源数量,acquire()方法默认获取1个资源。获取到了就是CAS的原子性的将state递减,否则表示获取资源失败,那么可能会阻塞。但是我们也会发现:如果remaining大于等于0,但是CAS更新state失败,那么会循环重试,这里为什么要重试呢?

实际上我们的在AQS文章的“可重入共享锁的实现” 部分已经讲过:因为可能会有多个线程同时获取信号量资源,但是由于CAS只能保证一次只有一个线程成功,因此其他线程必定失败,但此时,实际上还是存在剩余的信号量资源没有被获取完毕的,因此让其他线程重试,相比于直接加入到同步队列中,对于信号量资源的利用率更高!

/**
 * 公平模式
 */
static final class FairSync extends Sync {
    /**
     * 尝试公平的获取共享信号量资源
     *
     * @param acquires 获取信号量资源数量
     * @return 如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
     */
    protected int tryAcquireShared(int acquires) {
        /*开启一个循环尝试获取共享信号量资源*/
        for (; ; ) {
            //这是AQS实现公平模式的预定义的方法,AQS帮我们实现好了。该方法用于查询是否有任何线程等待获取信号量资源的时间超过当前线程
            //如果该方法返回true,则表示有线程比当前线程更早地请求获取信号量资源。由于是公平的的,因此当前线程不应该获取信号量资源,直接返回-1,表示获取信号量资源失败
            if (hasQueuedPredecessors())
                return -1;
            //到这里,表示当前线程就是最早请求获取信号量资源,可以尝试获取

            //获取state的值available,我们知道state代表信号量资源数量
            int available = getState();
            //remaining为available减去需要获取的信号量资源数量之后的差值
            int remaining = available - acquires;
            //如果remaining小于0,那么返回remaining值,由于是负数,因此获取失败
            //如果大于等于0,那么表示可以获取成功,尝试CAS的更新state,更新成功之后同样返回remaining,由于是大于等于0的数,因此获取成功
            if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                return remaining;
            //如果remaining大于等于0,但是CAS更新state失败,那么循环重试
        }
    }
}

2.1.2 非公平模式

非公平模式调用NonfairSync的tryAcquireShared方法!

相比于公平模式的实现,少了hasQueuedPredecessors的判断。可以想象:如果某线程A 先调用了aquire()方法获取信号量,但是如果当前信号量个数为0,那么线程A 会被放入AQS 的同步队列阻塞。

过一段时间后线程B调用了release()方法释放了一个信号量,他它会唤醒队列中等待的线程A,但是这时线程C又调用了aquire()方法。如果采用非公平策略,那么线程C就会和线程A 去竞争这个信号量资源。由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A 被激活前,或者激活后先于线程A 获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。

另外,非公平模式的具体实现是在父类Sync中的nonfairTryAcquireShared方方法,为什么该方法要实现在父类中的,因为无论是指定的公平模式还是非公平模式,它们的tryAcquire方法都是调用的nonfairTryAcquireShared方法,即非公平的,因此实现在父类中!

/**
 * 非公平模式
 */
static final class NonfairSync extends Sync {

    /**
     * 尝试非公平的获取共享信号量资源
     *
     * @param acquires 获取信号量资源数量
     * @return 如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
     */
    protected int tryAcquireShared(int acquires) {
        //调用父类Sync的nonfairTryAcquireShared方法
        //为什么该方法要实现在父类中的,因为无论是指定的公平模式还是非公平模式,
        //它们的tryAcquire方法都是调用的nonfairTryAcquireShared方法,即非公平的,因此实现在父类中
        return nonfairTryAcquireShared(acquires);
    }
}

/**
 * AQS的实现,作为公平和非公平模式的父类,有一些共享方法
 */
abstract static class Sync extends AbstractQueuedSynchronizer {

    /**
     * 尝试非公平的获取共享信号量资源
     *
     * @param acquires 获取信号量资源数量
     * @return 如果返回值小于0,表示当前线程共享信号量资源失败,否则表示成功
     */
    final int nonfairTryAcquireShared(int acquires) {
        /*开启一个循环尝试获取共享信号量资源*/
        for (; ; ) {
            //相比于公平模式,少了hasQueuedPredecessors的实现
            //获取state的值available,我们知道state代表信号量资源数量
            int available = getState();
            //remaining为available减去需要获取的信号量资源数量之后的差值
            int remaining = available - acquires;
            //如果remaining小于0,那么返回remaining值,由于是负数,因此获取失败
            //如果大于等于0,那么表示可以获取成功,尝试CAS的更新state,更新成功之后同样返回remaining,由于是大于等于0的数,因此获取成功
            if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                return remaining;
            //如果remaining大于等于0,但是CAS更新state失败,那么循环重试
        }
    }
}

2.3 不可中断获取信号量

public void acquireUninterruptibly()

不可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程。获取一个信号量就立即返回,将可用的信号量数减 1。

相比于acquire()方法,该方法不响应中断,不会抛出InterruptedException

public void acquireUninterruptibly(int permits)

不可中断的获取permits个信号量。

相比于acquire方法,acquireUninterruptibly方法不响应中断,不会抛出InterruptedException。实际上内部调用AQS的acquireShared方法,这实际上就是共享式获取资源的模版方法式。

/**
 * 获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程。
 * 获取一个信号量就立即返回,将可用的信号量数减 1。
 */
public void acquireUninterruptibly() {
    //内部调用AQS的acquireShared方法
    //这实际上就是共享式不可中断获取资源模版方法
    sync.acquireShared(1);
}

/**
 * AQS的acquireShared方法
 * 共享式不可中断获取资源模版方法
 *
 * @param arg 获取的资源数量
 */
public final void acquireShared(int arg) {
    //并没有检查中断
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}


/**
 * 获取permits个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程。
 *
 * @param permits 获取的信号量数量
 */
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //参数就是permits
    sync.acquireShared(permits);
}

2.4 超时可中断获取信号量

public boolean tryAcquire(long timeout, TimeUnit unit)

超时可中断的获取一个信号量,没有则一直阻塞,直到在其他线程提供信号量并唤醒该线程或者线程被中断或者阻塞超时。获取一个信号量就立即返回,将可用的信号量数减 1。

如果调用此方法时已被中断或者等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。

public boolean tryAcquire(int permits,long timeout,TimeUnit unit)

超时可中断的获取permits 个信号量。

实际上内部调用AQS的tryAcquireSharedNanos方法,这实际上就是共享式超时可中断获取资源的模版方法。

/**
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 是否获取资源成功
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    //实际上就是调用的AQS的共享式超时获取资源的方法,获取1个资源
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/**
 * @param permits 获取的资源数量
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 是否获取资源成功
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //实际上就是调用的AQS的共享式超时获取资源的方法,获取permits个资源
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

/**
 * AQS的共享式超时获取资源的模版方法,支持中断
 *
 * @param arg          参数
 * @param nanosTimeout 超时时间,纳秒
 * @return 是否获取资源成功
 * @throws InterruptedException 如果调用此方法时已被中断或者等待时被中断
 */
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    //最开始就检查一次,如果当前线程是被中断状态,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //下面是一个||运算进行短路连接的代码,同样左边是调用子类实现的tryAcquireShared尝试获取资源,获取到了直接返回true
    //获取不到资源就执行doAcquireSharedNanos方法,这个方法是AQS的方法,因此超时机制是AQS帮我们实现的!
    return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
}

2.5 尝试获取信号量

public boolean tryAcquire()

仅在调用时至少存在至少一个可用信号量,才尝试获取一个信号量。

public boolean tryAcquire(int permits)

仅在调用时至少存在permits个的信号量,才尝试获取permits个信号量。

实际上内部就是直接调用的nonfairTryAcquireShared方法,即公平模式和非公平模式的tryAcquire实现是一样的!并且该方法不会阻塞线程,获取成功返回true,获取失败返回false!

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //调用nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //调用nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

2.6 释放信号量

public void release()

释放一个信号量,信号量总数加1。释放成功后,将唤醒在同步队列中等待获取信号量的结点(线程)!

public void release(int permits)

释放permits个信号量,信号量总数加permits。释放成功后,将唤醒在同步队列中等待获取信号量的结点(线程)!

公平模式和非公平模式的信号量的释放都是一样的。实际上内部调用AQS的releaseShared方法,这实际上就是共享式释放资源的模版方法。

/**
 * 释放一个信号量,信号量总数加1。
 */
public void release() {
    //内部调用AQS的releaseShared方法
    //这实际上就是共享式释放资源的模版方法
    sync.releaseShared(1);
}

/**
 * 释放permits个信号量,信号量总数加permits。
 *
 * @param permits 释放的信号量个数
 */
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //参数就是permits
    sync.releaseShared(permits);
}


/**
 * AQS的共享模式下释放资源的模版方法。
 * 如果成功释放则会调用doReleaseShared
 */
public final boolean releaseShared(int arg) {
    //tryReleaseShared释放信号量资源,该方法由子类自己实现
    if (tryReleaseShared(arg)) {
        //释放成功,必定调用doReleaseShared尝试唤醒后继结点,即阻塞的线程
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * Sync的tryReleaseShared实现
 *
 * @param releases 要释放的资源数量
 * @return true 成功 false 失败
 */
protected final boolean tryReleaseShared(int releases) {
    for (; ; ) {
        //很简单,就是尝试CAS的增加state值,增加releases
        int current = getState();
        int next = current + releases;
        //这里可以知道,信号量资源数量不可超过int的最大值
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS的增加state值,CAS成功之后返回true,否则循环重试
        if (compareAndSetState(current, next))
            return true;
    }
}

3.Semaphore的使用

Semaphore可以用来控制多线程对于共享资源访问的并发量!

案例:若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用,每个工人之多工作10秒,最后统计工作量。

我们可以通过Semaphore与之前的CountDownLatch搭配线程池来轻松实现。我们能发现,采用非公平模式的Semaphore时工人的总工作量大部分情况下要高于采用公平模式的工人总工作量,即非公平模式的执行效率更高(这是不一定的)。我们还能发现,在非公平模式工人的总工作量高于公平模式的工人总工作量时,非公平模式下总会有某些工人工(特别是工人0、1、2)作量更多,而另一些工人工作量更少,这就是线程饥饿!

/**
 * @author lx
 */
public class SemaphoreTest {

    /**
     * 机器数目,实际上就是信号量为5,非公平模式
     */
    private static Semaphore semaphore = new Semaphore(5, false);
    /**
     * 机器数目,实际上就是信号量为5,公平模式
     */
    //private static Semaphore semaphore = new Semaphore(5, true);

    /**
     * 当所有工人都完成任务,那么统计工作量
     */
    private static CountDownLatch countDownLatch = new CountDownLatch(10);

    /**
     * 工人数目,8
     */
    private static final int NUM = 10;

    /**
     * 当前时间
     */
    private static final long NOW = System.nanoTime();

    /**
     * 纳秒单位
     */
    private static final long NANOUNIT = 1000000000;

    /**
     * 工作量
     */
    private static final LongAdder WORKLOAD = new LongAdder();


    static class Worker implements Runnable {
        public Worker(int num) {
            this.num = num;
        }

        private int num;
        private long timed = 20 * NANOUNIT;

        @Override
        public void run() {
            while (true) {
                //获取信号量
                try {
                    if (semaphore.tryAcquire(timed, TimeUnit.NANOSECONDS)) {
                        System.out.println("工人" + this.num + "占用一个机器在生产...");
                        //占用一定时间
                        LockSupport.parkNanos((long) (NANOUNIT * num * 0.5));
                        //统一调整为2秒,将会看到更明显的Semaphore效果
                        //LockSupport.parkNanos((long) (NANOUNIT * 2));

                        System.out.println("工人" + this.num + "生产完毕,释放出机器");
                        //释放信号量
                        //每个工人最多执行20秒
                        WORKLOAD.increment();
                        if ((timed = timed - (System.nanoTime() - NOW)) <= 0) {
                            semaphore.release();
                            countDownLatch.countDown();
                            break;
                        }
                        semaphore.release();
                    } else {
                        countDownLatch.countDown();
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < NUM; i++) {
            executorService.execute(new Worker(i));
        }
        executorService.shutdown();
        countDownLatch.await();
        System.out.println("工作完毕,空闲机器为:" + semaphore.availablePermits());
        System.out.println("总工作量为:" + WORKLOAD.sum());
    }
}

4.Semaphore的总结

Semaphore和CountDownLatch的原理都差不多,都是直接使用AQS的共享模式实现自己的逻辑,都是对于AQS的state资源的利用,但是它们却实现了不同的功能,CountDownLatch中state被看作一个倒计数器,当state变为0时,表示线程可以放开执行。而Semaphore中的state被看作信号量资源,获取不到资源则可能会阻塞,获取到资源则可以访问共享区域,共享区域使用完毕要记得还回信号量。

很明显Semaphore的信号量资源很像锁资源,但是我们前面就说过他们的不同,那就是锁资源是和获得锁的线程绑定的,而这里的信号量资源并没有和线程绑定,也就是说你可以让一些线程不停的“释放信号量”,而另一些线程只是不停的“获取信号量”,这在AQS内部实际上就是对state状态的值的改变而已,与线程无关!

通常Semaphore可以用来控制多线程对于共享资源访问的并发量,在上面的案例中我们就见过!另外还需要注意的是,如果在AQS的同步队列中队头结点线程需要获取n个资源,目前有m个资源,如果m小于n,那么这个队列中的头结点线程以及后面的所有结点线程都会因为不能获取到资源而继续阻塞,即使头结点后面的结点中的线程所需的资源数量小于m也不行。即已经在AQS同步队列中阻塞的线程,只能按照先进先出的顺序去获取资源,如果头部线程因为所需资源数量不够而一直阻塞,那么队列后面的线程必定不能获取资源!

和CountDownLatch一样,Semaphore的源码看起来非常简单,那是因为复杂的线程等待、唤醒机制都被AQS实现了,如果想要真正了解Semaphore的原理,那么AQS是必须要了解的。实际上如果学会了AQS,那么JUC中的锁或者其他同步组件就很简单了!

加载全部内容

相关教程
猜你喜欢
用户评论