欢迎访问shiker.tech

请允许在我们的网站上展示广告

您似乎使用了广告拦截器,请关闭广告拦截器。我们的网站依靠广告获取资金。

从线上问题看服务注册
(last modified Nov 14, 2023, 1:52 AM )
by
侧边栏壁纸
  • 累计撰写 194 篇文章
  • 累计创建 66 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

从线上问题看服务注册

橙序员
2023-11-12 / 0 评论 / 0 点赞 / 677 阅读 / 8,548 字 / 正在检测百度是否收录... 正在检测必应是否收录...
文章摘要(AI生成)

本文介绍了Eureka服务注册的工作流程和相关组件。首先,通过EurekaAutoServiceRegistration实现服务的注册入口。然后,使用EurekaServiceRegistry进行服务的注册操作。接下来,通过EurekaClientAutoConfiguration创建EurekaClient的实例。在客户端创建EurekaClient后,会创建与注册EurekaClient的调度任务,并且会有一个线程负责刷新本地注册信息,另一个线程负责向注册中心注册心跳信息。最后,介绍了当注册中心配置错误时,微服务会打印的堆栈信息。

当我们有时候注册中心配置错误时,微服务会打印以下堆栈信息:

Request execution error. endpoint=DefaultEndpoint{ serviceUrl='http://localhost:8761/eureka/}, exception=I/O error on GET request for"http://localhost:8761/eureka/apps/": Connect to http://localhost:8761 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused: no further information stacktrace=org.springframework.web.client.ResourceAccessException: I/O error on GET request for "http://localhost:8761/eureka/apps/": Connect to http://localhost:8761 [localhost/127.0.0.1, localhost/0:0:0:0:0:0:0:1] failed: Connection refused: no further information
	at org.springframework.web.client.RestTemplate.createResourceAccessException(RestTemplate.java:888)
	at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:868)
	at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:764)
	at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:646)
	at org.springframework.cloud.netflix.eureka.http.RestTemplateEurekaHttpClient.getApplicationsInternal(RestTemplateEurekaHttpClient.java:149)
	at org.springframework.cloud.netflix. eureka.http.RestTemplateEurekaHttpClient.getApplications(RestTemplateEurekaHttpClient.java:139)**
	at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
	at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.executeOnNewServer(RedirectingEurekaHttpClient.java:121)
	at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.execute(RedirectingEurekaHttpClient.java:80)
	at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
	at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
	at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:120)
	at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
	at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
	at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
	at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
	at com.netflix.discovery.DiscoveryClient.getAndStoreFullRegistry(DiscoveryClient.java:1045)
	at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:958)
	at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:396)
	at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:247)
	at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:242)
	at org.springframework.cloud.netflix.eureka.CloudEurekaClient.<init>(CloudEurekaClient.java:68)
	at org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration$RefreshableEurekaClientConfiguration.eurekaClient(EurekaClientAutoConfiguration.java:320)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:139)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:650)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:642)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1332)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1162)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:560)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:364)
	at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:375)
	at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:179)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:361)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
	at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
	**at org.springframework.cloud.netflix.eureka.serviceregistry.EurekaRegistration.getTargetObject(EurekaRegistration.java:128)**
	**at org.springframework.cloud.netflix.eureka.serviceregistry.EurekaRegistration.getEurekaClient(EurekaRegistration.java:116)**
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:281)
	at org.springframework.cloud.context.scope.GenericScope$LockedScopedProxyFactoryBean.invoke(GenericScope.java:482)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:703)
	at org.springframework.cloud.netflix.eureka.serviceregistry.EurekaRegistration$$SpringCGLIB$$0.getEurekaClient(<generated>)
	at org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry.maybeInitializeClient(EurekaServiceRegistry.java:54)
	at org.springframework.cloud.netflix.eureka.serviceregistry.EurekaServiceRegistry.register(EurekaServiceRegistry.java:38)**
	at org.springframework.cloud.netflix.eureka.serviceregistry.EurekaAutoServiceRegistration.start(EurekaAutoServiceRegistration.java:83)
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:179)
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:357)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:156)
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:124)
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:966)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:738)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:440)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295)
	at com.example.eurekaclidemo.EurekaClidemoApplication.main(EurekaClidemoApplication.java:12)

接下来我们从堆栈日志出发,来看实例如何注册到eureka的。

Eureka服务注册入口-EurekaAutoServiceRegistration

EurekaAutoServiceRegistration作为服务注册入口,其目的就是完成在应用程序启动时完成向注册中心注册服务,并在程序关闭时向注册中心注销服务。

