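Spring Cloud Eureka Client Source Code Analysis
The previous article examined Eureka Server. This one walks through how Eureka Client works, step by step.
Eureka Client Startup
Like the server, the Eureka client loads a number of configuration classes at startup. The spring.factories file inside spring-cloud-netflix-eureka-client-2.1.0.RELEASE.jar lists them: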
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
Adding the jar to the classpath is enough to trigger auto-configuration. Start with the class-level annotations on EurekaClientAutoConfiguration:
@Configuration
@EnableConfigurationProperties
@Import({DiscoveryClientOptionalArgsConfiguration.class})
@ConditionalOnClass({EurekaClientConfig.class})
@ConditionalOnBean({Marker.class}) // the Marker bean lets the client auto-configure without the @EnableEurekaClient annotation
@ConditionalOnProperty(
value = {"eureka.client.enabled"},
matchIfMissing = true
)
// this class is configured before the classes listed here
@AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class})
// this class is configured after the classes listed here
@AutoConfigureAfter(
name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"}
)
public class EurekaClientAutoConfiguration {
}
The key annotations fall into two groups:
1. Conditions for auto-configuration. To turn the client off, set eureka.client.enabled: false
@ConditionalOnClass({EurekaClientConfig.class})
@ConditionalOnBean({Marker.class})
@ConditionalOnProperty(
value = {"eureka.client.enabled"},
matchIfMissing = true
)
2. Ordering of auto-configuration
// this class is configured before the classes listed here
@AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class})
// this class is configured after the classes listed here
@AutoConfigureAfter(
name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"}
)
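To make the eureka.client.enabled switch concrete, here is a minimal sketch of how a condition with matchIfMissing = true behaves. It is a simplified model in plain Java, not Spring's implementation: the class EnabledFlagSketch and the method clientEnabled are hypothetical names, and a Map stands in for the Spring Environment. It assumes the documented default of @ConditionalOnProperty, where a property matches when it is present and not equal to false.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of
// @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true).
class EnabledFlagSketch {

    // Returns true when the Eureka client auto-configuration would be allowed to load.
    static boolean clientEnabled(Map<String, String> props) {
        String value = props.get("eureka.client.enabled");
        if (value == null) {
            // matchIfMissing = true: an absent property counts as a match
            return true;
        }
        // a present property matches unless it is "false" (case-insensitive)
        return !"false".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println("no property set: " + clientEnabled(props));
        props.put("eureka.client.enabled", "false");
        System.out.println("explicitly disabled: " + clientEnabled(props));
    }
}
```

So leaving the property out keeps the client on, and only an explicit false turns it off.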
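As a starting point: what does EurekaClient have to do at startup?
1) Read the configuration file
2) Fetch service instance information from EurekaServer at startup
3) Register itself with EurekaServer (addInstance)
4) Start scheduled tasks (heartbeat renewal, refreshing the local service cache)
1) Read the configuration file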
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaInstanceConfigBean
public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, ManagementMetadataProvider managementMetadataProvider) {
String hostname = this.getProperty("eureka.instance.hostname");
boolean preferIpAddress = Boolean.parseBoolean(this.getProperty("eureka.instance.prefer-ip-address"));
String ipAddress = this.getProperty("eureka.instance.ip-address");
boolean isSecurePortEnabled = Boolean.parseBoolean(this.getProperty("eureka.instance.secure-port-enabled"));
String serverContextPath = this.env.getProperty("server.context-path", "/");
int serverPort = Integer.valueOf(this.env.getProperty("server.port", this.env.getProperty("port", "8080")));
Integer managementPort = (Integer)this.env.getProperty("management.server.port", Integer.class);
String managementContextPath = this.env.getProperty("management.server.servlet.context-path");
Integer jmxPort = (Integer)this.env.getProperty("com.sun.management.jmxremote.port", Integer.class);
EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
instance.setNonSecurePort(serverPort);
instance.setInstanceId(IdUtils.getDefaultInstanceId(this.env));
instance.setPreferIpAddress(preferIpAddress);
instance.setSecurePortEnabled(isSecurePortEnabled);
if (StringUtils.hasText(ipAddress)) {
instance.setIpAddress(ipAddress);
}
if (isSecurePortEnabled) {
instance.setSecurePort(serverPort);
}
if (StringUtils.hasText(hostname)) {
instance.setHostname(hostname);
}
String statusPageUrlPath = this.getProperty("eureka.instance.status-page-url-path");
String healthCheckUrlPath = this.getProperty("eureka.instance.health-check-url-path");
if (StringUtils.hasText(statusPageUrlPath)) {
instance.setStatusPageUrlPath(statusPageUrlPath);
}
if (StringUtils.hasText(healthCheckUrlPath)) {
instance.setHealthCheckUrlPath(healthCheckUrlPath);
}
ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath, managementContextPath, managementPort);
if (metadata != null) {
instance.setStatusPageUrl(metadata.getStatusPageUrl());
instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
if (instance.isSecurePortEnabled()) {
instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
}
Map<String, String> metadataMap = instance.getMetadataMap();
if (metadataMap.get("management.port") == null) {
metadataMap.put("management.port", String.valueOf(metadata.getManagementPort()));
}
} else if (StringUtils.hasText(managementContextPath)) {
instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
}
this.setupJmxPort(instance, jmxPort);
return instance;
}
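The port lookup in the method above chains two defaults: server.port, then port, then the literal 8080. The sketch below reproduces only that fallback chain in plain Java. PortResolutionSketch and its methods are hypothetical names, and a Map replaces the Spring Environment, so treat it as an illustration of the lookup order rather than the library code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the "server.port -> port -> 8080" fallback used when
// building the instance configuration.
class PortResolutionSketch {

    // Mirrors Environment#getProperty(key, defaultValue) for a plain map.
    static String getProperty(Map<String, String> env, String key, String defaultValue) {
        String value = env.get(key);
        return value != null ? value : defaultValue;
    }

    // server.port wins; otherwise port; otherwise 8080.
    static int resolveServerPort(Map<String, String> env) {
        String fallback = getProperty(env, "port", "8080");
        return Integer.parseInt(getProperty(env, "server.port", fallback));
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        System.out.println("resolved port: " + resolveServerPort(env));
        env.put("server.port", "8761");
        System.out.println("resolved port: " + resolveServerPort(env));
    }
}
```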
2) Fetch service instance information from EurekaServer at startup
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration
@Bean(
destroyMethod = "shutdown"
)
@ConditionalOnMissingBean(
value = {EurekaClient.class},
search = SearchStrategy.CURRENT
)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}
org.springframework.cloud.netflix.eureka.CloudEurekaClient#CloudEurekaClient(....)
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
super(applicationInfoManager, config, args);
this.cacheRefreshedCount = new AtomicLong(0L);
this.eurekaHttpClient = new AtomicReference();
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
Next, look at the constructor of the parent class DiscoveryClient: com.netflix.discovery.DiscoveryClient#DiscoveryClient(.....)
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
//com.netflix.discovery.DiscoveryClient#DiscoveryClient(....)
this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
private volatile BackupRegistry backupRegistryInstance;
public synchronized BackupRegistry get() {
if (this.backupRegistryInstance == null) {
String backupRegistryClassName = config.getBackupRegistryImpl();
if (null != backupRegistryClassName) {
try {
this.backupRegistryInstance = (BackupRegistry)Class.forName(backupRegistryClassName).newInstance();
DiscoveryClient.logger.info("Enabled backup registry of type {}", this.backupRegistryInstance.getClass());
} catch (InstantiationException var3) {
DiscoveryClient.logger.error("Error instantiating BackupRegistry.", var3);
} catch (IllegalAccessException var4) {
DiscoveryClient.logger.error("Error instantiating BackupRegistry.", var4);
} catch (ClassNotFoundException var5) {
DiscoveryClient.logger.error("Error instantiating BackupRegistry.", var5);
}
}
if (this.backupRegistryInstance == null) {
DiscoveryClient.logger.warn("Using default backup registry implementation which does not do anything.");
this.backupRegistryInstance = new NotImplementedRegistryImpl();
}
}
return this.backupRegistryInstance;
}
});
}
com.netflix.discovery.DiscoveryClient#DiscoveryClient(....)
// fetch the registry from the Eureka server; if that fails, fall back to the backup registry
if (this.clientConfig.shouldFetchRegistry() && !this.fetchRegistry(false)) {
this.fetchRegistryFromBackup();
}
com.netflix.discovery.DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();
label122: {
boolean var4;
try {
// on the first call the local cache is empty, so the full registry is fetched
Applications applications = this.getApplications();
if (!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion() != -1L) {
// delta (incremental) update
this.getAndUpdateDelta(applications);
} else {
logger.info("Disable delta property : {}", this.clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", this.clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", applications == null);
logger.info("Registered Applications size is zero : {}", applications.getRegisteredApplications().size() == 0);
logger.info("Application version is -1: {}", applications.getVersion() == -1L);
// full update
this.getAndStoreFullRegistry();
}
applications.setAppsHashCode(applications.getReconcileHashCode());
this.logTotalInstances();
break label122;
} catch (Throwable var8) {
logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}", new Object[]{this.appPathIdentifier, var8.getMessage(), var8});
var4 = false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
return var4;
}
this.onCacheRefreshed();
this.updateInstanceRemoteStatus();
return true;
}
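The long condition in fetchRegistry decides between a delta fetch and a full fetch. A minimal sketch of that decision, pulled out into a pure function, may make it easier to read. FetchModeSketch, Mode and chooseMode are hypothetical names; the local cache is reduced to two values (the number of registered applications and the version), and the null check on the cache is left out.

```java
// Hypothetical, simplified model of the full-vs-delta decision in fetchRegistry.
class FetchModeSketch {

    enum Mode { FULL, DELTA }

    static Mode chooseMode(boolean disableDelta,
                           String singleVipAddress,
                           boolean forceFullRegistryFetch,
                           int registeredApplications,
                           long version) {
        boolean deltaAllowed =
                !disableDelta                                                  // delta not switched off
                && (singleVipAddress == null || singleVipAddress.isEmpty())    // no single-VIP refresh configured
                && !forceFullRegistryFetch                                     // caller did not force a full fetch
                && registeredApplications != 0                                 // local cache already has data
                && version != -1L;                                             // local cache has a valid version
        return deltaAllowed ? Mode.DELTA : Mode.FULL;
    }

    public static void main(String[] args) {
        // first start: empty cache, so a full fetch is chosen
        System.out.println(chooseMode(false, null, false, 0, -1L));
        // later refresh with a populated cache: delta is chosen
        System.out.println(chooseMode(false, null, false, 3, 5L));
    }
}
```

This is why the call fetchRegistry(false) in the constructor still ends up doing a full fetch: the cache is empty at that point.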
3) Register itself with EurekaServer (addInstance)
com.netflix.discovery.DiscoveryClient#DiscoveryClient(....)
if (this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!this.register()) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable var8) {
logger.error("Registration error at startup: {}", var8.getMessage());
throw new IllegalStateException(var8);
}
}
com.netflix.discovery.DiscoveryClient#register
boolean register() throws Throwable {
logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
EurekaHttpResponse httpResponse;
try {
// send a REST request to the EurekaServer configured in serviceUrl to register this instance
httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
} catch (Exception var3) {
logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
throw var3;
}
if (logger.isInfoEnabled()) {
logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
Under the hood, the remote call is made with a Jersey client.
4) Start scheduled tasks (heartbeat renewal, refreshing the local service cache)
com.netflix.discovery.DiscoveryClient#DiscoveryClient(.....)
// initialize the scheduled tasks: heartbeat renewal and local cache refresh
this.initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable var7) {
logger.warn("Cannot register timers", var7);
}
com.netflix.discovery.DiscoveryClient#initScheduledTasks
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
if (this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// refresh the locally cached service information
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}
if (this.clientConfig.shouldRegisterWithEureka()) {
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
// heartbeat renewal
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
this.statusChangeListener = new StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
}
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}
this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
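Both tasks above are wrapped in a TimedSupervisorTask together with an expBackOffBound value. The sketch below shows one plausible reading of how such a bound shapes the delay between runs. It rests on an assumption that is not shown in the code above: that the supervisor doubles the delay after a timed-out run, caps it at interval × bound, and resets it to the base interval after a successful run. BackoffSketch and nextDelayMillis are hypothetical names.

```java
// Hypothetical model of a bounded exponential back-off for a periodic task.
// Assumption: delay doubles on timeout, is capped at baseInterval * bound,
// and resets to baseInterval after a successful run.
class BackoffSketch {

    static long nextDelayMillis(long currentDelayMillis,
                                long baseIntervalMillis,
                                int expBackOffBound,
                                boolean timedOut) {
        if (!timedOut) {
            // successful run: go back to the configured interval
            return baseIntervalMillis;
        }
        long maxDelayMillis = baseIntervalMillis * expBackOffBound;
        // timed-out run: double the delay, but never exceed the cap
        return Math.min(maxDelayMillis, currentDelayMillis * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L;   // e.g. a 30-second interval
        int bound = 10;        // e.g. a back-off bound of 10
        long delay = base;
        for (int i = 0; i < 5; i++) {
            delay = nextDelayMillis(delay, base, bound, true);
            System.out.println("delay after timeout " + (i + 1) + ": " + delay + " ms");
        }
    }
}
```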
DiscoveryClient.HeartbeatThread()
com.netflix.discovery.DiscoveryClient.HeartbeatThread#run
private class HeartbeatThread implements Runnable {
private HeartbeatThread() {
}
public void run() {
if (DiscoveryClient.this.renew()) {
DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
com.netflix.discovery.DiscoveryClient#renew
boolean renew() {
try {
// send the heartbeat to the renew endpoint
EurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);
logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
// if the server answers 404 Not Found, register this instance again
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
this.REREGISTER_COUNTER.increment();
logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName());
long timestamp = this.instanceInfo.setIsDirtyWithTime();
boolean success = this.register(); // re-register
if (success) {
this.instanceInfo.unsetIsDirty(timestamp);
}
return success;
} else {
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
} catch (Throwable var5) {
logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);
return false;
}
}
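The renew logic boils down to three outcomes: 200 means the lease was renewed, 404 means the server no longer knows the instance so the client registers again, and anything else counts as a failure. Here is a minimal sketch of that flow against an in-memory stand-in for the server. RenewSketch, Registry and InMemoryRegistry are hypothetical names; only the status codes (200, 404, 204) come from the code above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of heartbeat renewal with re-registration on 404.
class RenewSketch {

    // Stand-in for the registration client; methods return HTTP status codes.
    interface Registry {
        int sendHeartBeat(String instanceId);
        int register(String instanceId);
    }

    // In-memory fake server: knows an instance only after it has registered.
    static class InMemoryRegistry implements Registry {
        final Set<String> leases = new HashSet<>();

        public int sendHeartBeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404;
        }

        public int register(String instanceId) {
            leases.add(instanceId);
            return 204;
        }
    }

    static boolean renew(Registry registry, String instanceId) {
        int status = registry.sendHeartBeat(instanceId);
        if (status == 404) {
            // the server has no lease for this instance: register again
            return registry.register(instanceId) == 204;
        }
        return status == 200;
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        System.out.println("first renew: " + renew(registry, "app-1"));
        System.out.println("second renew: " + renew(registry, "app-1"));
    }
}
```

The first call takes the 404 branch and registers; the second call finds the lease and takes the 200 branch.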
DiscoveryClient.CacheRefreshThread()
com.netflix.discovery.DiscoveryClient.CacheRefreshThread#run
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}
public void run() {
DiscoveryClient.this.refreshRegistry();
}
}
com.netflix.discovery.DiscoveryClient#refreshRegistry
@VisibleForTesting
void refreshRegistry() {
try {
boolean isFetchingRemoteRegionRegistries = this.isFetchingRemoteRegionRegistries();
boolean remoteRegionsModified = false;
String latestRemoteRegions = this.clientConfig.fetchRegistryForRemoteRegions();
if (null != latestRemoteRegions) {
String currentRemoteRegions = (String)this.remoteRegionsToFetch.get();
if (!latestRemoteRegions.equals(currentRemoteRegions)) {
synchronized(this.instanceRegionChecker.getAzToRegionMapper()) {
if (this.remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
String[] remoteRegions = latestRemoteRegions.split(",");
this.remoteRegionsRef.set(remoteRegions);
this.instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
remoteRegionsModified = true;
} else {
logger.info("Remote regions to fetch modified concurrently, ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
}
}
} else {
this.instanceRegionChecker.getAzToRegionMapper().refreshMapping();
}
}
boolean success = this.fetchRegistry(remoteRegionsModified);
if (success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
if (logger.isDebugEnabled()) {
StringBuilder allAppsHashCodes = new StringBuilder();
allAppsHashCodes.append("Local region apps hashcode: ");
allAppsHashCodes.append(((Applications)this.localRegionApps.get()).getAppsHashCode());
allAppsHashCodes.append(", is fetching remote regions? ");
allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
Iterator var11 = this.remoteRegionVsApps.entrySet().iterator();
while(var11.hasNext()) {
Entry<String, Applications> entry = (Entry)var11.next();
allAppsHashCodes.append(", Remote region: ");
allAppsHashCodes.append((String)entry.getKey());
allAppsHashCodes.append(" , apps hashcode: ");
allAppsHashCodes.append(((Applications)entry.getValue()).getAppsHashCode());
}
logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes);
}
} catch (Throwable var9) {
logger.error("Cannot fetch registry from server", var9);
}
}
Eureka Client Shutdown
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient
@Bean( destroyMethod = "shutdown") // shutdown() is invoked when the EurekaClient bean is destroyed
@ConditionalOnMissingBean(
value = {EurekaClient.class},
search = SearchStrategy.CURRENT
)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}
Let's look at com.netflix.discovery.DiscoveryClient#shutdown
@PreDestroy
public synchronized void shutdown() {
if (this.isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (this.statusChangeListener != null && this.applicationInfoManager != null) {
this.applicationInfoManager.unregisterStatusChangeListener(this.statusChangeListener.getId());
}
// cancel the scheduled tasks
this.cancelScheduledTasks();
if (this.applicationInfoManager != null && this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldUnregisterOnShutdown()) {
// mark the instance as DOWN
this.applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// remove the registration
this.unregister();
}
if (this.eurekaTransport != null) {
this.eurekaTransport.shutdown(); // release the transport resources
}
// shut down the heartbeat staleness monitor
this.heartbeatStalenessMonitor.shutdown();
// shut down the registry staleness monitor
this.registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
// send a cancel (deregistration) request to EurekaServer
void unregister() {
if (this.eurekaTransport != null && this.eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = this.eurekaTransport.registrationClient.cancel(this.instanceInfo.getAppName(), this.instanceInfo.getId());
logger.info("DiscoveryClient_{} - deregister status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception var2) {
logger.error("DiscoveryClient_{} - de-registration failed{}", new Object[]{this.appPathIdentifier, var2.getMessage(), var2});
}
}
}
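One detail in shutdown() is easy to overlook: the compareAndSet(false, true) guard makes the method idempotent, so a second call (for example from both @PreDestroy and the bean's destroyMethod) does nothing. The sketch below isolates that guard and the order of the steps. ShutdownSketch and the step names are hypothetical; the steps are recorded as strings instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of an idempotent shutdown guarded by an AtomicBoolean.
class ShutdownSketch {

    private final AtomicBoolean isShutdown = new AtomicBoolean(false);

    // Records the steps that ran, in order, so the behaviour can be inspected.
    final List<String> steps = new ArrayList<>();

    void shutdown() {
        // only the first caller flips the flag from false to true and proceeds
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("cancelScheduledTasks");
            steps.add("setInstanceStatusDown");
            steps.add("unregister");
            steps.add("shutdownTransport");
        }
    }

    public static void main(String[] args) {
        ShutdownSketch client = new ShutdownSketch();
        client.shutdown();
        client.shutdown(); // no effect: the flag is already set
        System.out.println(client.steps);
    }
}
```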