How does LoadBalancer replace Ribbon for load balancing?
(last modified Dec 28, 2024, 12:31 AM )

橙序员
2023-12-10
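Article summary (AI-generated)

Spring Cloud no longer uses Ribbon for three main reasons: first, Ribbon is in maintenance mode and its future is uncertain; second, in a microservice architecture the list of callable instances comes from the service registry; third, Spring Cloud recommends Spring Cloud LoadBalancer as the replacement because it is more flexible and easier to extend. Spring Cloud LoadBalancer is a general-purpose load balancer that supports several algorithms and custom extensions. In OpenFeign, load balancing is implemented by FeignBlockingLoadBalancerClient, which uses the load-balancing strategy to select the instance to call.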

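Why LoadBalancer instead of Ribbon?

Why does Spring Cloud no longer use Ribbon?

Spring Cloud no longer recommends Ribbon for several reasons. First, Ribbon is a client-side load-balancing library developed by Netflix, and Netflix has put it into maintenance mode. It no longer receives active feature development, which leaves its future uncertain.

Second, in a microservice architecture the list of available instances belongs to the service registry, for example Netflix Eureka or Consul. Spring Cloud LoadBalancer obtains that list through Spring Cloud's own discovery abstractions instead of a Netflix-specific client library, so services can be discovered and called more flexibly. The balancing decision itself is still made on the client side.

Finally, Spring Cloud recommends Spring Cloud LoadBalancer as the replacement. It is a general-purpose load balancer that ships with several load-balancing algorithms and supports custom extensions.

In short, Spring Cloud moved away from Ribbon because of Ribbon's maintenance status and the evolution of the microservice stack, and recommends a more modern and flexible replacement.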

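Which component does Spring Cloud use to replace Ribbon?

Spring Cloud recommends Spring Cloud LoadBalancer as the replacement for Ribbon starting with version 2.2 (the Spring Cloud Commons 2.2.x line). From that version on, the official documentation emphasizes Spring Cloud LoadBalancer for load balancing and no longer recommends Ribbon.

Spring Cloud LoadBalancer is a general-purpose load balancer designed to integrate with a service registry such as Netflix Eureka or Consul.

It supports several load-balancing algorithms and lets you plug in custom extensions for specific needs. Like Ribbon, it balances on the client side. The difference is that it takes its instance list from the registry through Spring Cloud's discovery client abstraction and exposes the selection logic as replaceable beans.

Overall, Spring Cloud LoadBalancer is the main component in the Spring Cloud ecosystem for replacing Ribbon.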

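How does LoadBalancer implement load balancing?

Load balancing in OpenFeign

In the article "How does OpenFeign work?" we saw that OpenFeign reads the Feign-related configuration, loads the Feign clients, and registers them in the container. When a Feign client calls an HTTP endpoint of a service provider, a dynamic proxy created for that client performs the actual call: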

image-1702194337275

In this call chain, look at the execute method of FeignBlockingLoadBalancerClient:

@Override
public Response execute(Request request, Request.Options options) throws IOException {
    final URI originalUri = URI.create(request.url());
    String serviceId = originalUri.getHost();
    Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
    String hint = getHint(serviceId);
    DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
          new RequestDataContext(buildRequestData(request), hint));
    Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
          .getSupportedLifecycleProcessors(
                loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                RequestDataContext.class, ResponseData.class, ServiceInstance.class);
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
    // select the instance to call using the load-balancing strategy
    ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
    org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
          instance);
    // if no instance was selected, respond with 503 Service Unavailable
    if (instance == null) {
       String message = "Load balancer does not contain an instance for the service " + serviceId;
       if (LOG.isWarnEnabled()) {
          LOG.warn(message);
       }
       supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
             .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                   CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
       return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
             .body(message, StandardCharsets.UTF_8).build();
    }
    // rebuild the actual request URL from the selected instance's address
    String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
    Request newRequest = buildRequest(request, reconstructedUrl, instance);
    return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
          supportedLifecycleProcessors);
}

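It implements a basic flow:

  1. Select the instance to call using the load-balancing strategy
  2. If no instance is available, return "service unavailable"
  3. Build the actual request URL from the selected instance's address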
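
The three steps above can be sketched in plain Java without any Spring types. This is only an illustration under stated assumptions: FlowSketch, its registry map, and the "pick the first instance" rule are hypothetical stand-ins, not the FeignBlockingLoadBalancerClient implementation.

```java
import java.net.URI;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the real client; all names here are illustrative.
final class FlowSketch {

    // Step 1: pick an instance for the service id (here simply the first one).
    static String choose(Map<String, List<String>> registry, String serviceId) {
        List<String> instances = registry.getOrDefault(serviceId, List.of());
        return instances.isEmpty() ? null : instances.get(0);
    }

    // Steps 2 and 3: answer "503" when nothing was chosen,
    // otherwise rebuild the URL with the instance's host:port.
    static String resolve(Map<String, List<String>> registry, String url) {
        URI original = URI.create(url);
        // The host part of the logical URL is the service id.
        String instance = choose(registry, original.getHost());
        if (instance == null) {
            return "503";
        }
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return original.getScheme() + "://" + instance + original.getRawPath() + query;
    }
}
```

With a registry that maps user-service to 10.0.0.5:8080, resolving http://user-service/users/1 yields a URL that points at 10.0.0.5:8080, while an unknown service id yields the 503 marker.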

Next we look at how loadBalancerClient.choose obtains a concrete instance of the called service.

@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
    // obtain a load balancer for this service id
    ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
    if (loadBalancer == null) {
       return null;
    }
    // ask the reactive load balancer for an instance and block until it answers
    Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
    if (loadBalancerResponse == null) {
       return null;
    }
    // return the service instance
    return loadBalancerResponse.getServer();
}

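It does three things:

  1. Obtain a load balancer
  2. Request a service instance from the reactive load balancer and block for the result
  3. Return the service instance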
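
The "select asynchronously, then block" pattern can be illustrated with the JDK alone. In this sketch CompletableFuture stands in for Reactor's Mono, and BlockingChooseSketch and its balancer function are hypothetical names, so treat it as an analogy rather than the BlockingLoadBalancerClient code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Illustrative only: CompletableFuture plays the role of Reactor's Mono.
final class BlockingChooseSketch {

    // Mirrors the three steps: get a balancer, wait for its answer, return the instance.
    static String choose(Function<String, CompletableFuture<String>> balancer, String serviceId) {
        if (balancer == null) {
            return null; // no load balancer available for this service
        }
        // The selection itself runs asynchronously.
        CompletableFuture<String> pending = balancer.apply(serviceId);
        // Block the calling thread until the result is ready.
        return pending.join();
    }
}
```

The caller sees an ordinary synchronous method even though the selection behind it is asynchronous, which is why a blocking client such as Feign can sit on top of a reactive load balancer.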

Loading the LoadBalancer configuration

The imports file of the spring-cloud-loadbalancer package loads the following configuration classes:

## creates the LoadBalancerClientFactory
org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration
## creates the LoadBalancerClient
org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration
## service instance list cache configuration
org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration
## OAuth2 configuration
org.springframework.cloud.loadbalancer.security.OAuth2LoadBalancerClientAutoConfiguration
org.springframework.cloud.loadbalancer.config.LoadBalancerStatsAutoConfiguration

Load balancer client factory configuration class - LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@EnableConfigurationProperties({ LoadBalancerClientsProperties.class, LoadBalancerEagerLoadProperties.class })
@AutoConfigureBefore({ ReactorLoadBalancerClientAutoConfiguration.class,
		LoadBalancerBeanPostProcessorAutoConfiguration.class })
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.enabled", havingValue = "true", matchIfMissing = true)
public class LoadBalancerAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerZoneConfig zoneConfig(Environment environment) {
		return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
	}

	@ConditionalOnMissingBean
	@Bean
	public LoadBalancerClientFactory loadBalancerClientFactory(LoadBalancerClientsProperties properties,
			ObjectProvider<List<LoadBalancerClientSpecification>> configurations) {
		LoadBalancerClientFactory clientFactory = new LoadBalancerClientFactory(properties);
		clientFactory.setConfigurations(configurations.getIfAvailable(Collections::emptyList));
		return clientFactory;
	}

	@Bean
	public LoadBalancerEagerContextInitializer loadBalancerEagerContextInitializer(
			LoadBalancerClientFactory clientFactory, LoadBalancerEagerLoadProperties properties) {
		return new LoadBalancerEagerContextInitializer(clientFactory, properties.getClients());
	}

	@Bean
	static LoadBalancerChildContextInitializer loadBalancerChildContextInitializer(
			LoadBalancerClientFactory loadBalancerClientFactory, ApplicationContext parentContext) {
		return new LoadBalancerChildContextInitializer(loadBalancerClientFactory, parentContext);
	}

}

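Load balancer client factory class - LoadBalancerClientFactory

LoadBalancerClientFactory extends NamedContextFactory, a class from Spring Cloud Commons that creates and manages multiple ApplicationContext instances, each identified by a name. This is useful when a single application needs several isolated contexts.

With NamedContextFactory, each named ApplicationContext can be configured on its own, so every instance can have different configuration, beans, and other Spring components.

The class provides methods for creating and managing these contexts and for looking up a specific ApplicationContext by name. For LoadBalancerClientFactory the name is the service id, so each called service can get its own load balancer configuration.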
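
The "one isolated context per name" idea can be sketched as a lazily filled map. This is a simplified illustration: NamedRegistrySketch is a hypothetical class, and a real NamedContextFactory creates child ApplicationContext objects rather than arbitrary values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Simplified illustration only; not the Spring Cloud Commons implementation.
final class NamedRegistrySketch<T> {

    private final Map<String, T> contexts = new ConcurrentHashMap<>();
    private final Function<String, T> contextCreator;

    NamedRegistrySketch(Function<String, T> contextCreator) {
        this.contextCreator = contextCreator;
    }

    // Lazily create one isolated "context" per name and reuse it afterwards.
    T getContext(String name) {
        return contexts.computeIfAbsent(name, contextCreator);
    }

    // Number of contexts created so far.
    int size() {
        return contexts.size();
    }
}
```

Asking twice for the same name returns the same object, while a different name gets its own object, which is the property that lets each service id carry separate load balancer beans.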

public LoadBalancerClientFactory(LoadBalancerClientsProperties properties) {
    super(LoadBalancerClientConfiguration.class, NAMESPACE, PROPERTY_NAME, new HashMap<>());
    this.properties = properties;
}

@Override
public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
    return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
}

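As shown, its constructor registers LoadBalancerClientConfiguration as the default configuration class for load balancer clients.

Load balancer client configuration class - LoadBalancerClientConfiguration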

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {

	private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;

	@Bean
	@ConditionalOnMissingBean
	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
			LoadBalancerClientFactory loadBalancerClientFactory) {
		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        // round-robin is the default strategy
		return new RoundRobinLoadBalancer(
				loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
	}

    // reactive support
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnReactiveDiscoveryEnabled
	@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER)
	public static class ReactiveSupportConfiguration {

        // instance list supplier: discovery client with caching (default)
		@Bean
		@ConditionalOnBean(ReactiveDiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(DefaultConfigurationCondition.class)
		public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().build(context);
		}

        // instance list supplier: prefer instances in the same zone
		@Bean
		@ConditionalOnBean(ReactiveDiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(ZonePreferenceConfigurationCondition.class)
		public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().withZonePreference()
					.build(context);
		}

		@Bean
		@ConditionalOnBean(LoadBalancerClientFactory.class)
		@ConditionalOnMissingBean
		public XForwardedHeadersTransformer xForwarderHeadersTransformer(LoadBalancerClientFactory clientFactory) {
			return new XForwardedHeadersTransformer(clientFactory);
		}

        // instance list supplier: keep only instances that pass health checks
		@Bean
		@ConditionalOnBean({ ReactiveDiscoveryClient.class, WebClient.Builder.class })
		@ConditionalOnMissingBean
		@Conditional(HealthCheckConfigurationCondition.class)
		public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withHealthChecks().build(context);
		}
		
        // instance list supplier: request-based sticky session
		@Bean
		@ConditionalOnBean(ReactiveDiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(RequestBasedStickySessionConfigurationCondition.class)
		public ServiceInstanceListSupplier requestBasedStickySessionDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching()
					.withRequestBasedStickySession().build(context);
		}

        // instance list supplier: prefer the previously selected instance
		@Bean
		@ConditionalOnBean(ReactiveDiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(SameInstancePreferenceConfigurationCondition.class)
		public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching()
					.withSameInstancePreference().build(context);
        }

        // instance list supplier: weighted
		@Bean
		@ConditionalOnBean(ReactiveDiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(WeightedConfigurationCondition.class)
		public ServiceInstanceListSupplier weightedServiceInstanceListSupplier(ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withDiscoveryClient().withCaching().withWeighted()
					.build(context);
		}

	}

    // blocking support (default)
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnBlockingDiscoveryEnabled
	@Order(REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER + 1)
	public static class BlockingSupportConfiguration {

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(DefaultConfigurationCondition.class)
		public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
		}

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(ZonePreferenceConfigurationCondition.class)
		public ServiceInstanceListSupplier zonePreferenceDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching()
					.withZonePreference().build(context);
		}

		@Bean
		@ConditionalOnBean({ DiscoveryClient.class, RestTemplate.class })
		@ConditionalOnMissingBean
		@Conditional(HealthCheckConfigurationCondition.class)
		public ServiceInstanceListSupplier healthCheckDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withBlockingHealthChecks()
					.build(context);
		}

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(RequestBasedStickySessionConfigurationCondition.class)
		public ServiceInstanceListSupplier requestBasedStickySessionDiscoveryClientServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching()
					.withRequestBasedStickySession().build(context);
		}

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(SameInstancePreferenceConfigurationCondition.class)
		public ServiceInstanceListSupplier sameInstancePreferenceServiceInstanceListSupplier(
				ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching()
					.withSameInstancePreference().build(context);
		}

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@ConditionalOnMissingBean
		@Conditional(WeightedConfigurationCondition.class)
		public ServiceInstanceListSupplier weightedServiceInstanceListSupplier(ConfigurableApplicationContext context) {
			return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().withWeighted()
					.build(context);
		}

	}

    // blocking: retry-aware instance supplier (default)
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnBlockingDiscoveryEnabled
	@ConditionalOnClass(RetryTemplate.class)
	@Conditional(BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition.class)
	@AutoConfigureAfter(BlockingSupportConfiguration.class)
	@ConditionalOnBean(ServiceInstanceListSupplier.class)
	public static class BlockingRetryConfiguration {

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@Primary
		public ServiceInstanceListSupplier retryAwareDiscoveryClientServiceInstanceListSupplier(
				ServiceInstanceListSupplier delegate) {
			return new RetryAwareServiceInstanceListSupplier(delegate);
		}

	}

    // reactive: retry-aware instance supplier
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnReactiveDiscoveryEnabled
	@Conditional(ReactiveOnAvoidPreviousInstanceAndRetryEnabledCondition.class)
	@AutoConfigureAfter(ReactiveSupportConfiguration.class)
	@ConditionalOnBean(ServiceInstanceListSupplier.class)
	@ConditionalOnClass(RetrySpec.class)
	public static class ReactiveRetryConfiguration {

		@Bean
		@ConditionalOnBean(DiscoveryClient.class)
		@Primary
		public ServiceInstanceListSupplier retryAwareDiscoveryClientServiceInstanceListSupplier(
				ServiceInstanceListSupplier delegate) {
			return new RetryAwareServiceInstanceListSupplier(delegate);
		}

	}
	
    // blocking retry: avoid the instance that was just called
    // requires spring.cloud.loadbalancer.retry.enabled to be true (or missing)
    // and spring.cloud.loadbalancer.retry.avoid-previous-instance to be true (or missing)
	static final class BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition extends AllNestedConditions {

		private BlockingOnAvoidPreviousInstanceAndRetryEnabledCondition() {
			super(ConfigurationPhase.REGISTER_BEAN);
		}

		@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true",
				matchIfMissing = true)
		static class LoadBalancerRetryEnabled {

		}

		@Conditional(AvoidPreviousInstanceEnabledCondition.class)
		static class AvoidPreviousInstanceEnabled {

		}

	}
   
    // reactive retry: avoid the instance that was just called
    // requires spring.cloud.loadbalancer.retry.enabled to be true
    // and spring.cloud.loadbalancer.retry.avoid-previous-instance to be true (or missing)
	static final class ReactiveOnAvoidPreviousInstanceAndRetryEnabledCondition extends AllNestedConditions {

		private ReactiveOnAvoidPreviousInstanceAndRetryEnabledCondition() {
			super(ConfigurationPhase.REGISTER_BEAN);
		}

		@ConditionalOnProperty(value = "spring.cloud.loadbalancer.retry.enabled", havingValue = "true")
		static class LoadBalancerRetryEnabled {

		}

		@Conditional(AvoidPreviousInstanceEnabledCondition.class)
		static class AvoidPreviousInstanceEnabled {

		}

	}

    // corresponds to the property spring.cloud.loadbalancer.retry.avoid-previous-instance
	static class AvoidPreviousInstanceEnabledCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.trueOrMissingForClientOrDefault(context.getEnvironment(),
					"retry.avoid-previous-instance");
		}

	}
	
    // corresponds to spring.cloud.loadbalancer.configurations=default (also matches when the property is missing)
	static class DefaultConfigurationCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.equalToOrMissingForClientOrDefault(context.getEnvironment(),
					"configurations", "default");
		}

	}

    // corresponds to spring.cloud.loadbalancer.configurations=zone-preference
	static class ZonePreferenceConfigurationCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.equalToForClientOrDefault(context.getEnvironment(),
					"configurations", "zone-preference");
		}

	}

    // corresponds to spring.cloud.loadbalancer.configurations=health-check
	static class HealthCheckConfigurationCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.equalToForClientOrDefault(context.getEnvironment(),
					"configurations", "health-check");
		}

	}

    // corresponds to spring.cloud.loadbalancer.configurations=request-based-sticky-session
	static class RequestBasedStickySessionConfigurationCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.equalToForClientOrDefault(context.getEnvironment(),
					"configurations", "request-based-sticky-session");
		}

	}
	
    // corresponds to spring.cloud.loadbalancer.configurations=same-instance-preference
	static class SameInstancePreferenceConfigurationCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.equalToForClientOrDefault(context.getEnvironment(),
					"configurations", "same-instance-preference");
		}

	}
	
    // corresponds to spring.cloud.loadbalancer.configurations=weighted
	static class WeightedConfigurationCondition implements Condition {

		@Override
		public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
			return LoadBalancerEnvironmentPropertyUtils.equalToForClientOrDefault(context.getEnvironment(),
					"configurations", "weighted");
		}

	}

}

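It covers the strategies for supplying the list of instances under a service:

  • Zone preference
  • Same-instance preference
  • Default
  • Sticky session
  • Health check
  • Weighted
  • Caching (applied through withCaching() in most of the suppliers above)
  • Hint-based filtering (not wired up in the class shown above)

It also defines the default strategy for picking one instance from that list: round-robin.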
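
Round-robin selection itself is small enough to sketch with the JDK. The class below is a simplified, hypothetical illustration: the real RoundRobinLoadBalancer is reactive and reads its list from a ServiceInstanceListSupplier, whereas this sketch takes the list directly.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, non-reactive illustration of round-robin selection.
final class RoundRobinSketch {

    private final AtomicInteger position = new AtomicInteger(0);

    // Returns the next instance in rotation, or null when the list is empty.
    String choose(List<String> instances) {
        if (instances.isEmpty()) {
            return null;
        }
        // Clear the sign bit so the index stays non-negative after the counter overflows.
        int pos = position.getAndIncrement() & Integer.MAX_VALUE;
        return instances.get(pos % instances.size());
    }
}
```

Successive calls with the same three-element list walk through the elements in order and then wrap around to the first one.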

Creating the blocking load balancer client - BlockingLoadBalancerClientAutoConfiguration

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@AutoConfigureBefore({ org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration.class })
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnProperty(value = "spring.cloud.loadbalancer.enabled", havingValue = "true", matchIfMissing = true)
public class BlockingLoadBalancerClientAutoConfiguration {

    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @ConditionalOnMissingBean
    public LoadBalancerClient blockingLoadBalancerClient(LoadBalancerClientFactory loadBalancerClientFactory) {
       return new BlockingLoadBalancerClient(loadBalancerClientFactory);
    }

    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @ConditionalOnMissingBean(LoadBalancerServiceInstanceCookieTransformer.class)
    public LoadBalancerServiceInstanceCookieTransformer loadBalancerServiceInstanceCookieTransformer(
          LoadBalancerClientFactory loadBalancerClientFactory) {
       return new LoadBalancerServiceInstanceCookieTransformer(loadBalancerClientFactory);
    }

    @Bean
    @ConditionalOnMissingBean(XForwardedHeadersTransformer.class)
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    public XForwardedHeadersTransformer xForwarderHeadersTransformer(
          LoadBalancerClientFactory loadBalancerClientFactory) {
       return new XForwardedHeadersTransformer(loadBalancerClientFactory);
    }

    @Configuration
    @ConditionalOnClass(RetryTemplate.class)
    @EnableConfigurationProperties(LoadBalancerClientsProperties.class)
    protected static class BlockingLoadBalancerRetryConfig {

       @Bean
       @ConditionalOnMissingBean
       LoadBalancedRetryFactory loadBalancedRetryFactory(
             ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory) {
          return new BlockingLoadBalancedRetryFactory(loadBalancerFactory);
       }

    }

}

Service instance list supplier strategies

image-20231210162405767

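All of the service instance list suppliers covered below are built on DelegatingServiceInstanceListSupplier: each one wraps a delegate supplier and post-processes the list it returns.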
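
The delegation idea is the decorator pattern: every supplier receives another supplier and transforms its result. The sketch below models this with java.util.function types. DelegatingSketch is a hypothetical class that returns plain lists instead of a Flux, so it shows the shape of the chain rather than the library's API.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical decorator: wraps a delegate and post-processes its list.
final class DelegatingSketch implements Supplier<List<String>> {

    private final Supplier<List<String>> delegate;
    private final UnaryOperator<List<String>> filter;

    DelegatingSketch(Supplier<List<String>> delegate, UnaryOperator<List<String>> filter) {
        this.delegate = delegate;
        this.filter = filter;
    }

    @Override
    public List<String> get() {
        // Ask the wrapped supplier first, then apply this layer's own rule.
        return filter.apply(delegate.get());
    }
}
```

Because a DelegatingSketch is itself a Supplier, layers can be stacked in any order, which corresponds to how builder calls such as withDiscoveryClient().withCaching().withZonePreference() compose in the configuration class shown earlier.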

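Zone preference strategy - ZonePreferenceServiceInstanceListSupplier

zone-preference is one of the values accepted by spring.cloud.loadbalancer.configurations. When it is selected, LoadBalancer prefers service instances located in the same zone as the caller.

A service is often deployed across several zones or data centers, each with its own instances. The caller's zone is read from the spring.cloud.loadbalancer.zone property (see zoneConfig above). The value is an arbitrary string that identifies a zone, such as a geographic location or a data center name. The zone of each instance is obtained through getZone(serviceInstance).

If at least one instance matches the caller's zone, only the matching instances are returned; otherwise the full list is returned unchanged. Keeping calls inside one zone can improve performance and reliability and keeps data processing within that zone, while the fallback preserves availability when the zone has no instances.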

// the zone in which the consumer (the calling side) runs
private String zone;

@Override
public Flux<List<ServiceInstance>> get() {
    return getDelegate().get().map(this::filteredByZone);
}

@Override
public Flux<List<ServiceInstance>> get(Request request) {
    if (callGetWithRequestOnDelegates) {
       return getDelegate().get(request).map(this::filteredByZone);
    }
    return get();
}

private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
    // zone not resolved yet: read it from the zone configuration
    if (zone == null) {
       zone = zoneConfig.getZone();
    }
    // if a zone is set, keep only the instances in that zone
    if (zone != null) {
       List<ServiceInstance> filteredInstances = new ArrayList<>();
       for (ServiceInstance serviceInstance : serviceInstances) {
          String instanceZone = getZone(serviceInstance);
          if (zone.equalsIgnoreCase(instanceZone)) {
             filteredInstances.add(serviceInstance);
          }
       }
       if (filteredInstances.size() > 0) {
          return filteredInstances;
       }
    }
    // If the zone is not set or there are no zone-specific instances available,
    // we return all instances retrieved for given service id.
    return serviceInstances;
}
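
The same filter-with-fallback logic can be tried out on its own. This is a minimal sketch assuming a hypothetical Instance holder with a name and a zone; it returns instance names instead of ServiceInstance objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the zone filter with fallback.
final class ZoneFilterSketch {

    // Minimal stand-in for a service instance: a name plus an optional zone.
    static final class Instance {
        final String name;
        final String zone;

        Instance(String name, String zone) {
            this.name = name;
            this.zone = zone;
        }
    }

    static List<String> filterByZone(List<Instance> instances, String clientZone) {
        List<String> all = new ArrayList<>();
        List<String> sameZone = new ArrayList<>();
        for (Instance instance : instances) {
            all.add(instance.name);
            // Case-insensitive match; a null instance zone never matches.
            if (clientZone != null && clientZone.equalsIgnoreCase(instance.zone)) {
                sameZone.add(instance.name);
            }
        }
        // Fall back to every instance when the zone is unset or has no members.
        return sameZone.isEmpty() ? all : sameZone;
    }
}
```

The fallback branch is the important design choice: a preference narrows the list when it can, but never leaves the caller with nothing to call.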

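Same-instance preference strategy - SameInstancePreferenceServiceInstanceListSupplier

same-instance-preference is another value of spring.cloud.loadbalancer.configurations. When it is selected, LoadBalancer prefers the instance that handled the previous request when it handles later requests.

As long as the previously selected instance is still in the list returned by the delegate, the supplier returns only that instance, which helps keep a session coherent and avoids opening new connections. If that instance is no longer available, the full list is returned and the next selected instance is remembered instead.

The exact behavior may differ between Spring Cloud LoadBalancer versions, so check the official documentation for the version you use.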

// the instance returned for the previous call
private ServiceInstance previouslyReturnedInstance;

@Override
public Flux<List<ServiceInstance>> get() {
    return delegate.get().map(this::filteredBySameInstancePreference);
}

@Override
public Flux<List<ServiceInstance>> get(Request request) {
    if (callGetWithRequestOnDelegates) {
       return delegate.get(request).map(this::filteredBySameInstancePreference);
    }
    return get();
}

private List<ServiceInstance> filteredBySameInstancePreference(List<ServiceInstance> serviceInstances) {
    // if a previous instance exists and is still in the current list, return only that instance
    if (previouslyReturnedInstance != null && serviceInstances.contains(previouslyReturnedInstance)) {
       if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("Returning previously selected service instance: %s",
                previouslyReturnedInstance));
       }
       return Collections.singletonList(previouslyReturnedInstance);
    }
    if (LOG.isDebugEnabled()) {
       LOG.debug(String.format(
             "Previously selected service instance %s was not available. Returning all the instances returned by delegate.",
             previouslyReturnedInstance));
    }
    previouslyReturnedInstance = null;
    return serviceInstances;
}

// remember the instance selected for the current call
@Override
public void selectedServiceInstance(ServiceInstance serviceInstance) {
    super.selectedServiceInstance(serviceInstance);
    if (previouslyReturnedInstance == null || !previouslyReturnedInstance.equals(serviceInstance)) {
       previouslyReturnedInstance = serviceInstance;
    }
}
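
The interplay between the filter and the selection callback is easier to see in isolation. The sketch below is a hypothetical, single-threaded simplification that uses instance names instead of ServiceInstance objects.

```java
import java.util.List;

// Hypothetical simplification of the same-instance preference logic.
final class SameInstanceSketch {

    private String previous;

    // Narrow the list to the remembered instance while it is still present.
    List<String> filter(List<String> instances) {
        if (previous != null && instances.contains(previous)) {
            return List.of(previous);
        }
        // The remembered instance is gone: forget it and offer everything.
        previous = null;
        return instances;
    }

    // Called after the load balancer has picked an instance.
    void selected(String instance) {
        previous = instance;
    }

    String previous() {
        return previous;
    }
}
```

Until selected(...) has been called the filter passes the list through untouched, so the first request is balanced normally and only later requests are pinned.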

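Sticky session strategy - RequestBasedStickySessionServiceInstanceListSupplier

request-based-sticky-session is the spring.cloud.loadbalancer.configurations value that enables request-based sticky sessions.

A sticky session is a load-balancing strategy that keeps routing requests from the same client to the same service instance. It is typically used by applications that need to keep client state during a session, so that related data is handled by one instance.

When this configuration is selected, LoadBalancer reads the preferred instance id from a cookie on the request. The cookie name comes from the sticky-session properties (getInstanceIdCookieName() in the code below). If an instance with that id is in the list, only that instance is returned; if the cookie is missing or no instance matches, all instances are returned.

Note that sticky sessions can lead to uneven load, especially when some instances are heavily loaded or unavailable, so decide whether to use them based on the needs of your application.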

@Override
public Flux<List<ServiceInstance>> get() {
    return delegate.get();
}

@SuppressWarnings("rawtypes")
@Override
public Flux<List<ServiceInstance>> get(Request request) {
    // first read the name of the sticky-session cookie
    String instanceIdCookieName = properties.getStickySession().getInstanceIdCookieName();
    Object context = request.getContext();
    if ((context instanceof RequestDataContext)) {
       MultiValueMap<String, String> cookies = ((RequestDataContext) context).getClientRequest().getCookies();
       if (cookies == null) {
          return delegate.get(request);
       }
       // We expect there to be one value in this cookie
       // read the value of that cookie
       String cookie = cookies.getFirst(instanceIdCookieName);
       if (cookie != null) {
          return delegate.get(request).map(serviceInstances -> selectInstance(serviceInstances, cookie));
       }
       if (LOG.isDebugEnabled()) {
          LOG.debug("Cookie not found. Returning all instances returned by delegate.");
       }
       return delegate.get(request);
    }
    // If the object type is not RequestData, we return all the instances provided by
    // the delegate.
    return delegate.get(request);
}

private List<ServiceInstance> selectInstance(List<ServiceInstance> serviceInstances, String cookie) {
    for (ServiceInstance serviceInstance : serviceInstances) {
       // compare the cookie value with the instance id
       if (cookie.equals(serviceInstance.getInstanceId())) {
          if (LOG.isDebugEnabled()) {
             LOG.debug(String.format("Returning the service instance: %s. Found for cookie: %s", serviceInstance,
                   cookie));
          }
          return Collections.singletonList(serviceInstance);
       }
    }
    // If the instances cannot be found based on the cookie,
    // we return all the instances provided by the delegate.
    if (LOG.isDebugEnabled()) {
       LOG.debug(String.format(
             "Service instance for cookie: %s not found. Returning all instances returned by delegate.",
             cookie));
    }
    return serviceInstances;
}
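
To experiment with the cookie lookup outside Spring, the sketch below parses a raw Cookie header and applies the same "match or return everything" rule. StickySessionSketch and its simple header parsing are assumptions made for illustration; the cookie name is passed in by the caller, and the real supplier receives already parsed cookies from RequestDataContext.

```java
import java.util.List;

// Hypothetical stand-alone version of cookie-based instance selection.
final class StickySessionSketch {

    // Returns the value of the named cookie from a header like "a=1; b=2", or null.
    static String cookieValue(String cookieHeader, String name) {
        if (cookieHeader == null) {
            return null;
        }
        for (String part : cookieHeader.split(";")) {
            String[] pair = part.trim().split("=", 2);
            if (pair.length == 2 && pair[0].equals(name)) {
                return pair[1];
            }
        }
        return null;
    }

    // Narrow to the instance named by the cookie, else keep the whole list.
    static List<String> select(List<String> instanceIds, String cookieHeader, String cookieName) {
        String wanted = cookieValue(cookieHeader, cookieName);
        if (wanted != null && instanceIds.contains(wanted)) {
            return List.of(wanted);
        }
        return instanceIds;
    }
}
```

A stale cookie that names an instance no longer in the list simply falls through to the full list, so stickiness degrades to normal balancing instead of failing the request.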

How does the called instance's cookie get set? - LoadBalancerServiceInstanceCookieTransformer

public class LoadBalancerServiceInstanceCookieTransformer implements LoadBalancerRequestTransformer {

    private ReactiveLoadBalancer.Factory<ServiceInstance> factory;

    public LoadBalancerServiceInstanceCookieTransformer(ReactiveLoadBalancer.Factory<ServiceInstance> factory) {
       this.factory = factory;
    }

    @Override
    public HttpRequest transformRequest(HttpRequest request, ServiceInstance instance) {
       if (instance == null) {
          return request;
       }
       // read the sticky-session configuration
       LoadBalancerProperties.StickySession stickySession = factory != null
             ? factory.getProperties(instance.getServiceId()).getStickySession()
             : new LoadBalancerProperties.StickySession();
       // skip if adding the service instance cookie is not enabled
       if (!stickySession.isAddServiceInstanceCookie()) {
          return request;
       }
       // skip if no instanceIdCookieName is configured
       String instanceIdCookieName = stickySession.getInstanceIdCookieName();
       if (!StringUtils.hasText(instanceIdCookieName)) {
          return request;
       }
       // add a cookie to the request: name = instanceIdCookieName, value = the instance id
       HttpHeaders headers = request.getHeaders();
       List<String> cookieHeaders = new ArrayList<>(request.getHeaders().getOrEmpty(HttpHeaders.COOKIE));
       String serviceInstanceCookie = new HttpCookie(instanceIdCookieName, instance.getInstanceId()).toString();
       cookieHeaders.add(serviceInstanceCookie);
       headers.put(HttpHeaders.COOKIE, cookieHeaders);
       return request;
    }

}

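Health check strategy - HealthCheckServiceInstanceListSupplier

health-check is the spring.cloud.loadbalancer.configurations value that enables health checking of service instances. A health check determines whether an instance is available, so that only healthy instances are selected for requests.

In general, health checks come in several forms:

  1. HTTP-based: send an HTTP request to the instance's health endpoint and check whether the response is as expected.
  2. TCP-based: open a TCP connection to the instance and check whether the connection succeeds.
  3. Heartbeat-based: the instance sends heartbeat signals periodically and the checker tracks whether they arrive.

The supplier shown below uses the HTTP form: it sends a GET request to the instance's health-check path (/actuator/health unless configured otherwise) and treats the instance as alive only when the response status is 200 OK.

When this configuration is enabled, LoadBalancer builds the HealthCheckServiceInstanceListSupplier with a function that calls the instance's health-check URL to test whether it is alive: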

// reactive health check (WebClient)
private ServiceInstanceListSupplier healthCheckServiceInstanceListSupplier(WebClient webClient,
       ServiceInstanceListSupplier delegate,
       ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
    return new HealthCheckServiceInstanceListSupplier(delegate, loadBalancerClientFactory,
          (serviceInstance, healthCheckPath) -> webClient.get()
                .uri(UriComponentsBuilder.fromUriString(getUri(serviceInstance, healthCheckPath)).build()
                      .toUri())
                .exchange().flatMap(clientResponse -> clientResponse.releaseBody()
                      .thenReturn(HttpStatus.OK.equals(clientResponse.statusCode()))));
}

// blocking health check (RestTemplate)
private ServiceInstanceListSupplier blockingHealthCheckServiceInstanceListSupplier(RestTemplate restTemplate,
       ServiceInstanceListSupplier delegate,
       ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
    return new HealthCheckServiceInstanceListSupplier(delegate, loadBalancerClientFactory,
          (serviceInstance, healthCheckPath) -> Mono.defer(() -> {
             URI uri = UriComponentsBuilder.fromUriString(getUri(serviceInstance, healthCheckPath)).build()
                   .toUri();
             try {
                return Mono
                      .just(HttpStatus.OK.equals(restTemplate.getForEntity(uri, Void.class).getStatusCode()));
             }
             catch (Exception ignored) {
                return Mono.just(false);
             }
          }));
}

The HealthCheckServiceInstanceListSupplier constructor also sets up a repeating health-check task:

public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
       ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory,
       BiFunction<ServiceInstance, String, Mono<Boolean>> aliveFunction) {
    super(delegate);
    this.healthCheck = loadBalancerClientFactory.getProperties(getServiceId()).getHealthCheck();
    defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", "/actuator/health");
    this.aliveFunction = aliveFunction;
    // Re-fetch the instance list and repeat the health check at a fixed interval
    Repeat<Object> aliveInstancesReplayRepeat = Repeat
          .onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
          .fixedBackoff(healthCheck.getRefetchInstancesInterval());
    Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate).repeatWhen(aliveInstancesReplayRepeat)
          .switchMap(serviceInstances -> healthCheckFlux(serviceInstances).map(alive -> List.copyOf(alive)));
    // Replayed list of instances that are alive
    aliveInstancesReplay = aliveInstancesFlux.delaySubscription(healthCheck.getInitialDelay()).replay(1)
          .refCount(1);
}

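By running the health-check task repeatedly, the supplier periodically determines whether each instance in the service list is alive: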

protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
    Repeat<Object> healthCheckFluxRepeat = Repeat.onlyIf(repeatContext -> healthCheck.getRepeatHealthCheck())
          .fixedBackoff(healthCheck.getInterval());
    return Flux.defer(() -> {
       List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
       for (ServiceInstance instance : instances) {
          // Check whether the instance is alive
          Mono<ServiceInstance> alive = isAlive(instance).onErrorResume(error -> {
             if (LOG.isDebugEnabled()) {
                LOG.debug(String.format(
                      "Exception occurred during health check of the instance for service %s: %s",
                      instance.getServiceId(), instance.getUri()), error);
             }
             return Mono.empty();
          }).timeout(healthCheck.getInterval(), Mono.defer(() -> {
             if (LOG.isDebugEnabled()) {
                LOG.debug(String.format(
                      "The instance for service %s: %s did not respond for %s during health check",
                      instance.getServiceId(), instance.getUri(), healthCheck.getInterval()));
             }
             return Mono.empty();
          })).handle((isHealthy, sink) -> {
             if (isHealthy) {
                sink.next(instance);
             }
          });

          checks.add(alive);
       }
       List<ServiceInstance> result = new ArrayList<>();
       if (healthCheck.isUpdateResultsList()) {
          return Flux.merge(checks).map(alive -> {
             result.add(alive);
             return result;
          }).defaultIfEmpty(result);
       }
       return Flux.merge(checks).collectList();
    }).repeatWhen(healthCheckFluxRepeat);
}
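The filtering rule inside healthCheckFlux can be sketched without Reactor. The following is a minimal blocking sketch, not the library's implementation: the class name HealthFilterSketch and the method aliveOnly are hypothetical, instances are represented by plain strings, and the timeout and repeat behaviour of the real supplier is left out. It only shows that an instance is kept when its check returns true and dropped when the check returns false or fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for the filtering done in healthCheckFlux.
class HealthFilterSketch {

    // Keeps only the instances whose check returns true.
    // A check that throws is treated like a failed check, which mirrors
    // onErrorResume(...) returning Mono.empty() in the original code.
    static List<String> aliveOnly(List<String> instances, Predicate<String> aliveCheck) {
        List<String> alive = new ArrayList<>();
        for (String instance : instances) {
            try {
                if (aliveCheck.test(instance)) {
                    alive.add(instance);
                }
            }
            catch (RuntimeException ignored) {
                // The instance did not answer correctly, so it is skipped.
            }
        }
        return alive;
    }
}
```

With instances "a", "b" and "c", a check that succeeds for "a", throws for "b" and returns false for "c" leaves only "a" in the result.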

When the list of service instances is requested, the supplier returns the instances that are alive:

@Override
public Flux<List<ServiceInstance>> get() {
    return aliveInstancesReplay;
}

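Weighted strategy - WeightedServiceInstanceListSupplier

When we set the eureka.instance.metadata-map.weight property in the service's application.properties file, the weighted strategy returns a service instance list in which each instance appears in proportion to its configured weight.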

@Override
public Flux<List<ServiceInstance>> get() {
    return delegate.get().map(this::expandByWeight);
}

@Override
public Flux<List<ServiceInstance>> get(Request request) {
    if (callGetWithRequestOnDelegates) {
       return delegate.get(request).map(this::expandByWeight);
    }
    return get();
}

private List<ServiceInstance> expandByWeight(List<ServiceInstance> instances) {
    if (instances.size() == 0) {
       return instances;
    }

    int[] weights = instances.stream().mapToInt(instance -> {
       try {
          int weight = weightFunction.apply(instance);
          if (weight <= 0) {
             if (LOG.isDebugEnabled()) {
                LOG.debug(String.format(
                      "The weight of the instance %s should be a positive integer, but it got %d, using %d as default",
                      instance.getInstanceId(), weight, DEFAULT_WEIGHT));
             }
             return DEFAULT_WEIGHT;
          }
          return weight;
       }
       catch (Exception e) {
          if (LOG.isDebugEnabled()) {
             LOG.debug(String.format(
                   "Exception occurred during apply weight function to instance %s, using %d as default",
                   instance.getInstanceId(), DEFAULT_WEIGHT), e);
          }
          return DEFAULT_WEIGHT;
       }
    }).toArray();

    return new LazyWeightedServiceInstanceList(instances, weights);
}

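LazyWeightedServiceInstanceList extends AbstractList and is LoadBalancer's concrete implementation of the weighted instance list. It fills the expanded array lazily, only up to the index that is requested: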

class LazyWeightedServiceInstanceList extends AbstractList<ServiceInstance> {

    /* for testing */ final ServiceInstance[] expanded;

    private final Object expandingLock = new Object();

    private WeightedServiceInstanceSelector selector;

    private volatile int position = 0;

    LazyWeightedServiceInstanceList(List<ServiceInstance> instances, int[] weights) {
       // Calculate the greatest common divisor (GCD) of weights, and the
       // total number of elements after expansion.
       int greatestCommonDivisor = 0;
       int total = 0;
       for (int weight : weights) {
          greatestCommonDivisor = greatestCommonDivisor(greatestCommonDivisor, weight);
          total += weight;
       }
       expanded = new ServiceInstance[total / greatestCommonDivisor];
       selector = new WeightedServiceInstanceSelector(instances, weights, greatestCommonDivisor);
    }

    @Override
    public ServiceInstance get(int index) {
       if (index >= position) {
          synchronized (expandingLock) {
             for (; position <= index && position < expanded.length; position++) {
                expanded[position] = selector.next();
             }
             if (position == expanded.length) {
                selector = null; // for gc
             }
          }
       }
       return expanded[index];
    }

    @Override
    public int size() {
       return expanded.length;
    }

    static int greatestCommonDivisor(int a, int b) {
       int r;
       while (b != 0) {
          r = a % b;
          a = b;
          b = r;
       }
       return a;
    }

    static class WeightedServiceInstanceSelector {

       Queue<Entry> active;

       Queue<Entry> expired;

       WeightedServiceInstanceSelector(List<ServiceInstance> instances, int[] weights, int greatestCommonDivisor) {
          active = new ArrayDeque<>(instances.size());
          expired = new ArrayDeque<>(instances.size());
          // Use iterator for some implementation of the List that not supports
          // RandomAccess, but `weights` is supported, so use a local variable `i`
          // to get the current position.
          int i = 0;
          for (ServiceInstance instance : instances) {
             active.offer(new Entry(instance, weights[i] / greatestCommonDivisor));
             i++;
          }
       }

       ServiceInstance next() {
          if (active.isEmpty()) {
             Queue<Entry> temp = active;
             active = expired;
             expired = temp;
          }

          Entry entry = active.poll();
          if (entry == null) {
             // Suppress warnings, never touched!
             return null;
          }

          entry.remainder--;
          if (entry.remainder == 0) {
             entry.remainder = entry.weight;
             expired.offer(entry);
          }
          else {
             active.offer(entry);
          }
          return entry.instance;
       }

       static class Entry {

          final ServiceInstance instance;

          final int weight;

          int remainder;

          Entry(ServiceInstance instance, int weight) {
             this.instance = instance;
             this.weight = weight;
             remainder = weight;
          }

       }

    }

}
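To see what order the selector produces, the expansion can be reproduced in a few lines of plain Java. This is a sketch under stated assumptions rather than the library class: WeightedExpansionSketch and expand are hypothetical names, instances are plain strings, all weights are assumed to be positive, and the list is expanded eagerly instead of lazily. The two-queue rotation and the division by the greatest common divisor follow the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical eager version of the weighted expansion shown above.
class WeightedExpansionSketch {

    static int gcd(int a, int b) {
        while (b != 0) {
            int r = a % b;
            a = b;
            b = r;
        }
        return a;
    }

    // Assumes names.size() == weights.length and every weight is positive.
    static List<String> expand(List<String> names, int[] weights) {
        List<String> expanded = new ArrayList<>();
        if (names.isEmpty()) {
            return expanded;
        }
        int divisor = 0;
        int total = 0;
        for (int weight : weights) {
            divisor = gcd(divisor, weight);
            total += weight;
        }
        // Each queue entry is {instance index, remaining picks in this round}.
        Queue<int[]> active = new ArrayDeque<>();
        Queue<int[]> expired = new ArrayDeque<>();
        for (int i = 0; i < weights.length; i++) {
            active.offer(new int[] { i, weights[i] / divisor });
        }
        int size = total / divisor;
        for (int n = 0; n < size; n++) {
            if (active.isEmpty()) {
                Queue<int[]> temp = active;
                active = expired;
                expired = temp;
            }
            int[] entry = active.poll();
            entry[1]--;
            if (entry[1] == 0) {
                // Used up for this round: reset and park until the queues swap.
                entry[1] = weights[entry[0]] / divisor;
                expired.offer(entry);
            }
            else {
                // Still has picks left: go to the back so others get a turn.
                active.offer(entry);
            }
            expanded.add(names.get(entry[0]));
        }
        return expanded;
    }
}
```

For instances A and B with weights 4 and 2, the divisor is 2, so the expanded list has three slots and comes out as A, B, A. Interleaving the picks this way spreads the heavier instance across the list instead of placing all of its slots in a row.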

Retry-on-a-different-instance strategy - RetryAwareServiceInstanceListSupplier

@Override
public Flux<List<ServiceInstance>> get(Request request) {
    if (!(request.getContext() instanceof RetryableRequestContext context)) {
       return delegate.get(request);
    }
    ServiceInstance previousServiceInstance = context.getPreviousServiceInstance();
    if (previousServiceInstance == null) {
       return delegate.get(request);
    }
    return delegate.get(request).map(instances -> filteredByPreviousInstance(instances, previousServiceInstance));
}

// Return the instance list without the instance used by the previous request
private List<ServiceInstance> filteredByPreviousInstance(List<ServiceInstance> instances,
       ServiceInstance previousServiceInstance) {
    List<ServiceInstance> filteredInstances = new ArrayList<>(instances);
    if (previousServiceInstance != null) {
       filteredInstances.remove(previousServiceInstance);
    }
    if (filteredInstances.size() > 0) {
       return filteredInstances;
    }
    if (LOG.isWarnEnabled()) {
       LOG.warn(String.format(
             "No instances found after removing previously used service instance from the search (%s). Returning all found instances.",
             previousServiceInstance));
    }
    return instances;
}
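The effect of this filter on a retry is easy to check in isolation. The sketch below is a simplified illustration, with instances represented as strings and with hypothetical names (RetryFilterSketch, excludePrevious); it keeps the same fallback as the code above, returning the full list when removing the previous instance would leave nothing to choose from.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified version of filteredByPreviousInstance.
class RetryFilterSketch {

    static List<String> excludePrevious(List<String> instances, String previous) {
        if (previous == null) {
            // First attempt: nothing to exclude.
            return instances;
        }
        List<String> filtered = new ArrayList<>(instances);
        filtered.remove(previous);
        // Fall back to the full list when the previous instance was the only one.
        return filtered.isEmpty() ? instances : filtered;
    }
}
```

With instances "a" and "b" and a previous attempt on "a", the retry can only pick "b". With "a" as the only instance, the retry goes back to "a" rather than failing for lack of candidates.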

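Hint-based filtering strategy - HintBasedServiceInstanceListSupplier

When we set the spring.cloud.loadbalancer.hint-header-name property or the spring.cloud.loadbalancer.hint.[callee service ID] property in the service's application.properties file, the hint strategy filters the service instance list by the configured hint.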

@Override
public Flux<List<ServiceInstance>> get() {
    return delegate.get();
}

@Override
public Flux<List<ServiceInstance>> get(Request request) {
    return delegate.get(request).map(instances -> filteredByHint(instances, getHint(request.getContext())));
}

// Resolve the hint, preferring the request header over the configured value
private String getHint(Object requestContext) {
    if (requestContext == null) {
       return null;
    }
    String hint = null;
    if (requestContext instanceof RequestDataContext) {
       hint = getHintFromHeader((RequestDataContext) requestContext);
    }
    if (!StringUtils.hasText(hint) && requestContext instanceof HintRequestContext) {
       hint = ((HintRequestContext) requestContext).getHint();
    }
    return hint;
}

// Read the hint from the request header
private String getHintFromHeader(RequestDataContext context) {
    if (context.getClientRequest() != null) {
       HttpHeaders headers = context.getClientRequest().getHeaders();
       if (headers != null) {
          return headers.getFirst(properties.getHintHeaderName());
       }
    }
    return null;
}

// Filter instances by hint
private List<ServiceInstance> filteredByHint(List<ServiceInstance> instances, String hint) {
    if (!StringUtils.hasText(hint)) {
       return instances;
    }
    List<ServiceInstance> filteredInstances = new ArrayList<>();
    for (ServiceInstance serviceInstance : instances) {
       if (serviceInstance.getMetadata().getOrDefault("hint", "").equals(hint)) {
          filteredInstances.add(serviceInstance);
       }
    }
    if (filteredInstances.size() > 0) {
       return filteredInstances;
    }

    // If instances cannot be found based on hint,
    // we return all instances retrieved for given service id.
    return instances;
}
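The matching rule in filteredByHint compares the resolved hint with the "hint" entry in each instance's metadata. Here is a minimal sketch of that rule, assuming a hypothetical Instance class as a stand-in for ServiceInstance and hypothetical names HintFilterSketch and filterByHint:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of filteredByHint.
class HintFilterSketch {

    // Stand-in for ServiceInstance: an id plus its metadata map.
    static final class Instance {
        final String id;
        final Map<String, String> metadata;

        Instance(String id, Map<String, String> metadata) {
            this.id = id;
            this.metadata = metadata;
        }
    }

    static List<Instance> filterByHint(List<Instance> instances, String hint) {
        if (hint == null || hint.trim().isEmpty()) {
            // No hint on the request: nothing to filter.
            return instances;
        }
        List<Instance> filtered = new ArrayList<>();
        for (Instance instance : instances) {
            if (hint.equals(instance.metadata.getOrDefault("hint", ""))) {
                filtered.add(instance);
            }
        }
        // If no instance carries the hint, fall back to all instances.
        return filtered.isEmpty() ? instances : filtered;
    }
}
```

A request carrying the hint "canary" is routed only to instances whose metadata contains hint=canary. If no instance has that value, every instance stays eligible, so an unknown hint does not make the service unreachable.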

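Caching strategy - CachingServiceInstanceListSupplier

The caching strategy stores the service instance list in the configured cache and serves the cached list first when one is available. On a cache miss it falls back to the delegate and writes the result into the cache: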

public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
    super(delegate);
    this.serviceInstances = CacheFlux.lookup(key -> {
       // TODO: configurable cache name
       Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
       if (cache == null) {
          if (log.isErrorEnabled()) {
             log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
          }
          return Mono.empty();
       }
       List<ServiceInstance> list = cache.get(key, List.class);
       if (list == null || list.isEmpty()) {
          return Mono.empty();
       }
       return Flux.just(list).materialize().collectList();
    }, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
          .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
             Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
             if (cache == null) {
                if (log.isErrorEnabled()) {
                   log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);
                }
             }
             else {
                cache.put(key, instances);
             }
          }).then());
}

@Override
public Flux<List<ServiceInstance>> get() {
    return serviceInstances;
}
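Stripped of the reactive types, the lookup follows a cache-aside pattern. The sketch below is an illustration under assumptions, not the library code: CachingSupplierSketch is a hypothetical class, a HashMap stands in for the CacheManager, a Function stands in for the delegate supplier, and there is no expiry or thread safety. As in the code above, a missing or empty cached list counts as a miss.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical cache-aside sketch of the lookup in CachingServiceInstanceListSupplier.
class CachingSupplierSketch {

    private final Map<String, List<String>> cache = new HashMap<>();

    private final Function<String, List<String>> delegate;

    CachingSupplierSketch(Function<String, List<String>> delegate) {
        this.delegate = delegate;
    }

    List<String> get(String serviceId) {
        List<String> cached = cache.get(serviceId);
        if (cached != null && !cached.isEmpty()) {
            // Cache hit: the delegate is not called.
            return cached;
        }
        // Cache miss (absent or empty list): ask the delegate and store the result.
        List<String> fresh = delegate.apply(serviceId);
        cache.put(serviceId, fresh);
        return fresh;
    }
}
```

Calling get twice for the same service ID reaches the delegate only once, as long as the first call returned a non-empty list.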

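Service instance selection strategies

The class hierarchy of ReactorServiceInstanceLoadBalancer, the interface behind the service instance selection strategies, is shown below:

[Figure: ReactorServiceInstanceLoadBalancer class diagram]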

Random strategy - RandomLoadBalancer

@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
          .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next()
          .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}

private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
       List<ServiceInstance> serviceInstances) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
       ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
}

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
       if (log.isWarnEnabled()) {
          log.warn("No servers available for service: " + serviceId);
       }
       return new EmptyResponse();
    }
    // Pick a random index
    int index = ThreadLocalRandom.current().nextInt(instances.size());

    ServiceInstance instance = instances.get(index);

    return new DefaultResponse(instance);
}

Round-robin strategy - RoundRobinLoadBalancer

public Mono<Response<ServiceInstance>> choose(Request request) {
    ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
          .getIfAvailable(NoopServiceInstanceListSupplier::new);
    return supplier.get(request).next()
          .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}

private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
       List<ServiceInstance> serviceInstances) {
    Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
    if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
       ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
    }
    return serviceInstanceResponse;
}

private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
    if (instances.isEmpty()) {
       if (log.isWarnEnabled()) {
          log.warn("No servers available for service: " + serviceId);
       }
       return new EmptyResponse();
    }

    // Do not move position when there is only 1 instance, especially some suppliers
    // have already filtered instances
    if (instances.size() == 1) {
       return new DefaultResponse(instances.get(0));
    }

    // Ignore the sign bit, this allows pos to loop sequentially from 0 to
    // Integer.MAX_VALUE
    int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

    ServiceInstance instance = instances.get(pos % instances.size());

    return new DefaultResponse(instance);
}
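The sign-bit mask matters once the counter overflows. The following sketch isolates the index calculation so the wrap-around can be observed; RoundRobinSketch, nextIndex and the seed constructor are hypothetical names introduced for illustration, while the masking expression is the one used above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the index calculation in RoundRobinLoadBalancer.
class RoundRobinSketch {

    private final AtomicInteger position;

    // The seed is exposed only so the overflow case can be demonstrated.
    RoundRobinSketch(int seed) {
        this.position = new AtomicInteger(seed);
    }

    // size is assumed to be greater than zero.
    int nextIndex(int size) {
        // Clearing the sign bit keeps pos non-negative after the counter
        // overflows from Integer.MAX_VALUE to Integer.MIN_VALUE.
        int pos = position.incrementAndGet() & Integer.MAX_VALUE;
        return pos % size;
    }
}
```

Starting from 0 with three instances, the indexes come out as 1, 2, 0, and so on. Starting from Integer.MAX_VALUE, the increment overflows to Integer.MIN_VALUE, the mask turns that into 0, and the next index is 0 instead of a negative number that would make instances.get(...) throw.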