public class EurekaAutoServiceRegistration
		implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener 

AutoServiceRegistration在spring-cloud-common包中作为配置类接口被自动注入,这里提供了eureka的服务发现的实现。它也实现了SmartLifecycle接口,实现了其定义的方法:

	private final AtomicBoolean running = new AtomicBoolean(false);

	//在应用程序启动时执行start方法	
	@Override
	public void start() {
		//设置nonSecurePort和securePort
		//only set the port if the nonSecurePort or securePort is 0 and this.port != 0
		if (this.port.get() != 0) {
			if (this.registration.getNonSecurePort() == 0) {
				this.registration.setNonSecurePort(this.port.get());
			}

			if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
				this.registration.setSecurePort(this.port.get());
			}
		}

        // 如果服务未启动,且非安全端口大于0,则执行注册
		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {

			this.serviceRegistry.register(this.registration);

			this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
			this.running.set(true);
		}
	}

	//在应用程序关闭时调用的方法
	//注销服务,运行状态设为false
	@Override
	public void stop() {
		this.serviceRegistry.deregister(this.registration);
		this.running.set(false);
	}

	//返回一个布尔值,表示组件当前是否正在运行。
	@Override
	public boolean isRunning() {
		return this.running.get();
	}

	//返回一个整数值,表示组件的生命周期阶段。具有更小值的组件将在具有较大值的组件之前启动,并在关闭时相反的顺序执行。
	//优先执行服务注册
	@Override
	public int getPhase() {
		return 0;
	}

	//返回一个布尔值,指示是否自动启动该组件。
	//服务启动时,自动启动服务注册
	@Override
	public boolean isAutoStartup() {
		return true;
	}

在伴随容器生命周期进行服务发现的同时,其也实现了SmartApplicationListener接口,用于监听容器启动和关闭事件

@Override
public int getOrder() {
	return this.order;
}

//要监听的event事件类型
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
    return WebServerInitializedEvent.class.isAssignableFrom(eventType)
          || ContextClosedEvent.class.isAssignableFrom(eventType);
}

//消费event事件
@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof WebServerInitializedEvent) {
       onApplicationEvent((WebServerInitializedEvent) event);
    }
    else if (event instanceof ContextClosedEvent) {
       onApplicationEvent((ContextClosedEvent) event);
    }
}

//消费web容器启动事件-WebServerInitializedEvent,设置端口号并执行服务注册
public void onApplicationEvent(WebServerInitializedEvent event) {
    // TODO: take SSL into account
    String contextName = event.getApplicationContext().getServerNamespace();
    if (contextName == null || !contextName.equals("management")) {
       int localPort = event.getWebServer().getPort();
       if (this.port.get() == 0) {
          log.info("Updating port to " + localPort);
          this.port.compareAndSet(0, localPort);
          start();
       }
    }
}

//消费容器关闭事件-ContextClosedEvent,执行服务注销
public void onApplicationEvent(ContextClosedEvent event) {
    if (event.getApplicationContext() == context) {
       stop();
    }
}

为什么既要通过lifeCycle的start方法启动,又要通过监听WebServerInitializedEvent进行启动呢?

在Spring中,WebServerInitializedEvent是在start()方法执行之后发送的。

SmartLifecycle接口中的start()方法是在应用程序启动时调用的,用于启动组件。而WebServerInitializedEvent是在Web服务器完全启动并开始接受请求后发送的事件。

因此,start()方法会在WebServerInitializedEvent事件之前执行。当start()方法完成后,Web服务器才会完全启动,然后发送WebServerInitializedEvent事件。

这里有两个考虑,一是如果我们在服务中将server.port属性或者eureka.instance.non-secure-port设置为0时,在web服务未启动时,我们是无法获取到服务的非安全端口号的,只有等服务启动后才能够获取到服务的实际端口号。也就是说,eureka注册中心的实例端口号即是服务实际的端口号,不是服务映射到外部的80端口号或者443端口。

Eureka服务注册-EurekaServiceRegistry

EurekaServiceRegistry被服务注册入口类EurekaAutoServiceRegistration调用。在实际的服务注册流程中,其首先会创建eruekaClient进行服务注册与发现,然后获取客户端中存储的注册表信息,并更新了注册表中服务的实例状态以及健康检查处理器。

	@Override
	public void register(EurekaRegistration reg) {
        //获取注册表信息
		maybeInitializeClient(reg);

		if (log.isInfoEnabled()) {
			log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
					+ " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
		}

		reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

		reg.getHealthCheckHandler()
				.ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
	}

	private void maybeInitializeClient(EurekaRegistration reg) {
		// force initialization of possibly scoped proxies
		reg.getApplicationInfoManager().getInfo();
		reg.getEurekaClient().getApplications();
	}

