亲宝软件园·资讯

展开

SpringCloud微服务续约实现源码分析详解

卡布奇诺-海晨 人气:0

一、前言

微服务续约都有通用的设计,就是(微服务)客户端使用心跳机制向注册中心报告自己还活着(可以提供服务),它们的心跳机制略有不同。而Eureka Client客户端会每隔 30 秒发送一次心跳来续约,通过续约来告知 Eureka Server注册中心该 Eureka Client客户端正常运行,没有出现问题。那么心跳机制是什么呢、底层基于什么的?客户端发送心跳的代码在哪里?注册中心怎么处理的?

二、客户端续约

真正触发的还是SpringBoot的自动装配,这里不会过多赘述,下面直奔主题:

1、入口

构造初始化

    private final ScheduledExecutorService scheduler;
    private final ThreadPoolExecutor heartbeatExecutor;
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
                    AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
            ......此处省略n行代码.....
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh心跳和缓存刷新的默认大小分别为2-1
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            // 心跳执行者
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            ......此处省略n行代码.....
        // 最后,初始化调度任务(例如,集群解析器、 heartbeat、 instanceInfo replicator、 fetch
        initScheduledTasks();
            ......此处省略n行代码.....
}

主要逻辑:

scheduler和heartbeatExecutor都是DiscoveryClient的私有成员变量,并且是final的,故在构造方法中必须初始化。而DiscoveryClient的构造初始化前面也讲了,是在SpringBoot的自动装配过程调用的。构造方法中:

1)scheduler是交给jdk的Executors工具类创建的,核心线程数为2(心跳和缓存刷新需用到)。

2)直接调用ThreadPoolExecutor原生构造方法初始化,核心线程数为1,使用SynchronousQueue队列。

3)最后,初始化调度任务(例如,集群解析器、 心跳、 服务实例复制、 刷新),进入下面逻辑分析

