Spring Boot 集成Zookeeper
剑圣无痕 人气:0集成步骤
1.pom.xml文件配置,引入相关jar包
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.10.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
特殊说明: 1.无需引入curator-framework,因为curator-recipes自动关联依赖引入curator-framework。 2.curator会默认引入zookeeper的jar报,需要检查版本与服务器的版本是否一致,如果不一致则需要排除引入 3.
2. 核心配置类
@Configuration public class ZookeeperConfig implements Serializable { private static final long serialVersionUID = -9025878246972668136L; private final ZooKeeperProperty zooKeeperProperty; public ZookeeperConfig(ZooKeeperProperty zooKeeperProperty) { this.zooKeeperProperty = zooKeeperProperty; } @Bean public CuratorFramework curatorFramework() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(zooKeeperProperty.getBaseSleepTime(), zooKeeperProperty.getMaxRetries()); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zooKeeperProperty.getServers()) .connectionTimeoutMs(zooKeeperProperty.getConnectionTimeout()) .sessionTimeoutMs(zooKeeperProperty.getSessionTimeout()) .retryPolicy(retryPolicy) .build(); client.start(); return client; } @Bean @ConditionalOnMissingBean public ZooKeeperUtils zooKeeperTemplate(CuratorFramework client) { return new ZooKeeperUtils(client); } } @ConfigurationProperties(prefix="zookeeper") @Component public class ZooKeeperProperty implements Serializable { private static final long serialVersionUID = 8650758711482699256L; /** * zk连接集群,多个用逗号隔开 */ private String servers; /** * 会话超时时间 */ private int sessionTimeout = 60000; /** * 连接超时时间 */ private int connectionTimeout = 15000; /** * 初始重试等待时间(毫秒) */ private int baseSleepTime = 1000; /** * 重试最大次数 */ private int maxRetries = 10; //省略get、set方法 ...... }
3.常用API功能
@Component public class ZooKeeperUtils { private static final Logger logger = LoggerFactory .getLogger(ZooKeeperUtils.class); /** * 路径分隔符 */ private static final String PATH_SEPARATOR = "/"; /** * zk连接 */ private final CuratorFramework client; public ZooKeeperUtils(CuratorFramework client) { this.client = client; } /** * 创建空节点,默认持久节点 * * @param path * 节点路径 * @param node * 节点名称 * @return 完整路径 */ public String createNode(String path, String node) { return createNode(path, node, CreateMode.PERSISTENT); } /** * 创建带类型的空节点 * * @param path * 节点路径 * @param node * 节点名称 * @param createMode * 类型 CreateMode.PERSISTENT: 创建节点后,不删除就永久存在 * CreateMode.PERSISTENT_SEQUENTIAL:节点path末尾会追加一个10位数的单调递增的序列 * CreateMode.EPHEMERAL:创建后,回话结束节点会自动删除 * CreateMode.EPHEMERAL_SEQUENTIAL:节点path末尾会追加一个10位数的单调递增的序列 * @return 路径 */ public String createNode(String path, String node, CreateMode createMode) { path = buildPath(path, node); logger.info("create node for path: {} with createMode: {}", path, createMode.name()); try { client.create().creatingParentsIfNeeded().withMode(createMode) .forPath(path); logger.info("create node :{} sucessfully", node); return path; } catch (Exception e) { logger.error( "create node for path: {} with createMode: {} failed!", path, createMode.name(), e); return null; } } /** * 创建节点,默认持久节点 * * @param path * 节点路径 * @param node * 节点名称 * @param value * 节点值 * @return 完整路径 */ public String createNode(String path, String node, String value) { return createNode(path, node, value, CreateMode.PERSISTENT); } /** * 创建节点,默认持久节点 * * @param path * 节点路径 * @param node * 节点名称 * @param value * 节点值 * @param createMode * 节点类型 * @return 完整路径 */ public String createNode(String path, String node, String value, CreateMode createMode) { if (Objects.isNull(value)) { logger.error("ZooKeeper节点值不能为空!"); } path = buildPath(path, node); logger.info("create node for path: {}, value: {}, with createMode: {}", path, value, createMode.name()); try { client.create().creatingParentsIfNeeded().withMode(createMode) .forPath(path, value.getBytes()); return path; } catch (Exception e) { logger.error( "create node for path: {}, value: {}, with createMode: {} failed!", path, value, createMode.name(), e); } return null; } /** * 获取节点数据 * * @param path * 路径 * @param node * 节点名称 * @return 完整路径 */ public String get(String path, String node) { path = buildPath(path, node); try { byte[] bytes = client.getData().forPath(path); if (bytes.length > 0) { return new String(bytes); } } catch (Exception e) { logger.error("get value for path: {}, node: {} failed!", path, node, e); } return null; } /** * 更新节点数据 * * @param path * 节点路径 * @param node * 节点名称 * @param value * 更新值 * @return 完整路径 */ public String update(String path, String node, String value) { if (Objects.isNull(value)) { logger.error("ZooKeeper节点值不能为空!"); } path = buildPath(path, node); logger.info("update path: {} to value: {}", path, value); try { client.setData().forPath(path, value.getBytes()); return path; } catch (Exception e) { logger.error("update path: {} to value: {} failed!", path, value); } return null; } /** * 删除节点,并且递归删除子节点 * * @param path * 路径 * @param node * 节点名称 * @return 路径 */ public boolean delete(String path, String node) { path = buildPath(path, node); logger.info("delete node for path: {}", path); try { client.delete().deletingChildrenIfNeeded().forPath(path); return true; } catch (Exception e) { logger.error("delete node for path: {} failed!", path); } return false; } /** * 获取子节点 * * @param path * 节点路径 * @return */ public List<String> getChildren(String path) { if (StringUtils.isEmpty(path)) { return null; } if (!path.startsWith(PATH_SEPARATOR)) { path = PATH_SEPARATOR + path; } try { return client.getChildren().forPath(path); } catch (Exception e) { logger.error("get children path:{} error", path, e); } return null; } /** * 判断节点是否存在 * * @param path * 路径 * @param node * 节点名称 * @return 结果 */ public boolean exists(String path, String node) { try { List<String> list = getChildren(path); return !CollectionUtils.isEmpty(list) && list.contains(node); } catch (Exception e) { return false; } } /** * 申请锁,指定请求等待时间 * * @param path * 加锁zk节点 * @param time * 时间 * @param unit * 时间单位 * @param runnable * 执行方法 */ public void lock(String path, long time, TimeUnit unit, Runnable runnable) { try { InterProcessMutex lock = new InterProcessMutex(client, path); if (lock.acquire(time, unit)) { try { runnable.run(); } finally { lock.release(); } } else { logger.error("获取锁超时:{}!", path); } } catch (Exception e) { logger.error("获取锁失败: {}!", path); } } /** * 申请锁,指定请求等待时间 * * @param path * 加锁zk节点 * @param time * 时间 * @param unit * 时间单位 * @param callable * 执行方法 * @return . */ public <T> T lock(String path, long time, TimeUnit unit, Callable<T> callable) { try { InterProcessMutex lock = new InterProcessMutex(client, path); if (lock.acquire(time, unit)) { try { return callable.call(); } finally { lock.release(); } } else { logger.error("获取锁超时:{}!", path); } } catch (Exception e) { logger.error("获取锁失败: {}!", path); } return null; } /* *//** * 对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作 * * @param path * 节点路径 * @param listener * 回调方法 * @throws Exception */ public void watchNode(String path,boolean dataIsCompressed,final ZooKeeperCallback zooKeeperCallback)throws Exception { try { final NodeCache nodeCache = new NodeCache(client, path,dataIsCompressed); nodeCache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { ChildData childData = nodeCache.getCurrentData(); logger.info("ZNode节点状态改变, path={}", childData.getPath()); logger.info("ZNode节点状态改变, data={}", childData.getData()); logger.info("ZNode节点状态改变, stat={}", childData.getStat()); //处理业务逻辑 zooKeeperCallback.call(); } }); nodeCache.start(); } catch (Exception e) { logger.error("创建NodeCache监听失败, path={}",path); } } /** * 对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听 * * @param path * 节点路径 * @param listener * 回调方法 */ public void watchChildren(String path, PathChildrenCacheListener listener) { try { PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true); pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL); pathChildrenCache.getListenable().addListener(listener); } catch (Exception e) { logger.error("watch children failed for path: {}", path, e); } } /** * 将指定的路径节点作为根节点(祖先节点),对其所有的子节点操作进行监听,呈现树形目录的监听,可以设置监听深度,最大监听深度为2147483647( * int类型的最大值) * * @param path * 节点路径 * @param maxDepth * 回调方法 * @param listener * 监听 */ public void watchTree(String path, int maxDepth, TreeCacheListener listener) { try { TreeCache treeCache = TreeCache.newBuilder(client, path) .setMaxDepth(maxDepth).build(); treeCache.start(); treeCache.getListenable().addListener(listener); } catch (Exception e) { logger.error("watch tree failed for path: {}", path, e); } } public String buildPath(String path, String node) { if (StringUtils.isEmpty(path) || StringUtils.isEmpty(node)) { logger.error("ZooKeeper路径或者节点名称不能为空!"); } if (!path.startsWith(PATH_SEPARATOR)) { path = PATH_SEPARATOR + path; } if (PATH_SEPARATOR.equals(path)) { return path + node; } else { return path + PATH_SEPARATOR + node; } } }
4.基本使用
@Autowired private ZooKeeperUtils zooKeeperUtil; @RequestMapping("/addNode") public String addNode() { String path= zooKeeperUtil.createNode("/zookeeper", "node1"); return path; }
特殊说明:关于zookeeper的分布式锁,后续讲解常用分布式锁的时候,会详细说明。
常见错误和解决办法
问题1:调用api创建zookeeper节点时,报KeeperErrorCode = Unimplemented for /test错误。
原因:服务器安装zookeeper的版本与程序中的zookeeper版本不一致。
解决方案: 登录服务器,查看zookeeper安装版本,执行如下命令:
echo stat|nc 127.0.0.1 2181
当前引入的zookeeper版本为3.4.13,而zookeeper的版本与curator对应关系如下:
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x Curator 4.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc. Curator 5.x.x compatible only with ZooKeeper 3.6.x+
问题2:启动项目的日志中会有Will not attempt to authenticate using SASL错误
起初我认为是zookeeper需要进行SASL认证,但是通过查阅相关资料后,才知道3.4之前版本,zookeeper默认会采用SASL认证,3.4以后的版本没有此类问题。
加载全部内容