这里并不能够看出与Eureka注册的过程。所以真正的玄机在EurekaRegistration的构造中:

	public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry,
			EurekaRegistration registration) {
		this.context = context;
		this.serviceRegistry = serviceRegistry;
		this.registration = registration;
	}

	public EurekaRegistration(CloudEurekaInstanceConfig instanceConfig, EurekaClient eurekaClient,
			ApplicationInfoManager applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) {
		this.eurekaClient = eurekaClient;
		this.instanceConfig = instanceConfig;
		this.applicationInfoManager = applicationInfoManager;
		this.healthCheckHandler = healthCheckHandler;
	}

我们发现在容器创建EurekaAutoServiceRegistration实例时,也注入了容器、eureka服务注册器EurekaServiceRegistry、eureka服务注册表EurekaRegistration,而注册表实例包含了CloudEurekaInstanceConfigEurekaClientApplicationInfoManager以及HealthCheckHandler.这些bean是在哪里生命的呢?

从euerka-client包结构出发

我们首先看其导入了哪些自动配置类:

##如果配置服务器恰好是 Eureka 实例,则对其进行额外配置
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration
##客户端额外参数(如TLS)配置
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration
##eurekaClient客户端创建
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
##健康检查
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
##响应式客户端及其健康检查配置
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration
##负载均衡配置
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

其spring.factories中声明如下:

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration

org.springframework.boot.BootstrapRegistryInitializer=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapper

EurekaConfigServerBootstrapConfiguration中创建eureka客户端中的属性EurekaClientConfigBean,并根据属性配置创建一个eureka的Http客户端。

EurekaConfigServerBootstrapConfiguration在配置属性加载之后,且eureka.client.enabled属性值为true && spring.cloud.config.discovery.enabled为true的配置下才创建。其为我们创建了一个与注册中心进行http请求交互的http客户端:

首先根据我们的客户端配置,创建一个EurekaClientConfigBean,然后根据我们的配置:

  1. EurekaHttpClient未创建,且eureka.client.webclient.enabled的属性为false时,则创建一个restful风格的euerkaHttpClient
  2. 当如果服务中使用了WebClient,且eureka.client.webclient.enabled的属性为true时,则创建webClient类型的euerkaHttpClient

创建的EuerkaHttpClient负责发送心跳,获取注册表,获取实例信息,我们可以据此获取到注册中心中的所有实例注册信息

//配置属性加载之后
@ConditionalOnClass(ConfigServicePropertySourceLocator.class)
//eureka.client.enabled为true && spring.cloud.config.discovery.enabled为true
@Conditional(EurekaConfigServerBootstrapConfiguration.EurekaConfigServerBootstrapCondition.class)
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
public class EurekaConfigServerBootstrapConfiguration {

    //eureka客户端配置信息创建:eureka 客户端向 Eureka 服务器注册实例所需的配置信息
	@Bean
	@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
	public EurekaClientConfigBean eurekaClientConfigBean() {
		return new EurekaClientConfigBean();
	}
	
    //如果EurekaHttpClient未创建,且eureka.client.webclient.enabled的属性为false
    //则创建一个restful风格的euerkaHttpClient,负责发送心跳,获取注册表,获取实例信息
	@Bean
	@ConditionalOnMissingBean(EurekaHttpClient.class)
	@ConditionalOnProperty(prefix = "eureka.client", name = "webclient.enabled", matchIfMissing = true,
			havingValue = "false")
	public RestTemplateEurekaHttpClient configDiscoveryRestTemplateEurekaHttpClient(EurekaClientConfigBean config,
			Environment env, @Nullable TlsProperties properties,
			EurekaClientHttpRequestFactorySupplier eurekaClientHttpRequestFactorySupplier) {
		return (RestTemplateEurekaHttpClient) new RestTemplateTransportClientFactory(properties,
				eurekaClientHttpRequestFactorySupplier)
						.newClient(HostnameBasedUrlRandomizer.randomEndpoint(config, env));
	}

    //针对eureka client创建一个http连接池,负责维护与注册中心的链接
	@Bean
	@ConditionalOnMissingBean
	EurekaClientHttpRequestFactorySupplier defaultEurekaClientHttpRequestFactorySupplier() {
		return new DefaultEurekaClientHttpRequestFactorySupplier();
	}

