亲宝软件园·资讯

展开

springboot zookeeper分布式锁

冬雪是你 人气:8

InterProcessMutex内部实现了zookeeper分布式锁的机制,所以接下来我们尝试使用这个工具来为我们的业务加上分布式锁处理的功能

zookeeper分布式锁的特点:1、分布式 2、公平锁 3、可重入

依赖

<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.4.10</version>
</dependency>
<!-- zookeeper 客户端 -->
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>2.12.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.12.0</version>
</dependency>
<!-- lombok -->
<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <version>1.18.16</version>
   <scope>provided</scope>
</dependency>

本地封装

这个工具类主要封装CuratorFramework这个client(连接Zookeeper)

@Slf4j
public class CuratorClientUtil {
    private String zookeeperServer;

    @Getter
    private CuratorFramework client;

    public CuratorClientUtil(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }

  	// 创建CuratorFrameworkFactory并且启动
    public void init() {
       // 重试策略,等待1s,最大重试3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        this.client = CuratorFrameworkFactory.builder()
                .connectString(zookeeperServer)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        this.client.start();
    }

   // 容器关闭,CuratorFrameworkFactory关闭
    public void destroy() {
        try {
            if (Objects.nonNull(getClient())) {
                getClient().close();
            }
        } catch (Exception e) {
            log.info("CuratorFramework close error=>{}", e.getMessage());
        }
    }
}

配置

@Configuration
public class CuratorConfigration {
    @Value("${zookeeper.server}")
    private String zookeeperServer;
    // 注入时,指定initMethod和destroyMethod
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public CuratorClientUtil curatorClientUtil() {
        CuratorClientUtil clientUtil = new CuratorClientUtil(zookeeperServer);
        return clientUtil;
    }
}

测试代码

模拟不同客户端的请求

@Slf4j
@RestController
@RequestMapping("/test")
public class TestController {
    // 注入client工具类
    @Autowired
    private CuratorClientUtil curatorClientUtil;
    // 在zookeeper的/rootLock节点下创建锁对应的临时有序节点
    private String rootLock = "/rootLock";

    @GetMapping("/testLock")
    public Object testLock() throws Exception {
        // 获取当前线程的名字,方便观察那些线程在获取锁
        String threadName = Thread.currentThread().getName();
        InterProcessMutex mutex = new InterProcessMutex(curatorClientUtil.getClient(), rootLock);
        try {
            log.info("{}---获取锁start", threadName);
            // 尝试获取锁,最长等待3s,超时放弃获取
            boolean lockFlag = mutex.acquire(3000, TimeUnit.SECONDS);
            // 获取锁成功,进行业务处理
            if (lockFlag) {
                log.info("{}---获取锁success", threadName);
                // 模拟业务处理,时间为3s
                Thread.sleep(3000);
            } else {
                log.info("{}---获取锁fail", threadName);
            }
        } catch (Exception e) {
            log.info("{}---获取锁异常", threadName);
        } finally {
            // 业务处理完成,释放锁,唤醒比当前线程创建的节点序号大(最靠近)的线程获取锁
            mutex.release();
            log.info("{}---锁release", threadName);
        }
        return "线程:" + threadName + "执行完成";
    }
}

JMeter测试

我们使用JMeter模拟100个客户端同时并发的访问 localhost:8081/test/testLock,相当于100个客户端争抢分布式锁,结果如图右上角所示,100个请求花了5分6s,每个线程获取到锁后业务处理3s,100个线程理想时间为300s(Thread.sleep(3000)),所以运行时间符合。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AiBDB0tM-1647660941538)(/Users/maxuedong/Library/Containers/com.tencent.qq/Data/Library/Application Support/QQ/Users/2860850965/QQ/Temp.db/63AC764F-1C38-4422-9A01-776403B51E5F.png)]

在这里插入图片描述

zookeeper每个线程在/rooLock节点下创建的临时有序节点如下图,由于是临时的,所以线程释放锁后这些节点也会删除

在这里插入图片描述

100个线程程序日志打印

在这里插入图片描述

关于InterProcessMutex内部如何实现zookeeper分布式锁,请看我写的这篇文章:在这里

加载全部内容

相关教程
猜你喜欢
用户评论