initScheduledTasks()调度执行心跳任务

    private void initScheduledTasks() {
        if (clientConfig.shouldRegisterWithEureka()) {
            /*  LeaseInfo:
                public static final int DEFAULT_LEASE_RENEWAL_INTERVAL = 30;
                // Client settings
                private int renewalIntervalInSecs = DEFAULT_LEASE_RENEWAL_INTERVAL;
             */
            // 默认30
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
            // Heartbeat timer心跳任务
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            // 默认的情况下会每隔30秒向注册中心 (eureka.instance.lease-renewal-interval-in-seconds)发送一次心跳来进行服务续约
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            // InstanceInfo replicator实例信息复制任务
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            // 状态变更监听者
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    // Saw local status change event StatusChangeEvent [timestamp=1668595102513, current=UP, previous=STARTING]
                    logger.info("Saw local status change event {}", statusChangeEvent);
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            // 初始化状态变更监听者
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // 定时刷新服务实例信息和检查应用状态的变化,在服务实例信息发生改变的情况下向server重新发起注册
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

主要逻辑:

2、TimedSupervisorTask组件

可见TimedSupervisorTask是Runnable类型的任务,那么它的任务逻辑在run()方法。

构造初始化

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.name = name;
        this.scheduler = scheduler;
        // heartbeatExecutor或cacheRefreshExecutor
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        // HeartbeatThread或CacheRefreshThread等类型任务
        this.task = task;
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;
        // Initialize the counters and register.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

初始化自己的一些字段,在阿里Java编程规范中是强烈建议给线程起别名的,这样便于监控排查问题等。name线程名字,在心跳任务中为heartbeat;scheduler字段传递进来是为了周期性执行任务;executor用于提交任务,下面分析;task任务,为HeartbeatThread或CacheRefreshThread类型任务;delay,用于存储以及计算延迟时间;最大延迟时间不能超过maxDelay。

TimedSupervisorTask#run()任务逻辑

    @Override
    public void run() {
        // Future模式
        Future<?> future = null;
        try {
            // 提交任务待执行
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 阻塞直到完成或超时
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  
            // 更新以便计算延迟时间
            delay.set(timeoutMillis);
            // 更新
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            // 任务主管超时了
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            long currentDelay = delay.get();
            // 任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            // CAS更新延迟时间,考虑到多线程,所以用了CAS
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
            // 一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略
            if (executor.isShutdown() || scheduler.isShutdown()) {
                // 线程池关闭,拒绝任务
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                // 线程池拒绝任务
                logger.warn("task supervisor rejected the task", e);
            }
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                // 任务主管关闭,不能接受任务
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                // 任务主管抛出了一个异常
                logger.warn("task supervisor threw an exception", e);
            }
            throwableCounter.increment();
        } finally {
            // 上面的异常catch了没有外跑,下面继续运行
            if (future != null) {
                // 中断
                future.cancel(true);
            }
            // 调度器没有关闭,延迟继续周期性执行任务。这样的周期性任务时间设置灵活
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

主要逻辑:

异常处理:

上面的异常catch了没有外跑,下面继续运行:

1)丢弃当前任务;

2)调度器没有关闭,延迟继续周期性执行任务。这样的周期性任务时间设置灵活

3、心跳任务

HeartbeatThread私有内部类

    private class HeartbeatThread implements Runnable {
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

HeartbeatThread实现了Runnable接口,也就是上面说的task,它的逻辑封装了出去:

发送心跳

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // 发送心跳
            httpResponse = eurekaTransport.registrationClient
                    .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            // 打印日志,如:心跳状态
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            // 响应状态没有找到重新注册
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            // 无法发送心跳!
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

主要逻辑:

4、发送心跳到注册中心

构建请求数据发送心跳

    @Override
    public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
        // 拼接请求URL
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            // 构建请求资源
            WebResource webResource = jerseyClient.resource(serviceUrl)
                    .path(urlPath)
                    .queryParam("status", info.getStatus().toString())
                    .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
            if (overriddenStatus != null) {
                webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            // 请求注册中心
            response = requestBuilder.put(ClientResponse.class);
            EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
            if (response.hasEntity() &&
                    !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
                eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
            }
            // 返回构建结果
            return eurekaResponseBuilder.build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP PUT {}{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                // 关闭资源
                response.close();
            }
        }
    }

主要逻辑:

三、服务端处理客户端续约

InstanceRegistry父子关系图上一节也分析了,在使用super关键字时注意一下。

1、InstanceRegistry#renew()逻辑

	@Override
	public boolean renew(final String appName, final String serverId,
			boolean isReplication) {
		log("renew " + appName + " serverId " + serverId + ", isReplication {}"
				+ isReplication);
		List<Application> applications = getSortedApplications();
		for (Application input : applications) {
			if (input.getName().equals(appName)) {
				InstanceInfo instance = null;
				for (InstanceInfo info : input.getInstances()) {
					if (info.getId().equals(serverId)) {
						instance = info;
						break;
					}
				}
				// 发布服务实例处理心跳事件
				publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
						instance, isReplication));
				break;
			}
		}
		// 调用父类的处理心跳方法
		return super.renew(appName, serverId, isReplication);
	}

这里逻辑主要是委托父类处理心跳,具体逻辑见下面分析:

2、PeerAwareInstanceRegistryImpl#renew()逻辑

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            // 服务续约成功,将所有的Eureka操作复制到对等的Eureka节点,除了复制到此节点的流量。
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

PeerAwareInstanceRegistryImpl职责:处理将所有操作复制到 AbstractInstanceRegistry的peer Eureka节点,以保持所有操作同步。这里如果服务续约成功,将所有的Eureka操作复制到对等的Eureka节点,除了复制到此节点的流量。

3、AbstractInstanceRegistry#renew()逻辑

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        // 根据appName从本地注册表服务实例信息
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            // 没有找到租约
            RENEW_NOT_FOUND.increment(isReplication);
            // 注册: 租约不存在,注册资源:
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            // 获取服务实例信息
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    // 实例状态{}与实例{}的重写实例状态{}不同。因此将状态设置为覆盖状态
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    overriddenInstanceStatus.name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            renewsLastMin.increment();
            // 续约
            leaseToRenew.renew();
            return true;
        }
    }

主要逻辑:

Lease#renew()逻辑

    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
 
    }

lastUpdateTimestamp是Lease租约的字段,维护租约时间,在服务剔除下线会根据该字段判断是否过期需要对服务剔除下线处理。下一篇我们就来探讨一下,敬请期待!!!

加载全部内容

相关教程
猜你喜欢
用户评论