    //获取注册中心中的所有实例信息
	@Bean
	public ConfigServerInstanceProvider.Function eurekaConfigServerInstanceProvider(EurekaHttpClient client,
			EurekaClientConfig config) {
		return new EurekaConfigServerInstanceProvider(client, config)::getInstances;
	}
    
	//如果服务中使用了WebClient,且eureka.client.webclient.enabled的属性为true
    //则创建webClient类型的euerkaHttpClient
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnClass(name = "org.springframework.web.reactive.function.client.WebClient")
	@ConditionalOnProperty(prefix = "eureka.client", name = "webclient.enabled", havingValue = "true")
	@ImportAutoConfiguration({ CodecsAutoConfiguration.class, WebClientAutoConfiguration.class })
	protected static class WebClientConfiguration {

		@Bean
		@ConditionalOnMissingBean(EurekaHttpClient.class)
		public WebClientEurekaHttpClient configDiscoveryWebClientEurekaHttpClient(EurekaClientConfigBean config,
				ObjectProvider<WebClient.Builder> builder, Environment env) {
			return (WebClientEurekaHttpClient) new WebClientTransportClientFactory(builder::getIfAvailable)
					.newClient(HostnameBasedUrlRandomizer.randomEndpoint(config, env));
		}

	}

    //eureka.client.enabled为true && spring.cloud.config.discovery.enabled为true
	static class EurekaConfigServerBootstrapCondition extends AllNestedConditions {

		EurekaConfigServerBootstrapCondition() {
			super(ConfigurationPhase.REGISTER_BEAN);
		}
		
        //spring.cloud.config.discovery.enabled为true
		@ConditionalOnProperty("spring.cloud.config.discovery.enabled")
		static class OnCloudConfigProperty {

		}
		//eureka.client.enabled为true
		@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
		static class OnEurekaClient {

		}

	}

}

EurekaConfigServerBootstrapper中注册了一个eureka函数,负责将eureka.client属性配置解析到EurekaClientConfigBean

@BootstrapConfiguration注解是Spring框架中的一个注解,它的作用是将带有此注解的类标记为Spring Boot应用程序的引导配置类。

引导配置类是Spring Boot应用程序中的关键组件,它用于配置和初始化Spring Boot框架的各种功能和组件。通过在类上添加@BootstrapConfiguration注解,Spring Boot会将这个类识别为引导配置类,并在应用程序启动时加载和执行其中的配置逻辑。

引导配置类通常会包含一些常规的Spring配置,例如定义Bean、配置属性、初始化数据库连接、设置日志记录等。它们可以通过使用其他的注解(如@Bean@ComponentScan@ConfigurationProperties等)来完成具体的配置工作。

总而言之,@BootstrapConfiguration注解的作用是标记一个类为Spring Boot应用程序的引导配置类,用于配置和初始化应用程序的各种功能和组件

BootstrapRegistryInitializer是Spring Boot框架中的一个类,其作用是在应用程序启动时初始化BootstrapRegistryBootstrapRegistry用于注册和管理Spring Boot的核心组件和配置项。它可以在应用程序启动过程中提供必要的初始化操作,例如注册自定义的BeanDefinition、属性源、配置类等。通过BootstrapRegistryInitializer,可以实现在应用程序启动前进行一些必要的初始化配置,以满足应用程序的需求。

bean实例创建-EurekaClientAutoConfiguration

实例创建条件和时间

EurekaClientAutoConfiguration这个配置类在eureka配置加载后, 且eureka.client.enabled为true的条件下进行创建,在CommonsClientAutoConfigurationServiceRegistryAutoConfiguration这两个配置之前加载(此配置依赖CommonsClientAutoConfigurationServiceRegistryAutoConfiguration),在DiscoveryClientOptionalArgsConfigurationRefreshAutoConfigurationEurekaDiscoveryClientConfigurationAutoServiceRegistrationAutoConfiguration配置之后加载(上述配置依赖此配置)。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
//eureka配置加载完成
@ConditionalOnClass(EurekaClientConfig.class)
//eureka.client.enabled为true
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
		"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })

在Spring Boot中,自动配置类的加载顺序和依赖关系可以通过@AutoConfigureBefore@AutoConfigureAfter@AutoConfigureOrder注解来控制

@AutoConfigureBefore注解可以用于控制自动配置类的加载顺序。如果配置类A依赖于配置类B,那么在配置类A上添加@AutoConfigureBefore(B.class)注解即可。这样,Spring Boot会在启动时先加载配置类B,再加载配置类A。同理,@AutoConfigureAfter注解可以用于控制自动配置类的加载顺序,如果配置类A依赖于配置类B,那么在配置类A上添加@AutoConfigureAfter(B.class)注解即可。

