亲宝软件园·资讯

展开

Redisson分布式信号量RSemaphore的使用超详细讲解

每天都要进步一点点 人气:0

本篇文章基于redisson-3.17.6版本源码进行分析

一、RSemaphore的使用

@Test
public void testRSemaphore() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient redissonClient = Redisson.create(config);
    RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
    // 设置5个许可,模拟五个停车位
    rSemaphore.trySetPermits(5);
    // 创建10个线程,模拟10辆车过来停车
    for (int i = 1; i <= 10; i++) {
        new Thread(() -> {
            try {
                rSemaphore.acquire();
                System.out.println(Thread.currentThread().getName() + "进入停车场...");
                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(100));
                System.out.println(Thread.currentThread().getName() + "离开停车场...");
                rSemaphore.release();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, "A" + i).start();
    }
    try {
        TimeUnit.MINUTES.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

二、RSemaphore设置许可数量

初始化RSemaphore,需要调用trySetPermits()设置许可数量:

/**
 * 尝试设置许可数量,设置成功,返回true,否则返回false
 */
boolean trySetPermits(int permits);

trySetPermits()内部调用了trySetPermitsAsync():

// 异步设置许可
@Override
public RFuture<Boolean> trySetPermitsAsync(int permits) {
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判断分布式信号量的key是否存在,如果不存在,才设置
            "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false) then "
                    // set "semaphore" permits
                    // 使用String数据结构设置信号量的许可数
                    + "redis.call('set', KEYS[1], ARGV[1]); "
                    // 发布一条消息到redisson_sc:{semaphore}通道
                    + "redis.call('publish', KEYS[2], ARGV[1]); "
                    // 设置成功,返回1
                    + "return 1;"
                    + "end;"
                    // 否则返回0
                    + "return 0;",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(r -> {
            if (r) {
                log.debug("permits set, permits: {}, name: {}", permits, getName());
            } else {
                log.debug("unable to set permits, permits: {}, name: {}", permits, getName());
            }
        });
    }
    return future;
}

可以看到,设置许可数量底层使用LUA脚本,实际上就是使用redis的String数据结构,保存了我们指定的许可数量。如下图:

参数说明:

总结设置许可执行流程为:

三、RSemaphore的加锁流程

许可数量设置好之后,我们就可以调用acquire()方法获取了,如果未传入许可数量,默认获取一个许可。

public void acquire() throws InterruptedException {
    acquire(1);
}
public void acquire(int permits) throws InterruptedException {
    // 尝试获取锁成功,直接返回
    if (tryAcquire(permits)) {
        return;
    }
    // 对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息
    CompletableFuture<RedissonLockEntry> future = subscribe();
    semaphorePubSub.timeout(future);
    RedissonLockEntry entry = commandExecutor.getInterrupted(future);
    try {
        // 不断循环尝试获取许可
        while (true) {
            if (tryAcquire(permits)) {
                return;
            }
            entry.getLatch().acquire();
        }
    } finally {
        // 取消订阅
        unsubscribe(entry);
    }
//        get(acquireAsync(permits));
}

可以看到,获取许可的核心逻辑在tryAcquire()方法中,如果tryAcquire()返回true说明获取许可成功,直接返回;如果返回false,说明当前没有许可可以使用,则对于没有获取锁的那些线程,订阅redisson_sc:{分布式信号量key}通道的消息,并通过死循环不断尝试获取锁。

我们看一下tryAcquire()方法的逻辑,内部调用了tryAcquireAsync()方法:

// 异步获取许可
@Override
public RFuture<Boolean> tryAcquireAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>(true);
    }
    return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              // 获取当前剩余的许可数量
              "local value = redis.call('get', KEYS[1]); " +
              // 许可不为空,并且许可数量 大于等于 当前线程申请的许可数量        
              "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                  // 通过decrby减少剩余可用许可    
                  "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                  // 返回1    
                  "return 1; " +
              "end; " +
              // 其它情况,返回0        
              "return 0;",
              Collections.<Object>singletonList(getRawName()), permits);
}

从源码可以看到,获取许可就是操作redis中的数据,首先获取到redis中剩余的许可数量,只有当剩余的许可数量大于线程申请的许可数量时,才获取成功,返回1;否则获取失败,返回0;

总结加锁执行流程为:

四、RSemaphore的解锁流程

通过前面对RSemaphore获取锁的分析,我们很容易能猜到,释放锁,无非就是归还许可数量到redis中。我们查看具体的源码:

public RFuture<Void> releaseAsync(int permits) {
    if (permits < 0) {
        throw new IllegalArgumentException("Permits amount can't be negative");
    }
    if (permits == 0) {
        return new CompletableFutureWrapper<>((Void) null);
    }
    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            // 通过incrby增加许可数量
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    // 发布一条消息到redisson_sc:{semaphore}中
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    if (log.isDebugEnabled()) {
        future.thenAccept(o -> {
            log.debug("released, permits: {}, name: {}", permits, getName());
        });
    }
    return future;
}

加载全部内容

相关教程
猜你喜欢
用户评论