Nacos服务注册客户端服务端原理分析
码出code 人气:0正文
在分布式服务中,原来的单体服务会被拆分成一个个微服务,服务注册实例到注册中心,服务消费者通过注册中心获取实例列表,直接请求调用服务。
服务是如何注册到注册中心,服务如果挂了,服务是如何检测?带着这些问题,我们从源码上对服务注册进行简单的源码分析。
版本 2.1.1
Nacos Server:2.1.1
spring-cloud-starter-alibaba:2.1.1.RELEASE
spring-boot:2.1.1.RELEASE
方便统一版本,客户端和服务端版本号都为2.1.1
。
客户端
启动nacos
服务注册和发现需要添加maven
依赖:
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>${latest.version}</version> </dependency>
根据maven
依赖找到对应的spring.factories
文件:
在spring.factories
文件里找到启动配置类信息,SpringBoot
服务启动时会将这些配置类信息注入到bean
容器中。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
服务注册的核心配置类为:NacosDiscoveryAutoConfiguration
,该类配置三个bean
对象:
NacosServiceRegistry
NacosRegistration
NacosAutoServiceRegistration
NacosAutoServiceRegistration
NacosAutoServiceRegistration
继承了抽象类AbstractAutoServiceRegistration
。AbstractAutoServiceRegistration
抽象类又实现了ApplicationListener
接口。
实现ApplicationListener
接口的方法,会在Spring
容器初始化完成之后调用onApplicationEvent
方法:
public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); }
调用bind
方法:
public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context) .getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); // 调用 start 方法 this.start(); }
调用了start
方法:
public void start() { if (!isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } return; } if (!this.running.get()) { this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } }
调用了register
方法,最终调用的是NacosServiceRegistry
类的register
方法。
NacosServiceRegistry
根据上文可知,服务器启动后调用NacosServiceRegistry
类的register
方法,该方法实现将实例注册到服务端:
public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); // 创建实例 Instance instance = getNacosInstanceFromRegistration(registration); try { // 注册实例 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); } }
创建实例,然后通过namingService.registerInstance
方法注册实例,然后查看registerInstance
方法:
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { // 封装心跳包 BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); long instanceInterval = instance.getInstanceHeartBeatInterval(); beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval); // 发送心跳包 beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } // 发送实例 serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
registerInstance
主要做两件事:
- 发送心跳包
beatReactor.addBeatInfo
使用定时服务,每隔5s
向服务端发送一次心跳请求,通过http
请求发送心跳信息,路径为/v1/ns/instance/beat
。
心跳请求定时任务使用线程池ScheduledThreadPoolExecutor.schedule()
,而该方法只会调用一次,定时任务的实现是在每次请求任务只会再调用一次ScheduledThreadPoolExecutor.schedule()
, 简单说就是nacos
在发送心跳的时候,会调用schedule
方法,在schedule
要执行的任务中,如果正常发送完心跳,会再次调用schedule
方法。
那为什么不直接调用周期执行的线程池ScheduledThreadPoolExecutor.scheduleAtFixedRate()
?可能是由于发送心跳服务发生异常后,定时任务还会继续执行,但是周期执行的线程池遇到报错后也不会重复调用执行的任务。
线程任务BeatTask
的run
方法,,每次执行会先判断isStopped
,如果是false
,说明心跳停止,就不会触发下次执行任务。如果使用定时任务scheduleAtFixedRate
,即使心跳停止还会继续执行任务,造成资源不必要浪费。
- 注册实例
registerService
主要封装实例信息,比如ip
、port
、servicename
,将这些信息通过http
请求发送给服务端。路径为/v1/ns/instance
。
根据上面流程,查看以下的流程图:
服务端
服务端就是注册中心,服务注册到注册中心,在https://github.com/alibaba/nacos/releases/tag/2.1.1
下载源码部署到本地,方便调式和查看,部署方式详见我的另外一篇文章Nacos 源码环境搭建。
服务端主要接收两个信息:心跳包和实例信息。
心跳包
客户端向服务请求的路径为/v1/ns/instance/beat
,对应的服务端为InstanceController
类的beat
方法:
@PutMapping("/beat") @Secured(action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; // 判断是否有心跳,存在心跳就转成RsInfo if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getCluster())) { clusterName = clientBeat.getCluster(); } else { // fix #2533 clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat, serviceName, namespaceId); // 获取实例信息 BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder(); builder.setRequest(request); int resultCode = getInstanceOperator() .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder); result.put(CommonParams.CODE, resultCode); // 下次发送心跳包间隔 result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName)); result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }
在handleBeat
方法中执行线程任务ClientBeatProcessorV2
的run
方法,延长lastHeartBeatTime
时间。注册中心会定时查询实例,当前时间 - lastHeartBeatTime
> 设置时间(默认15秒),就标记实例为不健康实例。如果心跳实例不健康,发送通知给订阅方,变更实例。
服务端在15
秒没有收到心跳包会将实例设置为不健康,在30
秒没有收到心跳包会将临时实例移除掉。
实例注册
客户端请求的地址是/nacos/v1/ns/instance
, 对应的是服务端是在InstanceController
类。找到类上对应的post
请求方法上。
注册流程:
InstanceController#register ——>InstanceOperatorClientImpl#registerInstance ——>ClientOperationServiceProxy#registerInstance ——>EphemeralClientOperationServiceImpl#registerInstance
创建 Service
服务注册后,将服务存储在一个双层map
集合中:
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
通过是否存在ephemeral
,true
,走AP
模式,否则走CP
模式。
Nacos
默认就是采用的AP
模式使用Distro
协议实现。实现的接口是EphemeralConsistencyService
对节点信息的持久化主要是调用put
方法,
会先写入到DataStore
中:
public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); // 数据持久化到缓存中 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } notifier.addTask(key, DataOperation.CHANGE); }
总结
从依赖上找到需要启动的是要加载的服务注册类NacosDiscoveryAutoConfiguration
,主要配置三个对象
NacosServiceRegistry
NacosRegistration
NacosAutoServiceRegistration
NacosServiceRegistry
类的register
方法,封装实例和心跳信息
- 通过
http
请求,定时发送发送心跳包,默认时间间隔是5
秒。 - 通过
http
请求,发送实例信息。
服务端
- 接收到心跳请求,更新心跳包最新时间。服务端在
15
秒没有收到心跳包会将实例设为不健康,在30
秒没有收到心跳包会将临时实例移除掉。 - 接收到服务注册接口,通过
ephemeral
判断是否走AP
还是走CP
,AP
模式使用Distro
协议。通过调用EphemeralConsistencyService
接口实现,持久化实例信息。
参考
加载全部内容