文章摘要(AI生成)
在上篇文章中我们已经了解了eureka客户端是如何向eureka server发送请求进行服务注册,以及在实例内部创建三个任务进行服务心跳、服务实例信息同步、本地缓存信息更新的。在本篇文章中我们来从源码探究eureka server是如何接收客户端请求并维护客户端实例信息的。Eureka服务启动创建
在上篇文章中我们已经了解了eureka客户端是如何向eureka server发送请求进行服务注册,以及在实例内部创建三个任务进行服务心跳、服务实例信息同步、本地缓存信息更新的。在本篇文章中我们来从源码探究eureka server是如何接收客户端请求并维护客户端实例信息的。
Eureka服务启动
首先,我们来看服务的启动。首先我们从starter包中的imports信息中可以看到其引入的自动配置类:
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
其配置类声明如下,我们可以看到引入了三个配置属性:eureka面板配置、实例注册配置、eureka配置。在本类创建之后其有引入了一个EurekaServerInitializerConfiguration
的配置类进行eureka server服务的启动。那我们可以推送该类主要负责server中组件的创建
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class,
EurekaProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer
创建必要组件-EurekaServerAutoConfiguration
一个Eureka注册中心根据配置需要创建以下组件:
- *eureka面板管理
- 开启与客户端的通讯
- 注册中心启动器
- 创建服务注册表
创建eureka面板
如果eureka面板未被关闭(eureka.dashboard.enabled=false
),则创建一个EurekaContoller,用于处理面板统计信息
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController(EurekaProperties eurekaProperties) {
return new EurekaController(this.applicationInfoManager, eurekaProperties);
}
创建与客户端的会话
创建一个基于jersey3的EurekaServerHttpClient创建工厂,这个工厂负责创建一个基于http客户端的eureka会话:
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
@ConditionalOnMissingBean(EurekaServerHttpClientFactory.class)
public Jersey3EurekaServerHttpClientFactory jersey3EurekaServerHttpClientFactory() {
return new Jersey3EurekaServerHttpClientFactory();
}
Jersey是一种用于构建RESTful Web服务的开发框架。它是基于JAX-RS(Java API for RESTful Web Services)规范的实现之一。Jersey提供了一组API和工具,使得在Java应用程序中创建和开发RESTful Web服务变得更加简单和高效。
使用Jersey,开发者可以使用注解来定义资源、路径、HTTP方法等,并通过简单的编程模型实现请求的处理和响应的生成。Jersey还提供了一些功能丰富的特性,包括过滤器、拦截器、请求和响应的转换、异常处理等,以便开发者可以更好地控制和定制自己的RESTful服务。
总结起来,Jersey在Java中用于构建和开发RESTful Web服务,帮助开发者更方便地管理和处理HTTP请求和响应。它是一个强大且受欢迎的框架,广泛应用于Java Web开发领域。
通过调用工厂的实例创建方法可以创建一个会话:
public EurekaHttpClient createRemoteRegionClient(EurekaServerConfig serverConfig, EurekaTransportConfig transportConfig, ServerCodecs serverCodecs, ClusterResolver<EurekaEndpoint> clusterResolver) {
Jersey3RemoteRegionClientFactory jersey3RemoteRegionClientFactory = new Jersey3RemoteRegionClientFactory(serverConfig, serverCodecs, clusterResolver.getRegion());
TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jersey3RemoteRegionClientFactory);
//基于httpclient,创建一个eureka会话
SessionedEurekaHttpClient client = new SessionedEurekaHttpClient("remote", RetryableEurekaHttpClient.createFactory("remote", transportConfig, clusterResolver, createFactory(metricsFactory), ServerStatusEvaluators.legacyEvaluator()), 1800000L);
return client;
}
创建服务注册表
创建服务注册表PeerAwareInstanceRegistry
,提供初始化、同步、更新和注册注册表的操作
//服务编码格式,添加对json和xml解析的支持
@Bean
public ServerCodecs serverCodecs() {
return new CloudServerCodecs(this.eurekaServerConfig);
}
private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
}
private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class) : codec;
}
class CloudServerCodecs extends DefaultServerCodecs {
CloudServerCodecs(EurekaServerConfig serverConfig) {
super(getFullJson(serverConfig), CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class),
getFullXml(serverConfig), CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class));
}
}
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs,
EurekaServerHttpClientFactory eurekaServerHttpClientFactory) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
eurekaServerHttpClientFactory,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
创建注册中心启动器
根据注册表信息创建eureka节点信息、eurekaServer上下文和eurekaServer启动器EurekaServerBootstrap
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs,
this.applicationInfoManager, replicationClientAdditionalFilters);
}
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
this.applicationInfoManager);
}
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
registry, serverContext);
}
服务初始化-EurekaServerInitializerConfiguration
EurekaServerInitializerConfiguration
实现了SmartLifecycle
接口,在容器启动时,会调用该配置类的start()
方法。该start
方法主要做了三件事:.
- 调用注册中心启动器的初始化方法,初始化eureka
- 发布注册中心可用事件
- 发布注册中心启动完成事件
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
//初始化容器
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//发布eureka注册可用事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
//发布eureka启动事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
服务启动-EurekaServerBootstrap
注册中心启动器EurekaBootStrap
实现了ServletContextListener
接口,可以用来监听Web应用的启动和关闭事件。当Web应用启动和关闭时,容器会调用ServletContextListener
的相应方法.
ServletContextListener是一个接口,可以用来监听Web应用的启动和关闭事件。当Web应用启动和关闭时,容器会调用ServletContextListener的相应方法,开发者可以在这些方法中编写自己的逻辑。
通过实现ServletContextListener,我们可以做以下操作:
- 初始化操作:在Web应用启动时,可以在contextInitialized方法中执行一些初始化操作,例如读取配置文件、建立数据库连接等。这样可以确保在应用程序启动时必要的资源和环境已经准备好。
- 资源释放:在Web应用关闭时,可以在contextDestroyed方法中释放资源,例如关闭数据库连接、释放文件句柄等。这样可以确保在应用程序关闭时资源得到正确的释放,避免资源泄漏和内存泄漏。
- 启动后台任务:可以在contextInitialized方法中启动一些后台任务,例如定时任务、消息队列消费者等。这样可以在应用程序启动后自动执行一些任务,提高应用的自动化程度和效率。
- 注册和注销Servlet、Filter和Listener:可以在contextInitialized方法中注册Servlet、Filter和Listener,确保它们在应用启动时自动被加载和初始化。而在contextDestroyed方法中可以注销这些组件,确保在应用关闭时能够正确地销毁它们。
- 日志记录:可以在contextInitialized和contextDestroyed方法中记录日志,用于跟踪应用的启动和关闭过程,以及打印相关的日志信息,方便排查问题和监控应用的运行状态。
总之,通过实现ServletContextListener,我们可以在Web应用的启动和关闭过程中执行一些自定义的操作,以满足应用的需求,并确保应用的正常运行和资源的正确释放。
注册中心的初始化,首先要初始化环境变量,然后再初始化容器:
- 首先获取eureka客户端的应用信息管理器
- 如果
eurekaClient
为空则创建:- 首先创建服务实例信息,如果为云服务则创建云实例信息,否则创建默认实例信息
- 云实例信息的创建:通过用户提供的值以及查询 AWS 实例元数据的组合向 eureka 注册所需的信息。实用程序类
AmazonInfo
有助于检索 AWS 特定值。其中一些信息(包括 可用性区域 )用于确定要与哪个 eureka 服务器通信。 - 默认实例信息创建:通过用户提供的值向 eureka 注册所需的信息。
- 创建应用信息管理器
- 创建
EurekaClient
配置并完成eureka客户端创建
- 如果
- 然后创建应用注册表,根据环境不同(亚马逊云服务还是其他)创建应用注册表
- 获取eureka节点列表,根据节点列表创建
eurekaServer
上下文 EurekaServer
上下文初始化,包括初始化注册表,并监听Eureka节点变化- 从邻近的Eureka节点复制注册表
- 每30秒更新应用续订信息
@Override
public void contextInitialized(ServletContextEvent event) {
try {
initEurekaEnvironment();
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
/**
* 用户可以重写该方法来自行初始化环境。
*/
protected void initEurekaEnvironment() throws Exception {
logger.info("Setting the eureka configuration..");
String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
if (dataCenter == null) {
logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
} else {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
logger.info("Eureka environment value eureka.environment is not set, defaulting to test");
}
}
/**
* 服务器上下文的初始化钩子。根据需要自定义逻辑时可以进行重写。
*/
protected void initEurekaServerContext() throws Exception {
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// 为了向后兼容性
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
//获取应用信息管理器
ApplicationInfoManager applicationInfoManager = null;
//eurekaClient为空则创建
if (eurekaClient == null) {
//创建服务实例信息,如果为云服务则创建云实例信息,否则创建默认实例信息
//云实例信息的创建:通过用户提供的值以及查询 AWS 实例元数据的组合向 eureka 注册所需的信息。实用程序类 AmazonInfo 有助于检索 AWS 特定值。其中一些信息(包括 可用性区域 )用于确定要与哪个 eureka 服务器通信。
//默认实例信息创建:通过用户提供的值向 eureka 注册所需的信息。
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
//创建应用信息管理器
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
//创建EurekaClient配置
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig, getTransportClientFactories(),
getDiscoveryClientOptionalArgs());
} else {
//eurekaClient不为空则直接返回
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
EurekaServerHttpClientFactory eurekaServerHttpClientFactory = getEurekaServerHttpClientFactory();
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
//亚马逊云服务注册表创建
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient, eurekaServerHttpClientFactory
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
//其他环境注册表创建
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient, eurekaServerHttpClientFactory
);
}
//获取eureka节点列表
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
//创建eurekaServer上下文
serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
//初始化EurekaServer
serverContext.initialize();
logger.info("Initialized server context");
// 从邻近的Eureka节点复制注册表。
int registryCount = registry.syncUp();
// 续订每 30 秒发生一次,一分钟应该是 2 次。
registry.openForTraffic(applicationInfoManager, registryCount);
// 注册所有监控统计数据。
EurekaMonitors.registerAllStats();
}
上下文初始化-DefaultEurekaServerContext
上下文初始化会先监听Eureka节点变化,然后初始化服务注册表
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
//Eureka节点的关闭和增加
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
监听Eureka总节点变化peerEurekaNodes.start()->PeerEurekaNodes
PeerEurekaNodes
会创建一个定时调度的线程池,用来完成eureka节点信息的刷新。每一次刷新都会根据新的实例url集合,更新已有的实例节点
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
/**
* 在给定新的副本URL集合的情况下,销毁不再可用的{@link PeerEurekaNode},并创建新的节点。
* 类中已经声明类变量:private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
* @param newPeerUrls 同伴节点URL;这个集合应该过滤掉本地节点的URL。
*/
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
//获取要关闭的节点信息
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
//获取要添加的新节点信息
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
//获取已注册的节点列表
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
//要关闭的节点列表不为空
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
//遍历已注册的节点列表,如果在关闭列表中则进行关闭和移除
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// 添加新节点,创建新节点到已注册的节点列表中
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
单个Eureka节点的信息变化createPeerEurekaNode->PeerEurekaNode
PeerEurekaNode
的作用在我们走读客户端代码的时候其实已经透露过一部分信息,每个Eureka节点本地都会缓存一份已经注册的Eureka注册表信息。而PeerEurekaNode
则负责同步注册中心的注册表信息到这些实例节点上。
其构造函数创建了一个复制任务处理器ReplicationTaskProcessor
,这个同步任务会根据实例新的的变更内容不同做不同的同步处理:
- 如果为单个实例同步任务,例如eureka节点的ASG状态发生变更,则调用eureka server的实例状态信息同步接口;
- 批量实例信息同步处理任务,例如eureka节点的增加取消和心跳,则调用eureka server的实例批量同步接口。
服务注册表-PeerAwareInstanceRegistryImpl
注册表初始化
注册表的由配置类创建,由服务上下文调用其init
方法进行初始化:
- 初始化响应缓存,保存请求响应结果
- 计划定期更新 续订阈值 的任务。续订阈值将用于确定续订是否因网络分区而急剧下降,并保护一次过期的实例过多。
- 初始化需要在另一个区域中运行的 eureka 服务上执行的所有注册表操作的处理器,负责将另一个区域的eureka实例信息的变化同步到本地注册表中
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
//初始化响应缓存,保存请求响应结果
initializedResponseCache();
//计划定期更新 续订阈值 的任务。续订阈值将用于确定续订是否因网络分区而急剧下降,并保护一次过期的实例过多。
scheduleRenewalThresholdUpdateTask();
//初始化需要在另一个区域中运行的 eureka 服务上执行的所有注册表操作的处理器
initRemoteRegionRegistry();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
protected void initRemoteRegionRegistry() throws MalformedURLException {
Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName();
if (!remoteRegionUrlsWithName.isEmpty()) {
allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()];
int remoteRegionArrayIndex = 0;
for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) {
RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry(
serverConfig,
clientConfig,
serverCodecs,
eurekaServerHttpClientFactory,
remoteRegionUrlWithName.getKey(),
new URL(remoteRegionUrlWithName.getValue()));
regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry);
allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey();
}
}
logger.info("Finished initializing remote region registries. All known remote regions: {}",
(Object) allKnownRemoteRegions);
}
RemoteRegionRegistry
和PeerAwareInstanceRegistry
的区别
远程区域的注册表信息:是需要通过网络发送到远程区域注册中心的注册表信息,其为下图的黄色框内容,他的实例信息包装在实例信息集合中,最后层层包装到应用集合中
本地的注册表信息:即本注册中心维护的注册表信息,其为下图的绿色框内容,他的结构是由应用名,实例id,实例租约信息组成的键值对的map集合。方便我们对其进行增删改查修改。
远程区域变更接收-RemoteRegionRegistry
RemoteRegionRegistry
的构造函数中创建了一个定时的拉取远程实例信息的任务,用来从远程区域获取注册表信息。
- 如果禁用了增量同步远程信息,或者是第一次同步远程信息,则获取远程区域中的所有应用信息进行保存
- 如果非上述场景,则为增量同步。则获取远程增量变化的实例信息进行保存,若保存完毕后hash相同则成功;否则重新拉取全量的远程实例信息进行保存
实例可用标记
openForTraffic
方法的作用就是将当前实例标记为可接收流量的状态。这样其他服务实例就可以通过Eureka服务器获取到当前实例的信息,并将流量发送到当前实例上。
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
Eureka节点维护
续约信息管理
实例续约信息管理接口LeaseManager
为注册中心提供了实例的注册、续约、注销、驱逐过期实例的操作
实例续约信息Lease
,其包含了实例信息以及实例续约的处理,即实例上线、续约、注销操作
与实例信息
InstanceInfo
中的LeaseInfo
不同的是,注册表的续约信息包含了实例信息以及其在本注册表中的最新的续约操作。
接收应用信息变更-ApplicationResource
注册中心提供了接收Eureka节点应用信息变更的接口,用来监听新增的应用信息,并通过调用注册表的注册方法完成实例注册。同时也提供了ApplicationsResource
,用于查询已注册的应用信息。
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// 验证 InstanceInfo 是否包含所有必需的必填字段
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// 处理客户端可能注册到缺少数据的错误 DataCenterInfo 的情况
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
//调用注册方法进行注册
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
接收实例信息变更-InstanceResource
接收实例信息的变更和状态更新,重置状态更新记录等操作,并通过调用注册表的相应方法完成实例信息的变更
//续约
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
//实例状态更新
@PUT
@Path("status")
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
//调用注册表状态更新方法进行更新
boolean isSuccess = registry.statusUpdate(app.getName(), id,
InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
"true".equals(isReplication));
if (isSuccess) {
logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
return Response.ok().build();
} else {
logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
return Response.serverError().build();
}
} catch (Throwable e) {
logger.error("Error updating instance {} for status {}", id,
newStatus);
return Response.serverError().build();
}
}
/**
* 删除实例的状态变更记录
*/
@DELETE
@Path("status")
public Response deleteStatusUpdate(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("value") String newStatusValue,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);
//调用注册表状态撤销方法进行更新
boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
newStatus, lastDirtyTimestamp, "true".equals(isReplication));
if (isSuccess) {
logger.info("Status override removed: {} - {}", app.getName(), id);
return Response.ok().build();
} else {
logger.warn("Unable to remove status override: {} - {}", app.getName(), id);
return Response.serverError().build();
}
} catch (Throwable e) {
logger.error("Error removing instance's {} status override", id);
return Response.serverError().build();
}
}
接收多实例信息变更-PeerReplicationResource
书接上文,当服务上下文创建监听,当注册表发生变化,需要同步每个PeerEurekaNode
变化信息时,如果多个实例信息变更时,我们就需要接收批量的实例信息变更。而PeerReplicationResource
则负责接收多实例信息的变更,其请求为ReplicationList
,本质为一个包含实例租约信息的列表:
private final List<ReplicationInstance> replicationList;
ReplicationInstance
则含有如下信息,与Lease<InstanceInfo>
相同:
private String appName;
private String id;
private Long lastDirtyTimestamp;
private String overriddenStatus;
private String status;
private InstanceInfo instanceInfo;
private Action action;
该实例则调用eureka server
中的相应实例信息变更接口进行更新:
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
try {
ReplicationListResponse batchResponse = new ReplicationListResponse();
//遍历实例列表,根据实例动作执行不同操作
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
try {
batchResponse.addResponse(dispatch(instanceInfo));
} catch (Exception e) {
batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
logger.error("{} request processing failed for batch item {}/{}",
instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
}
}
return Response.ok(batchResponse).build();
} catch (Throwable e) {
logger.error("Cannot execute batch Request", e);
return Response.status(Status.INTERNAL_SERVER_ERROR).build();
}
}
//根据实例动作执行不同操作
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
ApplicationResource applicationResource = createApplicationResource(instanceInfo);
InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
String instanceStatus = toString(instanceInfo.getStatus());
Builder singleResponseBuilder = new Builder();
switch (instanceInfo.getAction()) {
case Register:
//注册
singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
break;
case Heartbeat:
//心跳
singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
break;
case Cancel:
//取消
singleResponseBuilder = handleCancel(resource);
break;
case StatusUpdate:
//状态更新
singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
break;
case DeleteStatusOverride:
singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
break;
}
return singleResponseBuilder.build();
}
//创建一个应用处理类
/* Visible for testing */ ApplicationResource createApplicationResource(ReplicationInstance instanceInfo) {
return new ApplicationResource(instanceInfo.getAppName(), serverConfig, registry);
}
//创建一个实例处理类
/* Visible for testing */ InstanceResource createInstanceResource(ReplicationInstance instanceInfo,
ApplicationResource applicationResource) {
return new InstanceResource(applicationResource, instanceInfo.getId(), serverConfig, registry);
}
//注册时,调用applicationResource.addInstance,先创建应用,在创建实例
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
//撤销时,直接更新实例状态
private static Builder handleCancel(InstanceResource resource) {
Response response = resource.cancelLease(REPLICATION);
return new Builder().setStatusCode(response.getStatus());
}
//发送心跳,更新实例的续约信息
private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
int responseStatus = response.getStatus();
Builder responseBuilder = new Builder().setStatusCode(responseStatus);
if ("false".equals(config.getExperimental("bugfix.934"))) {
if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
}
} else {
if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
&& response.getEntity() != null) {
responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
}
}
return responseBuilder;
}
//更新实例状态
private static Builder handleStatusUpdate(ReplicationInstance instanceInfo, InstanceResource resource) {
Response response = resource.statusUpdate(instanceInfo.getStatus(), REPLICATION, toString(instanceInfo.getLastDirtyTimestamp()));
return new Builder().setStatusCode(response.getStatus());
}
//删除状态覆写
private static Builder handleDeleteStatusOverride(ReplicationInstance instanceInfo, InstanceResource resource) {
Response response = resource.deleteStatusUpdate(REPLICATION, instanceInfo.getStatus(),
instanceInfo.getLastDirtyTimestamp().toString());
return new Builder().setStatusCode(response.getStatus());
}
具体注册表维护-AbstractInstanceRegistry
具体的注册表操作,则是对于我们由应用名,实例id,实例租约信息组成的键值对的map集合的元素增加、删除和更新。
/**
* 注册具有给定持续时间的新实例。
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
//获取应用的续约信息,如果没有则创建一个
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
//保留最后一个脏时间戳而不覆盖它(如果已经存在租约)
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// 这是一个>而不是 >=,因为如果时间戳相等,我们仍然采用远程传输的 InstanceInfo 而不是服务器本地副本。
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 租约不存在,因此是新注册
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// 由于客户端想要注册它,因此增加发送续订的客户端数量ending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
//这是覆盖状态的初始状态传输发生的地方
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
//根据覆盖的状态规则设置状态
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
//如果租约注册为 UP 状态,请设置租约服务 UP 时间戳
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
/**
* 取消实例的注册。
* 这通常由客户端在关闭时调用,通知服务器从流量中删除实例。
*/
@Override
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
/**
* cancel(String, String, boolean) 方法被 PeerAwareInstanceRegistry 覆盖,因此每个取消请求都会复制到对等节点。
* 但是,对于在远程对等体中计为有效取消的过期,这是不希望的,因此不会启动自我保护模式。
*/
protected boolean internalCancel(String appName, String id, boolean isReplication) {
read.lock();
try {
//获取应用的所有实例续约信息,删除对应节点的续约信息
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
//覆写状态
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
//续约信息为空,则取消失败
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
//取消续约信息
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
//实例信息不为空,则更新实例状态
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
}
} finally {
read.unlock();
}
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clients to send renews.
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
/**
* 将给定应用名称的给定实例标记为已续订,并标记它是否源自复制。
*
* @see com.netflix.eureka.lease.LeaseManager#renew(java.lang.String, java.lang.String, boolean)
*/
public boolean renew(String appName, String id, boolean isReplication) {
//获取实例的续约信息
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
//实例续约信息不能为空
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
//更新实例信息
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
//更新续约信息
leaseToRenew.renew();
return true;
}
}
评论区