前言
从今天开始,来分析下服务注册与发现组件eureka,eureka篇只作为学习使用,生产环境推荐使用nacos作为服务注册与发现组件,后面我也会抽时间分析下nacos相关源码。
基本介绍
Spring Cloud 封装了 Netflix 公司开发的 Eureka 模块来实现服务注册和发现(请对比Zookeeper)。
Eureka 采用了 C-S 的设计架构。Eureka Server 作为服务注册功能的服务器,它是服务注册中心。
而系统中的其他微服务,使用 Eureka 的客户端连接到 Eureka Server并维持心跳连接。这样系统的维护人员就可以通过 Eureka Server 来监控系统中各个微服务是否正常运行。SpringCloud 的一些其他模块(比如Zuul)就可以通过 Eureka Server 来发现系统中的其他微服务,并执行相关的逻辑。下面是eureka的架构图:
Eureka包含两个组件:Eureka Server和Eureka Client
Eureka Server提供服务注册服务
各个节点启动后,会向EurekaServer中进行注册,这样EurekaServer中的服务注册表中将会存储所有可用服务节点的信息,服务节点的信息可以在界面中直观的看到
EurekaClient是一个Java客户端,用于简化Eureka Server的交互,客户端同时也具备一个内置的、使用轮询(round-robin)负载算法的负载均衡器。在应用启动后,将会向Eureka Server发送心跳(默认周期为30秒)。如果Eureka Server在多个心跳周期内没有接收到某个节点的心跳,EurekaServer将会从服务注册表中把这个服务节点移除(默认90秒)
Eureka三大角色:
- Eureka Server 提供服务注册和发现
- Service Provider服务提供方将自身服务注册到Eureka,从而使服务消费方能够找到
- Service Consumer服务消费方从Eureka获取注册服务列表,从而能够消费服务
作为服务注册中心,Eureka 和 Zookeeper对比的优势:
著名的CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。由于分区容错性P在是分布式系统中必须要保证的,因此我们只能在A和C之间进行权衡。
因此
Zookeeper保证的是CP,
Eureka则是AP。
Zookeeper保证CP
当向注册中心查询服务列表时,我们可以容忍注册中心返回的是几分钟以前的注册信息,但不能接受服务直接down掉不可用。也就是说,服务注册功能对可用性的要求要高于一致性。但是zk会出现这样一种情况,当master节点因为网络故障与其他节点失去联系时,剩余节点会重新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s, 且选举期间整个zk集群都是不可用的,这就导致在选举期间注册服务瘫痪。在云部署的环境下,因网络问题使得zk集群失去master节点是较大概率会发生的事,虽然服务能够最终恢复,但是漫长的选举时间导致的注册长期不可用是不能容忍的。
Eureka保证AP
Eureka看明白了这一点,因此在设计时就优先保证可用性。Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时如果发现连接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。除此之外,Eureka还有一种自我保护机制,如果在15分钟内超过85%的节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,此时会出现以下几种情况:
- Eureka不再从注册列表中移除因为长时间没收到心跳而应该过期的服务
- Eureka仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上(即保证当前节点依然可用)
- 当网络稳定时,当前实例新的注册信息会被同步到其它节点中
因此, Eureka可以很好的应对因网络故障导致部分节点失去联系的情况,而不会像zookeeper那样使整个注册服务瘫痪。
zookeeper与eureka比较,推荐使用eureka,为什么不推荐使用zookeeper,请阅读这篇文章:为什么不应该使用ZooKeeper做服务发现
Eureka的一些概念
在Eureka的服务治理中,会涉及到下面一些概念:
服务注册:Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如ip地址、端口、运行状况指标的url、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个双层的Map中。
服务续约:在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,说明服务一直处于可用状态,防止被剔除。Eureka Client在默认的情况下会每隔30秒发送一次心跳来进行服务续约。
服务同步:Eureka Server之间会互相进行注册,构建Eureka Server集群,不同Eureka Server之间会进行服务同步,用来保证服务信息的一致性。
获取服务:服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清单,并且缓存在Eureka Client本地,默认缓存30秒。同时,为了性能考虑,Eureka Server也会维护一份只读的服务清单缓存,该缓存每隔30秒更新一次。
服务调用:服务消费者在获取到服务清单后,就可以根据清单中的服务列表信息,查找到其他服务的地址,从而进行远程调用。Eureka有Region和Zone的概念,一个Region可以包含多个Zone,在进行服务调用时,优先访问处于同一个Zone中的服务提供者。
服务下线:当Eureka Client需要关闭或重启时,就不希望在这个时间段内再有请求进来,所以,就需要提前先发送REST请求给Eureka Server,告诉Eureka Server自己要下线了,Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。
服务剔除:有时候,服务实例可能会因为网络故障等原因导致不能提供服务,而此时该实例也没有发送请求给Eureka Server来进行服务下线,所以,还需要有服务剔除的机制。Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
自我保护:既然Eureka Server会定时剔除超时没有续约的服务,那就有可能出现一种场景,网络一段时间内发生了异常,所有的服务都没能够进行续约,Eureka Server就把所有的服务都剔除了,这样显然不太合理。所以,就有了自我保护机制,当短时间内,统计续约失败的比例,如果达到一定阈值,则会触发自我保护的机制,在该机制下,Eureka Server不会剔除任何的微服务,等到正常后,再退出自我保护机制。
从这些概念中,就可以知道大体的流程,Eureka Client向Eureka Server注册,并且维护心跳来进行续约,如果长时间不续约,就会被剔除。Eureka Server之间进行数据同步来形成集群,Eureka Client从Eureka Server获取服务列表,用来进行服务调用,Eureka Client服务重启前调用Eureka Server的接口进行下线操作。
基本用法
- 引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
- 配置application.yml
eureka:
instance:
hostname: localhost
# 将ip注册到eureka server
prefer-ip-address: true
client:
service-url:
# 服务端地址,多个逗号分隔
defaultZone: http://localhost:8671/eureka
# 是否向注册中心注册服务,默认是true
register-with-eureka: true
# 是否从注册中心获取服务,默认是true
fetch-registry: true
- 激活配置(非必须)
在spring cloud Edgware版本之前,要想将微服务注册到Eureka Server或其他服务发现组件上,必须在启动类上添加@EnableDiscoveryClient
或@EnableEurekaClient
。
在spring cloud Edgware及更高版本中,只需要添加相关依赖,即可自动注册。
若不想将服务注册到Eureka Server,只需要设置spring.cloud.service-registry.auto-registration.enabled=false
或@EnableDiscoveryClient(autoRegister=false)
即可。
源码分析
服务注册的入口一
如果你看过网上很多人写的文章,都有一个共同点,都是从启动类注解@EnableDiscoveryClient
开始找入口的,但是有个问题就是,在Edgware
版本之后并没有用到这个注解也完成了服务的注册,这是怎么做到的呢,如果你已经具备了spring boot的自动装配原理,很容易就会想到:一定在spring-cloud-starter-netflix-eureka-client
依赖包的/META-INF/spring.factories
中,引入了相关配置并向容器中注册了相关bean。另外在@EnableDiscoveryClient的源码中:
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
/**
* If true, the ServiceRegistry will automatically register the local server.
* @return - {@code true} if you want to automatically register.
*/
boolean autoRegister() default true;
}
从该注解的注释我们可以知道:该注解用来开启DiscoveryClient的实例。通过搜索DiscoveryClient,我们可以发现有一个类和一个接口。通过梳理可以得到如下图的关系:
其中,左边的org.springframework.cloud.client.discovery.DiscoveryClient
是Spring Cloud
的接口,它定义了用来发现服务的常用抽象方法,而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient
是对该接口的实现,从命名来就可以判断,它实现的是对Eureka发现服务的封装。所以EurekaDiscoveryClient
依赖了Eureka的com.netflix.discovery.EurekaClient
接口,EurekaClient
继承了LookupService
接口,他们都是Netflix开源包中的内容,它主要定义了针对Eureka的发现服务的抽象方法,而真正实现发现服务的则是Netflix包中的com.netflix.discovery.DiscoveryClient
类。
那么,我们就来详细看看DiscoveryClient
类。先解读一下该类头部的注释有个总体的了解,注释的大致内容如下:
这个类用于帮助与Eureka Server互相协作。
Eureka Client负责了下面的任务:
- 向Eureka Server注册服务实例
- 向Eureka Server为租约续期
- 当服务关闭期间,向Eureka Server取消租约
- 查询Eureka Server中的服务实例列表
结合以上信息,通过在IDEA中查找DiscoveryClient
引用,最终会在EurekaClientAutoConfiguration
中被创建并注册到容器中:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
}
利用构造器创建CloudEurekaClient
对象,进入构造方法:
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config,
AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
//调用父类的构造器
super(applicationInfoManager, config, args);
this.applicationInfoManager = applicationInfoManager;
this.publisher = publisher;
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
继续调用分类的构造方法:
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
//继续调用内部的其他构造器
this(applicationInfoManager, config, args, ResolverUtils::randomize);
}
最终一路向下,会调用到下面这个构造器:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.endpointRandomizer = endpointRandomizer;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
//是否要从eureka server上获取服务地址信息
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
//是否要注册到eureka server上
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
//如果不需要注册并且不需要更新服务地址
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 定时任务线程池
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//创建心跳线程池
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//创建从获取服务端获取服务信息的线程池
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
//如果开启了获取服务注册选项
if (clientConfig.shouldFetchRegistry()) {
try {
boolean primaryFetchRegistryResult = fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
//如果需要注册到Eureka server并且是开启了初始化的时候强制注册,则调用register()发起服务注册
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
//初始化定时任务
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
initRegistrySize = this.getApplications().size();
registrySize = initRegistrySize;
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, initRegistrySize);
}
可以看到在最终的DiscoveryClient构造方法中,有非常长的代码。其实很多代码可以不需要关心,大部分都是一些初始化工作,比如初始化了几个线程池:
- scheduler 定时任务线程池
- heartbeatExecutor 心跳线程池
- cacheRefreshExecutor 同步服务端的实例列表线程池
最后通过initScheduledTasks
方法去启动定时任务:
private void initScheduledTasks() {
//如果开启了获取服务列表配置,则启动一个定时任务,周期性的获取服务注册表
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//获取服务列表任务
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//如果开启了服务注册到Eureka,则通过需要做几个事情
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
//心跳任务
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
//初始化一个:instanceInfoReplicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//创建一个服务状态变更监听器(这里在入口二中会被调用,先留个印象)
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
logger.info("Saw local status change event {}", statusChangeEvent);
//更新服务,其实这里也是发起的服务注册
instanceInfoReplicator.onDemandUpdate();
}
};
//如果开启了状态变更配置,则注册监听器(默认为true)
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//启动一个实例信息复制器,主要就是为了开启一个定时线程,每40秒判断实例信息是否变更,如果变更了则重新注册
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
如果eureka.client.fetch-registry=true:
- 开启一个从服务注册中心获取服务注册列表的定时器,默认每隔30秒执行一次,可以通过下面的配置更改:
eureka.client.registry-fetch-interval-seconds=30
如果eureka.client.register-with-eureka=true:
- 开启一个心跳续约定时器,默认每隔30秒执行一次,可以通过下面的配置更改:
eureka.instance.lease-renewal-interval-in-seconds=30
#租约到期持续时间,默认90秒,客户端可以通过改变这个属性,来控制服务端服务剔除持续时间
eureka.instance.lease-expiration-duration-in-seconds=90
- 开启一个服务注册定时器,初始延迟40秒执行,可以通过下面的配置更改:
eureka.client.initial-instance-info-replication-interval-seconds=40
服务注册
进入instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
方法看start
做了什么事,可以发现用了cas保证了线程的安全性,并在里面启动了一个任务:
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
//标识当前服务实例信息为脏信息
instanceInfo.setIsDirty(); // for initial register
//启动定时任务,将当前InstanceInfoReplicator实例作为任务传入定时器
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
将当前InstanceInfoReplicator
实例作为任务传递到定时器中,那么这个InstanceInfoReplicator
必然实现了Runnable
接口,那看一下它的run
方法:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//发起服务注册
discoveryClient.register();
//重置服务实例信息
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
//启动一个定时任务,默认每隔30秒执行一次,如果实例信息有变更,会重新发起注册
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
discoveryClient.register();
这一行,真正触发调用注册的地方就在这里,并且在finally
中,每30s
(可以通过eureka.client.instance-info-replication-interval-seconds=30
设置)会定时执行一下当前的run
方法进行检查,继续查看register()
的实现内容如下:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
通过属性命名,大家基本也能猜出来,注册操作也是通过发送http post请求,请求地址为serviceUrl + "apps/" + info.getAppName()
。同时,这里我们也能看到发起注册请求的时候,传入了一个com.netflix.appinfo.InstanceInfo
对象,该对象就是注册时候客户端给服务端的服务的元数据
服务注册的入口二
spring cloud是一个生态,它提供了一套标准,这套标准可以通过不同的组件来实现,其中就包含服务注册/发现、熔断、负载均衡等,在spring-cloud-commons
这个包中,org.springframework.cloud.client.serviceregistry
路径下,可以看到一个服务注册的接口定义 ServiceRegistry
。它就是定义了spring cloud中服务注册的一个接口。我们看一下它的类关系图,这个接口有一个唯一的实现 EurekaServiceRegistry
。表示采用的是Eureka Server作为服务注册中心。
服务注册的触发路径
有了上面的概念下面就来看下springboot是如何调用EurekaServiceRegistry
的,而EurekaServiceRegistry
在被调用时又做了啥,大家自要想想其实应该不难猜测到,服务的注册取决于服务是否已经启动好了。而在spring boot中,会等到spring 容器启动并且所有的配置都完成之后来进行注册。
在EurekaClientAutoConfiguration
配置类中,还会发现配置并注册了一个EurekaAutoServiceRegistration
类型的bean:
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context,
EurekaServiceRegistry registry, EurekaRegistration registration) {
return new EurekaAutoServiceRegistration(context, registry, registration);
}
进入EurekaAutoServiceRegistration中,可以看到这个类实现了SmartLifecycle接口,并重写了start、stop等接口:
在spring boot的启动方法中的refreshContext中(这一步请读者自行跟踪SpringApplication.run方法)。最终会调用AbstractApplicationContext的refresh()方法,在方法内部会调用finishRefresh方法:
protected void finishRefresh() {
// Clear context-level resource caches (such as ASM metadata from scanning).
clearResourceCaches();
// Initialize lifecycle processor for this context.
// 初始化生命周期处理器
initLifecycleProcessor();
// Propagate refresh to lifecycle processor first.
//默认调用的是DefaultLifecycleProcessor的onRefresh方法
getLifecycleProcessor().onRefresh();
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// Participate in LiveBeansView MBean, if active.
LiveBeansView.registerApplicationContext(this);
}
最终会调用DefaultLifecycleProcessor
的startBeans
方法:
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
Map<Integer, LifecycleGroup> phases = new HashMap<>();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
int phase = getPhase(bean);
LifecycleGroup group = phases.get(phase);
if (group == null) {
group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
phases.put(phase, group);
}
group.add(beanName, bean);
}
});
if (!phases.isEmpty()) {
List<Integer> keys = new ArrayList<>(phases.keySet());
Collections.sort(keys);
for (Integer key : keys) {
//遍历Lifecycle实现类的start方法
phases.get(key).start();
}
}
}
由于EurekaAutoServiceRegistration
实现了SmartLifecycle
接口,从而会调用它的start
方法:
@Override
public void start() {
// only set the port if the nonSecurePort or securePort is 0 and this.port != 0
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
// only initialize if nonSecurePort is greater than 0 and it isn't already running
// because of containerPortInitializer below
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//调用EurekaServiceRegistry的register方法
this.serviceRegistry.register(this.registration);
//发布InstanceRegisteredEvent事件,框架目前没有提供这个事件的监听器,如果业务有需要,可以监听此事件
this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);
}
}
进入EurekaServiceRegistry
的register
方法:
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
if (log.isInfoEnabled()) {
log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName()
+ " with eureka with status " + reg.getInstanceConfig().getInitialStatus());
}
//调用ApplicationInfoManager的setInstanceStatus方法,变更实例为UP状态
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler()
.ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
进入ApplicationInfoManager
的setInstanceStatus
方法:
public synchronized void setInstanceStatus(InstanceStatus status) {
//什么都没做,传入什么,返回什么
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
//循环调用监听器的notify方法
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}
最终会循环调用StatusChangeListener
的notify
方法,通知相关事件监听器。目前只有一个唯一的实现,就是在DiscoveryClient
的initScheduledTasks
方法中,在分析initScheduledTasks
时已经做过介绍:
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
logger.info("Saw local status change event {}", statusChangeEvent);
//调用InstanceInfoReplicator实例的onDemandUpdate方法
instanceInfoReplicator.onDemandUpdate();
}
};
进入InstanceInfoReplicator
的onDemandUpdate
方法:
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
//启动一个定时任务
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
//取消最后一个定时任务
latestPeriodic.cancel(false);
}
//最终还是调用当前类的run方法,因为直接调用的run方法,此处就不是一个异步任务了
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
又见到了我们熟悉的run
方法了:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
后面的代码,已经分析过了,这里就不再赘述了。至于spring cloud为什么会提供2处服务注册的入口,我猜测:一是为了保证当前微服务实例成功运行之后,再进行服务注册。二是由于spring cloud与eureka进行了整合,服务注册、服务下线等相关代码都在netflex包下的DiscoveryClient类中,spring cloud通过这种方式可以更优雅的控制服务的注册和注销。
关于服务续约、服务获取以及服务注销等相关源码,请读者自行阅读,全部都在DiscoveryClient
类中。
总结
spring boot应用在启动的时候,利用自动装配向容器中注册了EurekaClient类型的bean,并在实例化的时候,通过构造函数调用父类DiscoveryClient的构造函数,在构造函数内,初始化了三个线程池,分别是定时任务线程池、心跳任务线程池、服务获取任务线程池,并在initScheduledTasks方法中,根据配置条件,分别启动了服务获取定时任务(默认30秒执行一次)、服务续约定时任务(默认30秒执行一次,服务租期持续时间默认90秒)以及服务注册定时任务(默认初始延迟40秒执行一次,之后每隔30秒执行一次)。
本文只是对eureka服务注册的主线进行了源码分析,还有很多细节没有介绍到,请读者自行探索。如有错误之处,还请指正!
下一篇,我们将介绍eureka server端相关源码,敬请期待!
欢迎关注我的公众号:程序员L札记