此外,@AutoConfigureOrder注解可以用于控制自动配置类的加载顺序,如果配置类A依赖于配置类B,那么在配置类A和配置类B上分别添加@AutoConfigureOrder(1)@AutoConfigureOrder(2)注解即可。这样,Spring Boot会在启动时先加载配置类A,再加载配置类B。

上述配置加载顺序与依赖关系图示如下:

image-20231108234043282

创建配置类

EurekaClientConfig不存在时(EurekaConfigServerBootstrapConfiguration未生效),则创建一个EurekaClientConfigBean,用来解析我们在配置文件中定义的含有eureka.client的相关配置属性。

	@Bean
	@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
	public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
		return new EurekaClientConfigBean();
	}

其此创建了一个eureka的元数据提供器,包含了健康检查地址和状态页地址

再创建一个EurekaInstanceConfigBean,解析配置中eureka.instance相关属性。

@Bean
@ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
       ManagementMetadataProvider managementMetadataProvider) {
    String hostname = getProperty("eureka.instance.hostname");
    boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
    String ipAddress = getProperty("eureka.instance.ip-address");
    boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

    String serverContextPath = env.getProperty("server.servlet.context-path", "/");
    int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));

    Integer managementPort = env.getProperty("management.server.port", Integer.class);

    String managementContextPath = env.getProperty("management.server.servlet.context-path");
    if (!StringUtils.hasText(managementContextPath)) {
       managementContextPath = env.getProperty("management.server.base-path");
    }

    Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
    EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

    instance.setNonSecurePort(serverPort);
    instance.setInstanceId(getDefaultInstanceId(env));
    instance.setPreferIpAddress(preferIpAddress);
    instance.setSecurePortEnabled(isSecurePortEnabled);
    if (StringUtils.hasText(ipAddress)) {
       instance.setIpAddress(ipAddress);
    }

    if (isSecurePortEnabled) {
       instance.setSecurePort(serverPort);
    }

    if (StringUtils.hasText(hostname)) {
       instance.setHostname(hostname);
    }
    String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
    String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

    if (StringUtils.hasText(statusPageUrlPath)) {
       instance.setStatusPageUrlPath(statusPageUrlPath);
    }
    if (StringUtils.hasText(healthCheckUrlPath)) {
       instance.setHealthCheckUrlPath(healthCheckUrlPath);
    }

    ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
          managementContextPath, managementPort);

    if (metadata != null) {
       instance.setStatusPageUrl(metadata.getStatusPageUrl());
       instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
       if (instance.isSecurePortEnabled()) {
          instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
       }
       Map<String, String> metadataMap = instance.getMetadataMap();
       metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
    }
    else {
       // without the metadata the status and health check URLs will not be set
       // and the status page and health check url paths will not include the
       // context path so set them here
       if (StringUtils.hasText(managementContextPath)) {
          instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
          instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
       }
    }

    setupJmxPort(instance, jmxPort);
    return instance;
}

创建客户端

