首先基于springboot自带的自动装配机制。如果我们想知道某个组件是如何启用的,可以想一下,为什么这个组件只要引入依赖就可以被使用了,自动装配其实也是基于配置化的一个机制,所以我们只需要找到对应的配置文件即可。
那么基于springboot的自动装配机制,我们来看看eureka的客户端组件是如何被启用的。
首先自动装配的机制上基于spring.factories文件去指定配置类。
那么下面我们去看看eureka-client组件的spring.factories所指定的配置类。
那么接下来看看spring.factories这个文件中的内容。
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
这里面其实包含了若干个配置类。
名字可能差不多,但是通过名字,大概可以知道这些类具体是干啥的。
由于这里我们是关注client的一个注册机制。
所以我们只需要关心EurekaClientAutoConfiguration即可。
那么下面先来看看EurekaClientAutoConfiguration的源码。
整个类的内容其实很多,但是由于这里关心的是client的一个注册机制。所以只需要关心euekaclient的工作机制即可,那么通过下面源码是可以看到,EurekaClient其实是通过@Bean的方式进行注入的。
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
@Configuration
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
}
}
大概知道EurekaClient这个类的注入方式之后,就可以通过构造方法来了解这个客户端内部的一些实现。
但是在此之前,先看看这个EurekaClient接口具体是做什么的。
EurekaClient接口
@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {
public Applications getApplicationsForARegion(@Nullable String region);
public Applications getApplications(String serviceUrl);
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure);
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, @Nullable String region);
public List<InstanceInfo> getInstancesByVipAddressAndAppName(String vipAddress, String appName, boolean secure);
public Set<String> getAllKnownRegions();
public InstanceInfo.InstanceStatus getInstanceRemoteStatus();
@Deprecated
public List<String> getDiscoveryServiceUrls(String zone);
@Deprecated
public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone);
@Deprecated
public List<String> getServiceUrlsFromDNS(String instanceZone, boolean preferSameZone);
@Deprecated
public void registerHealthCheckCallback(HealthCheckCallback callback);
public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
public void registerEventListener(EurekaEventListener eventListener);
public boolean unregisterEventListener(EurekaEventListener eventListener);
public HealthCheckHandler getHealthCheckHandler();
public void shutdown();
public ApplicationInfoManager getApplicationInfoManager();
}
EurekaClient的注册机制
EurekaClient是一个接口,那么在其实例化的时候,必然是通过某个子类去完成的。
这里的话要说一下DiscoveryClient。结构大概如下
@Singleton
public class DiscoveryClient implements EurekaClient {
...
}
那么在进行实例化的时候,由于是通过构造方法进行实例化,随后注入到容器,所以只需要关心DiscoveryClient的构造方法即可。
在DiscoveryClient通过构造方法去实例化对象的过程中,有这么一个方法,是注册机制的一个核心,initScheduledTasks方法。
@Singleton
public class DiscoveryClient implements EurekaClient {
private void initScheduledTasks() {
//如果开启了fetchRegistry,其实就是定时从注册中心拉取最新的注册表
if (clientConfig.shouldFetchRegistry()) {
//默认30秒
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
//默认10秒
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//通过定时器取执行拉取命令,第一次延迟30秒执行。
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//注册到eureka中,首先配置得开启
if (clientConfig.shouldRegisterWithEureka()) {
//默认30秒
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//默认10秒
int expBackOffBound =
clientConfig.getHeartbeatExecutorExponentialBackOffBound();
//通过定时器取执行注册命令,第一次延迟30秒执行。
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
从initScheduledTasks方法中可以看出,是通过定时器去执行一次性任务去完成拉取注册表以及注册的任务。但是client本身的拉取或者注册行为,本身肯定是一个定时执行的行为,但是从上面看是一次性任务。延时30秒执行。
所有我们有必要继续往里面走。
定时器执行的任务是TimedSupervisorTask类型,所以下面先来看看这个类做了啥。
TimedSupervisorTask
public class TimedSupervisorTask extends TimerTask {
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
} catch (TimeoutException e) {
//如果在等待的过程中出错,那么定时任务下次执行的间隔,会一直不断的扩大,但是不会超过maxDelay。
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
//定时执行的关键
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
从上面代码看比较简单,说白了通过获取回调的方法进行堵塞,当发现执行完,继续执行定时任务,所以可以一直执行定时,而不是一次性执行。
最后总结一下,其实euerka-client的注册以及拉取机制就是通过在euekaclient进行实例化的时候,通过执行定时任务去进行注册以及拉取。
那么最后来看看心跳的具体内容。源码应该是一目了然了,就是将实例信息发送到eureka-server中去。
@Singleton
public class DiscoveryClient implements EurekaClient {
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
}
最后再来看看如何通过配置来启用注册表的拉取,以及客户端的注册
在initScheduledTasks方法中,定时器的初始化有俩个很重要的条件。
clientConfig.shouldFetchRegistry()
clientConfig.shouldRegisterWithEureka()
需要这2个方法为true,客户端才可以注册到注册中心,以及拉取到注册表
所以下面直接看看clientConfig对应的实体类
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig {
private boolean fetchRegistry = true;
private boolean registerWithEureka = true;
@Override
public boolean shouldFetchRegistry() {
return this.fetchRegistry;
}
@Override
public boolean shouldRegisterWithEureka() {
return this.registerWithEureka;
}
}
其实从上面来看,默认都是开启的,如以下配置是多余的
eureka:
client:
register-with-eureka: true
fetch-registry: true
如果想对eureka中对于刷新注册表的时间机制,或者说发送心跳的最大周期做调整(当发送心跳失败过后,会一直扩大周期,默认最大周期为原来周期的10倍),可以修改一下。
如下
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig {
private int registryFetchIntervalSeconds = 30;
private int heartbeatExecutorExponentialBackOffBound = 10;
@Override
public int getRegistryFetchIntervalSeconds() {
return registryFetchIntervalSeconds;
}
@Override
public int getHeartbeatExecutorExponentialBackOffBound() {
return heartbeatExecutorExponentialBackOffBound;
}
}
将拉取注册表的时间缩小,将发送心跳的最大周期缩小
eureka:
client:
registry-fetch-interval-seconds: 10
heartbeat-executor-exponential-back-off-bound: 1
综上
通过源码可以知道,client对于注册以及注册表的拉取都是通过定时器的机制。
然后每个定时器在操作失败的之后,下次操作的时间间隔会变大(默认不超过原先周期的10倍)
拉取注册表:默认30秒拉取一次,最大周期为30*10=300秒
eureka:
client:
registry-fetch-interval-seconds: 10 ##拉取间隔默认10秒
cache-refresh-executor-exponential-back-off-bound: 10 ##拉取间隔最大周期默认为registry-fetch-interval-seconds的10倍
注册到注册中心:默认也是30秒注册一次,最大周期为30*10=300秒
eureka:
client:
heartbeat-executor-exponential-back-off-bound: 10 ##默认为心跳周期的十倍
instance:
lease-renewal-interval-in-seconds: 10 ##续约的周期