SpringCloud负载均衡
LogicMF 人气:0背景
随着微服务项目的迭代,可能一个服务会有多个实例,这时候就会涉及到负载均衡。然而我们在开发的时候,肯定希望只启动一个项目。然后调试的时候希望负载均衡把请求分配到我们正在开发测试的服务实例上面。
如图所示,我们希望可以指定调用路径也就是定向路由。
实现方式
基于ip
这个很好理解,就是开发者本地正在运行的服务在nacos上面肯定显示你本机的ip;那么只要我得到开发者的ip就能够根据这个ip来过滤nacos上面的服务,达到定向路由的效果。
基于nacos的元数据
spring: cloud: nacos: discovery: metadata: version: "mfine"
在yaml中配置nacos元数据的version属性。前端在请求header中添加与其对应version属性,就可以实现服务过滤也就是定向路由。
实现原理
Gateway服务
因为gateway底层的不同,所以其负载均衡也与普通服务的不同,因此要特殊处理。先看gateway中load balancer组件调用流程。
首先在gateway中load balancer本身也是一个过滤器,所以流程如下。
- ReactiveLoadBalancerClientFilter里面有个LoadBalancerClientFactory属性,通过这个工厂获取具体的负载均衡器
- LoadBalancerClientFactory会载入LoadBalancerClientConfiguration配置
- LoadBalancerClientConfiguration会初始化我们需要的RoundRobinLoadBalancer,并且会通过构造函数传入LoadBalancerClientFactory对象。
那我们要做什么呢?其实就是截胡。
- 实现自己的LoadBalancerClientFactory,传入自己LoadBalancerClientConfiguration。
- 在自己的LoadBalancerClientConfiguration初始化自己的RoundRobinLoadBalancer
- 最后在自己的ReactiveLoadBalancerClientFilter里面传入自己的LoadBalancerClientFactory,获得自己的负载均衡器。
具体源码(只放核心)
MyRoundRobinLoadBalancer
private Response<ServiceInstance> getInstanceResponse( List<ServiceInstance> instances, ServerWebExchange exchange) { if (instances.isEmpty()) { log.warn("No servers available for service: " + this.serviceId); return new EmptyResponse(); } try { //可重入锁 if (this.lock.tryLock(10, TimeUnit.SECONDS)) instances = this.filterServiceInstance(exchange, instances); // TODO: enforce order? int pos = Math.abs(this.position.incrementAndGet()); ServiceInstance instance = instances.get(pos % instances.size()); return new DefaultResponse(instance); } catch (InterruptedException e) { throw new RuntimeException("自定义负载均衡器,超时等待异常"); } finally { lock.unlock(); } } // 根据附加信息过滤服务 private List<ServiceInstance> filterServiceInstance(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) { List<ServiceInstance> filteredServices = new ArrayList<>(); // 自动模式 if (DevConfigEnum.AUTO.getCode().equals(this.properties.getModel())) { filteredServices = autoModel(exchange, serviceInstances); } // ip 模式 if (this.properties.getModel().equals(DevConfigEnum.IP.getCode())) { filteredServices = ipModel(exchange, serviceInstances); } // metadata 模式 if (this.properties.getModel().equals(DevConfigEnum.METADATA.getCode())) { filteredServices = metadataModel(exchange, serviceInstances); } if (filteredServices.isEmpty()) { log.info("未发现符合ip或metadata.version服务,将采用原始服务集合"); return serviceInstances; } return filteredServices; } // 自动模式 private List<ServiceInstance> autoModel(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) { List<ServiceInstance> filteredServices; filteredServices = ipModel(exchange, serviceInstances); if (filteredServices.isEmpty()) { filteredServices = metadataModel(exchange, serviceInstances); } return filteredServices; } //元数据模式 private List<ServiceInstance> metadataModel(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) { String version = exchange.getRequest().getHeaders().getFirst("version"); List<ServiceInstance> filteredServices = new ArrayList<>(); if (version != null) { log.info("version模式:获取metadata.version成功"); filteredServices = serviceInstances.stream().filter(instance -> { String metaVersion = instance.getMetadata().get("version"); if (metaVersion == null) { return false; } return metaVersion.equals(version); }).collect(Collectors.toList()); } return filteredServices; } // ip模式 private List<ServiceInstance> ipModel(ServerWebExchange exchange, List<ServiceInstance> serviceInstances) { List<ServiceInstance> filteredServices = new ArrayList<>(); try { String ipAddress = exchange.getRequest().getHeaders().getFirst("ip"); if (ipAddress == null) { ipAddress = IPUtils.getIpAddress(exchange.getRequest()); } log.warn("ip模式:获取ip成功"); String finalIpAddress = ipAddress; filteredServices = serviceInstances.stream().filter(item -> item.getHost().equals(finalIpAddress)) .collect(Collectors.toList()); } catch (UnknownHostException e) { log.warn("ip模式:获取ip失败,无法进行定向路由"); } return filteredServices; }
MyLoadBalancerClientFactory
public class MyLoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification> implements ReactiveLoadBalancer.Factory<ServiceInstance>{ ................ public MyLoadBalancerClientFactory() { // 传入自己的自动配置 super(MyLoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME); } ........... }
MyLoadBalancerClientConfiguration
@Configuration public class MyLoadBalancerClientConfiguration { @Autowired private MicroServiceDevConfigProperties microServiceDevConfigProperties; @Bean public ReactorServiceInstanceLoadBalancer reactiveLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); // 初始化自己的负载均衡器 return new MyRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name, 1000,microServiceDevConfigProperties); } }
@Configuration public class MyReactiveLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter { private static final Log log = LogFactory .getLog(ReactiveLoadBalancerClientFilter.class); private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150; private final MyLoadBalancerClientFactory clientFactory; private LoadBalancerProperties properties; // 注入自己的LoadBalancerClientFactory public MyReactiveLoadBalancerClientFilter(MyLoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) { super(null, null); this.clientFactory = clientFactory; this.properties = properties; } ........ private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) { URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); //获得自己load balancer MyRoundRobinLoadBalancer loadBalancer = this.clientFactory .getInstance(uri.getHost(), MyRoundRobinLoadBalancer.class); if (loadBalancer == null) { throw new NotFoundException("No loadbalancer available for " + uri.getHost()); } // 在自己的load balancer里面扩展choose方法,使其接受ServerWebExchange参数 // 传入ServerWebExchange我们就可以,获取请求信息,方便我们过滤 return loadBalancer.choose(exchange); } }
我的这种实现较为繁琐,可能大家有更好方式,大家要是有更好更简单的方法,也可以直接替换掉。
普通服务
普通服务实现自定义负载均衡器就很简单了,实现自定义RoundRobinRule就可以了
@Configuration public class MyRoundRobinLoadBalancer extends RoundRobinRule { //不同的使用注入的方式获取请求信息 @Autowired private HttpServletRequest request; ..... private List<Server> filterServers(List<Server> reachableServers) { List<Server> servers = new ArrayList<>(); if (this.properties.getModel().equals(DevConfigEnum.AUTO.getCode())) { servers = ipModel(reachableServers); if (servers.isEmpty()) { servers = metadataModel(reachableServers); } } if (this.properties.getModel().equals(DevConfigEnum.IP.getCode())) { servers = ipModel(reachableServers); } if (this.properties.getModel().equals(DevConfigEnum.METADATA.getCode())) { servers = metadataModel(reachableServers); } if (servers.isEmpty()) { return reachableServers; } return servers; } private List<Server> metadataModel(List<Server> reachableServers) { String version = request.getHeader("version"); List<Server> servers = new ArrayList<>(); if (version != null) { log.info("metadata模式: 获取version成功"); servers = reachableServers.stream().filter(item -> { NacosServer nacosServer = (NacosServer) item; String metaVersion = nacosServer.getMetadata().get("version"); if (metaVersion == null) { return false; } return metaVersion.equals(version); }).collect(Collectors.toList()); } else { log.warn("metadata模式: header中无version字段且未获取到请求者ip"); } return servers; } private List<Server> ipModel(List<Server> reachableServers) { List<Server> servers = new ArrayList<>(); try { String ip = this.request.getHeader("ip"); if (ip == null) { ip = IPUtils.getIpAddress(request); } String finalIp = ip; servers = reachableServers.stream().filter(item -> item.getHost().equals(finalIp)).collect(Collectors.toList()); log.info("ip模式: 获取请求者ip成功"); } catch (UnknownHostException e) { log.warn("ip模式: 获取ip失败"); } return servers; } ........ }
深入思考一下,通过注入的方式获取request信息是否存在多线程安全问题呢?
使用方法
metadata模式
配置yaml:
spring: application: name: cloud-order cloud: nacos: discovery: metadata: // 重点 version: mfine celi-dev: config: model: "metadata"
nacos中服务元数据
然后请求头中附带version信息
自定义负载均衡器会通过请求头中的version去nacos中注册服务的元数据里面去比对version信息。
ip模式
配置yaml
celi-dev: config: model: "ip"
- 在header中指定IP
- 依靠请求信息获取ip
配置yaml就好
此不指定ip的时候,后台获取的ip可能不对。取决你本地是否存在多张网卡(虚拟网卡也算),有时候nacos中ip显示也会是虚拟网卡的ip。
使用前请确认你的服务在nacos中的ip是多少,然后在header中指定ip,这样最省事也最稳妥。
一般是先从header中获取ip信息,获取不到再从request对象中分析。
auto模式
配置yaml,其实可以不配置。
celi-dev: config: model: "auto"
自动模式默认先使用ip模式获取不到ip会自动切换metadata模式。
什么都不配置,默认auto模式
总结
三种模式里面meta模式最繁琐,心智负担最重,但是也是最简单的。ip模式难度在于获取ip的准确性因此加入指定ip的方式。自动模式则二者结合。
加载全部内容