创建客户端分两种情况:

  1. 如果没有客户端不支持多例,即没有添加@RefreshScope注解,或spring.cloud.refresh.enabled设置为false,或eureka.client.refresh.enable设置为false,则创建一个单例eureka客户端。创建时先根据配置创建一个包含实例信息instanceInfo和配置信息的apllicationManager,然后再创建eureka客户端

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingRefreshScope
    protected static class EurekaClientConfiguration {
    
        @Autowired
        private ApplicationContext context;
    
        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    
        //根据ApplicationInfoManager创建eurekaClient
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
              TransportClientFactories<?> transportClientFactories) {
           //核心方法:eurekaClient创建,与注册中心通讯完成服务发现
           return new CloudEurekaClient(manager, config, transportClientFactories, this.optionalArgs, this.context);
        }
    
        //使用eureka配置,先创建ApplicationInfoManager,其包含了InstanceInfo和配置
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
           InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
           return new ApplicationInfoManager(config, instanceInfo);
        }
    
        //根据eurekaClient和ApplicationInfoManager创建注册表
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
              CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
              @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
           return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
                 .with(healthCheckHandler).build();
        }
    
    }
    
    @Target({ ElementType.TYPE, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Conditional(OnMissingRefreshScopeCondition.class)
    @interface ConditionalOnMissingRefreshScope {
    
    }
    
    private static class OnMissingRefreshScopeCondition extends AnyNestedCondition {
    
        OnMissingRefreshScopeCondition() {
           super(ConfigurationPhase.REGISTER_BEAN);
        }
    	
        //有配置@RefreshScope注解
        @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope")
        static class MissingClass {
    
        }
    
        //spring.cloud.refresh.enabled设置为true
        @ConditionalOnMissingBean(RefreshAutoConfiguration.class)
        static class MissingScope {
    
        }
    
        //eureka.client.refresh.enable为false
        @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "false")
        static class OnPropertyDisabled {
    
        }
    
    }
    
  2. 如果eureka客户端支持多例,即添加添加@RefreshScope注解,或spring.cloud.refresh.enabled设置为true,或eureka.client.refresh.enable设置为true,则创建一个多例eureka客户端(bean定义上添加@RefreshScope)。

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnRefreshScope
    protected static class RefreshableEurekaClientConfiguration {
    
        @Autowired
        private ApplicationContext context;
    
        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
    
        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
              EurekaInstanceConfig instance, TransportClientFactories<?> transportClientFactories,
              @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
           // 如果我们使用 ApplicationInfoManager 的代理,我们可能会在 CloudEurekaClient 上调用 shutdown 时遇到问题,其中请求了 ApplicationInfoManager bean,但由于我们正在关闭,因此不允许。为了避免这种情况,我们直接使用对象。
           ApplicationInfoManager appManager;
           if (AopUtils.isAopProxy(manager)) {
              appManager = ProxyUtils.getTargetObject(manager);
           }
           else {
              appManager = manager;
           }
           CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, transportClientFactories,
                 this.optionalArgs, this.context);
           cloudEurekaClient.registerHealthCheck(healthCheckHandler);
           return cloudEurekaClient;
        }
    
        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @Lazy
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
           InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
           return new ApplicationInfoManager(config, instanceInfo);
        }
    
        @Bean
        @org.springframework.cloud.context.config.annotation.RefreshScope
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
              CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
              @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
           return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
                 .with(healthCheckHandler).build();
        }
    }
    
    @Target({ ElementType.TYPE, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @ConditionalOnClass(RefreshScope.class)
    @ConditionalOnBean(RefreshAutoConfiguration.class)
    @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true", matchIfMissing = true)
    @interface ConditionalOnRefreshScope {
    
    }
    

EurekaClient创建与注册

EurekaClient创建过程如下:

  1. 设置健康检查处理器、健康检查回调处理器、事件监听器、预注册处理器
  2. 获取eureka的区域信息
  3. 如果配置了需要从注册中心拉取注册信息,则设当注册状态监控器
  4. 如果配置了需要向注册中心注册本实例,则设置健康状态监控器
  5. 如果不需要从注册中心拉取注册信息且不需要向注册中心注册本实例,则无需要设置网络任务,直接返回
  6. 如果需要从注册中心拉取注册信息或者需要向注册中心注册本实例
    1. 创建一个含有心跳和缓存刷新两个线程池的定时调度器
    2. 创建创建心跳检测的线程池,线程池无缓存队列
    3. 创建缓存刷新的线程池,线程池无缓存队列
    4. 创建用于拉取注册信息和进行实例注册的http客户端
    5. 根据是否使用DNS解析,来获取绑定到区域的基于区域的 CNAME或名称
  7. 如果需要拉取注册信息,则通过负责拉取的http客户端拉取注册信息
  8. 执行预注册处理器的预处理方法,即再注册前需要执行的方法
  9. 如果配置了需要向注册中心注册且需要再初始化时强制注册,则调用注册方法进行注册
  10. 最后,初始化调度任务
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, TransportClientFactories transportClientFactories, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
    this(applicationInfoManager, config, transportClientFactories, args, new Provider<BackupRegistry>() {
        private volatile BackupRegistry backupRegistryInstance;

        @Override
        public synchronized BackupRegistry get() {
            if (backupRegistryInstance == null) {
                String backupRegistryClassName = config.getBackupRegistryImpl();
                if (null != backupRegistryClassName) {
                    try {
                        backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                    } catch (InstantiationException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (IllegalAccessException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (ClassNotFoundException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    }
                }

                if (backupRegistryInstance == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    backupRegistryInstance = new NotImplementedRegistryImpl();
                }
            }

            return backupRegistryInstance;
        }
    }, randomizer);
}

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, TransportClientFactories transportClientFactories, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    //设置健康检查处理器、健康检查回调处理器、事件监听器、预注册处理器
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }
    this.transportClientFactories = transportClientFactories;
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;
    this.endpointRandomizer = endpointRandomizer;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);
	//获取eureka的区域信息
    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
	
    //如果配置了需要从注册中心拉取注册信息,则设当注册状态监控器
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }
	//如果配置了需要向注册中心注册本实例,则设置健康状态监控器
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
	//如果不需要从注册中心拉取注册信息且不需要向注册中心注册本实例
    //则无需要设置网络任务,直接返回
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);

        return;  // no need to setup up an network tasks and we are done
    }
	//如果需要从注册中心拉取注册信息或者需要向注册中心注册本实例
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        //创建一个含有心跳和缓存刷新两个线程池的定时调度器
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
		//创建心跳检测的线程池,线程池无缓存队列
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
		//创建缓存刷新的线程池,线程池无缓存队列
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
	
        //创建用于拉取注册信息和进行实例注册的http客户端
        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);
		//根据是否使用DNS解析,来获取绑定到区域的基于区域的 CNAME或名称
        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        //实例所属区域检查
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    //如果需要拉取注册信息,则通过负责拉取的http客户端拉取注册信息
    if (clientConfig.shouldFetchRegistry()) {
        try {
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            boolean backupFetchRegistryResult = true;
            if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                backupFetchRegistryResult = false;
                logger.info("Initial registry fetch from backup servers failed");
            }
            if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
            }
        } catch (Throwable th) {
            logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // 执行预注册处理器的方法,即再注册前需要执行的方法
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    //如果配置了需要向注册中心注册且需要再初始化时强制注册,则调用注册方法进行注册
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    //最后,初始化调度任务(例如集群解析器、心跳、instanceInfo 复制器、fetch
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, initRegistrySize);
}

scheduleServerEndpointTask方法创建了一个传输工厂,用于创建和注册中心进行信息传输的http客户端。并且创建了用于获取注册信息的http client和用于注册的http client。

//        private ClosableResolver bootstrapResolver;
//        private TransportClientFactory transportClientFactory;

//        private EurekaHttpClient registrationClient;
//        private EurekaHttpClientFactory registrationClientFactory;

//        private EurekaHttpClient queryClient;
//        private EurekaHttpClientFactory queryClientFactory;

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                        AbstractDiscoveryClientOptionalArgs args) {

        
    Collection<?> additionalFilters = args == null
            ? Collections.emptyList()
            : args.additionalFilters;

    if (transportClientFactories == null) {
        throw new IllegalArgumentException("transportClientFactories may not be null");
    }

    Optional<SSLContext> sslContext = args == null
            ? Optional.empty()
            : args.getSSLContext();
    Optional<HostnameVerifier> hostnameVerifier = args == null
            ? Optional.empty()
            : args.getHostnameVerifier();

    // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
    //传输工厂,用于创建和注册中心进行信息传输的http客户端
    eurekaTransport.transportClientFactory = transportClientFactories.newTransportClientFactory(clientConfig,
            additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier);
	//获取本地缓存的应用信息,如果超过阈值则为空
    ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
        @Override
        public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
            long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
            long delay = getLastSuccessfulRegistryFetchTimePeriod();
            if (delay > thresholdInMs) {
                logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                        thresholdInMs, delay);
                return null;
            } else {
                return localRegionApps.get();
            }
        }
    };
	
    //引导解析器,它通过远程调用本地注册表的“VIP 源”来解析 Eureka 服务器端点,其中源是从 rootResolver(dns 或 config)中找到的
    eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
            clientConfig,
            transportConfig,
            eurekaTransport.transportClientFactory,
            applicationInfoManager.getInfo(),
            applicationsSource,
            endpointRandomizer
    );
	//如果需要向注册中心注册本实例,则创建一个可以创建向注册中心注册的httpclient工厂
    if (clientConfig.shouldRegisterWithEureka()) {
        EurekaHttpClientFactory newRegistrationClientFactory = null;
        EurekaHttpClient newRegistrationClient = null;
        try {
            newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    transportConfig
            );
            newRegistrationClient = newRegistrationClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
        eurekaTransport.registrationClient = newRegistrationClient;
    }

    // new method (resolve from primary servers for read)
    // Configure new transport layer (candidate for injecting in the future)
    //如果需要向注册中心注册本实例,则创建一个可以拉取注册信息的httpclient工厂
    if (clientConfig.shouldFetchRegistry()) {
        EurekaHttpClientFactory newQueryClientFactory = null;
        EurekaHttpClient newQueryClient = null;
        try {
            newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                    eurekaTransport.bootstrapResolver,
                    eurekaTransport.transportClientFactory,
                    clientConfig,
                    transportConfig,
                    applicationInfoManager.getInfo(),
                    applicationsSource,
                    endpointRandomizer
            );
            newQueryClient = newQueryClientFactory.newClient();
        } catch (Exception e) {
            logger.warn("Transport initialization failure", e);
        }
        eurekaTransport.queryClientFactory = newQueryClientFactory;
        eurekaTransport.queryClient = newQueryClient;
    }
}

EurekaClient的调度任务

我们看其初始化调度任务的方法,可以发现其启动了三个任务:

  1. 本地注册表刷新任务:负责将注册中心中应用的注册信息拉取到本地
  2. 心跳检测续约任务:负责向注册中心发送心跳进行续约
  3. 实例信息同步任务:本地实例信息发生变更时,同步注册中心
private void initScheduledTasks() {
    //配置了拉取注册信息
    if (clientConfig.shouldFetchRegistry()) {
        //获取刷新注册信息的时间间隔
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        //缓存刷新执行器指数回退相关属性。它是重试延迟的最大乘数值,以防发生一系列超时。
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        //设置本地注册缓存刷新任务
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        //创建定时任务
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
	//配置了需要注册
    if (clientConfig.shouldRegisterWithEureka()) {
        //获取续约间隔
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        //检测信号执行器指数回退相关属性。它是重试延迟的最大乘数值,以防发生一系列超时。
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        //设置心跳检测任务
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo复制器
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
            	//指示将实例更改复制到 eureka 服务器的频率(以秒为单位)。
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
		//状态监听器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                logger.info("Saw local status change event {}", statusChangeEvent);
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

本地注册信息刷新-CacheRefreshThread

这里注册信息的刷新包括了两部分信息的刷新:

  1. 注册中心中注册的区域列表
  2. 注册中心中注册的实例信息
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        //这可确保对要提取的远程区域进行动态更改
        //获取最新的远程区域
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            //本地区域列表和远程区域列表不相等,则远程区域列表发生了变更
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // remoteRegionsToFetch 和 AzToRegionMapper.regionsToFetch 都需要同步
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // 只需刷新映射即可反映任何 DNSProperty 更改
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
		//注册中心与本实例的区域列表不一致时,也需要重新拉取注册信息
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            //获取本实例所在区域的应用数量,并更新拉取时间
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

向注册中心注册-HeartbeatThread

这个过程很简单:

  1. 首先向注册中心发送心跳
  2. 如果心跳响应失败则向注册中心重新注册
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        //向注册中心发送心跳
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        //若注册中心没有此实例,则执行注册操作
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

本地实例信息同步-instanceInfoReplicator

本地实例信息有一个标志位isDirty

isDirty属性表示实例状态是否脏数据,即是否有新的变更需要同步。当实例状态发生变化时,比如实例的元数据信息发生了更新,isDirty属性会被设置为true,表示实例状态已经脏了,需要进行同步。

isDirtyWithTime属性是isDirty属性的扩展,在isDirty属性为true的基础上,还记录了实例状态变化的时间戳。这个时间戳可以用于记录最后一次状态变化的时间,方便进行状态的更新和同步。

这两个属性的作用是帮助Eureka客户端和服务器之间实现状态的同步和更新。当实例状态变化时,Eureka客户端会将这些变化信息同步给Eureka服务器,以保持实例信息的一致性和可靠性。而isDirtyisDirtyWithTime属性则用来标识需要同步的实例状态是否脏,以及记录状态变化的时间,方便进行状态的更新和同步操作。

public void run() {
    try {
        //刷新本地实例信息,设置为待同步
        discoveryClient.refreshInstanceInfo();
		//获取待同步时间
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        //待同步时间不为空,则需要重新注册
        //注册完成后将状态设为已同步
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        //以replicationIntervalSeconds实例信息同步间隔创建定时任务,执行本地信息同步
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

实例信息的定时刷新主要对两个信息进行更新:

  1. 实例的主机名,即实例所属主机为云主机则有可能发生变化需要更新实例信息
  2. 实例续约信息更新,即续约响应时间和续约间隔的配置发生变更
void refreshInstanceInfo() {
	//重新获取主机名以检查它是否已更改,并更新本地实例信息
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    //如果需要,刷新续约信息
    applicationInfoManager.refreshLeaseInfoIfRequired();
	//更新实例的在线状态
    InstanceStatus status;
    try {
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }
	
    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}
0

评论区