Eureka源码采用1.7.2版本
本人小白,此文为本人阅读源码笔记,如果您读到本文,您需要自己甄别是否正确,文中的说明只代表本人理解,不一定是正确的!!!
自我保护机制设计的初衷是防止服务注册服务因为本地网络故障,长时间未接受到心跳请求,造成错误的移除大量服务实例,其实调用服务还是可用的
自我保护机制是和自动故障移除联系在一起的,针对的移除实例也是自动故障移除
com.netflix.eureka.registry.AbstractInstanceRegistry#evict(long)
//是否允许主动下线故障实例 和自我保护机制相关
//自我保护机制的是比较上一分钟的心跳数量和期望的最小心跳数量
//即expectedNumberOfRenewsPerMin * 0.85
// expectedNumberOfRenewsPerMin 在服务下线,注册的时候都进行了数量计算,但是故障的时候,进行自动移除没进行数量增减
// 如果故障实例超过了服务数量的15%,那么下一分钟,自我保护机制就会生效
// 但是 expectedNumberOfRenewsPerMin的同步也依赖于一个自动检测的定时任务 @Link com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.updateRenewalThreshold
// 此定时任务每隔15分钟进行一次实际服务数量更新,那么故障的服务实例在此过程中将会被剔除,从而自我保护就关闭了
// 我个人认为啊,此处设计是没问题的,自我保护的机制设计初衷是那个防止注册中心网络故障造成的大批量心跳丢失,造成的服务不可达,但是真正的调用服务是可用的
// 所以我觉得服务故障不需要更新expectedNumberOfRenewsPerMin
//心跳的大批量丢失不能就直接移除实例,可能是当前网络环境的波动。正常的下线,即调用shutDown()不会造成服务保护机制的触发的,这个触发只会争对非正常下线
//2021.05.23
//我再次想了下,触发的大概率事件是这样的
//在15分钟内,累计丢失了15%以上的节点心跳,那么在这个15分钟内将会启用自我保护
// 到了15分钟后,numberOfRenewsPerMinThreshold重新计算了,自动保护可能就关闭了
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
在服务故障移除的方法中有这样一个判断,当返回false时候,直接返回,不进行故障实例的摘除
进入该方法
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled
@Override
public boolean isLeaseExpirationEnabled() {
//默认是true 是否启用自我保护机制
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instances to expire.
return true;
}
//numberOfRenewsPerMinThreshold 每分钟最少收到多少心跳
//getNumOfRenewsInLastMin() 上一分钟所发的心跳总数
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
- 判断配置项中是否设置了关闭自我保护,默认为true,配置项:enableSelfPreservation
- 判断上一分钟心跳总数(getNumOfRenewsInLastMin())是否大于期望每分钟心跳总数(numberOfRenewsPerMinThreshold)
关于获取上一分钟心跳总数,Eureka Server内部采用的是定时线程进行统计,使用两个AtomicLong进行保存当前和上一分钟的心跳总数
com.netflix.eureka.util.MeasuredRate#start
public synchronized void start() {
if (!isActive) {
// 默认一分钟执行一次
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
//将currentBucket的值赋值给lastBUcket,且将currentBucket的计数器设置为0
//lastBucket 保留上一分钟的心跳次数
// Zero out the current bucket.
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
该方法初始化了运行了定时调度的线程进行统计,默认执行间隔为1min,执行流程:
- 每分钟将当前统计的心跳总数赋值给lastBucket(上一分钟计数器),currentBucket(当前分钟的计数器)的计数器重置为0
那么当前的心跳总数是怎么计算的呢,直接看心跳的renew()方法,是否嵌入了计数器累计操作
com.netflix.eureka.registry.AbstractInstanceRegistry#renew
//服务续约方法
public boolean renew(String appName, String id, boolean isReplication) {
………………
//renewsLastMin 当前心跳总数+1
renewsLastMin.increment();
//重置续约时间
leaseToRenew.renew();
return true;
}
}
如上所示,当接收到心跳时,当前心跳计数器进行了递增操作
而getNumOfRenewsInLastMin()获取上一分钟心跳总数就是获取lastBucket数量,再找下该定时任务启动的入口
com.netflix.eureka.registry.AbstractInstanceRegistry#postInit
和自动故障移除的定时同时启动的,那么lastBucket代表了上一分钟的心跳总数
接下来,我们需要看看期望每分钟最小心跳总数的由来:
numberOfRenewsPerMinThreshold最开始的初始化计算是在Eureka Server初始化计算的,使用当前Server拉取到的服务实例总数 * 0.85
com.netflix.eureka.EurekaBootStrap#initEurekaServerContext
int registryCount = registry.syncUp();
//定时检查服务实例是否故障,并自动下线
registry.openForTraffic(applicationInfoManager, registryCount);
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#openForTraffic
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
//30秒心跳 1分钟就是count * 2 写死了
//心跳发送时间可以设置
this.expectedNumberOfRenewsPerMin = count * 2;
//期望每分钟最少的心跳数量
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got " + count + " instances from neighboring DS node");
logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);
……………………
}
在openForTraffic()方法中使用初始化拉取的服务实例总数作为基数标准进行计算,(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()) -> count * 2 * 0.85,
集群模式下,count为其他节点中已注册的服务实例总数,单节点就为0
下面我们看看在注册中心接收到注册,下线等请求执行时,维护numberOfRenewsPerMinThreshold
注册:com.netflix.eureka.registry.AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
…………………………
// 如果已经存在租约 , 保存最后的过期时间
if (existingLease != null && (existingLease.getHolder() != null)) {
…………………………
} else {
// 租约不存在,因此是新的注册
synchronized (lock) {
//重置期望每分钟心跳数量 有问题 和自我保护有关
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
…………………………
} finally {
read.unlock();
}
}
下线:com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel
@Override
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
//集群节点同步下线消息
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
//期望每分钟心跳数量-2
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
注册,当前实例数量+2,下线,当前实例数量-2,然后再次*0.85,计算期望每分钟最小心跳数
在Eureka Server中有专门的定时任务进行更新numberOfRenewsPerMinThreshold,默认每15min执行一次
com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask
private void updateRenewalThreshold() {
try {
//获取全部服务实例
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
//重新计算期望每分钟心跳数量 触发概率很小
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold of if the self preservation is disabled.
if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
主要流程如下:
- 获取当前服务注册表中服务实例
- 重新使用服务实例数量计算expectedNumberOfRenewsPerMin
也就是说每15min进行一次expectedNumberOfRenewsPerMin的实时更新,这个主要针对的是故障移除的服务实例
注意,自动服务故障移除没有进行numberOfRenewsPerMinThreshold的更新
<font color= 'blue'>服务故障实例的摘除需要判断当前是否处于自我保护模式,而自我保护模式的默认是开启(isSelfPreservationModeEnabled),需要判断上一分钟的心跳总数是否大于期望每分钟最小心跳数,如果在15分钟内,累计丢失了15%以上的节点心跳,那么Eureka Server就会认为当前所处的网络环境异常,从而处于自动保护模式,故障实例将不会移除,再等待15min后,进行expectedNumberOfRenewsPerMin的基于当前服务实例的重新计算后,自我保护模式才会关闭!</font>
自我保护服务开启模拟:
- 上一分钟内直接丢失了15%以上的服务节点心跳,自动故障摘除将不会移除实例,直接进入保护模式了,那么有个疑问,到达15min自动更新的时候,当前的服务实例没有实例任何移除,再次计算还是原值,那么自我保护模式还会一直进行下去
- 第1分钟丢失2%,节点移除,第二分钟丢失5%,节点移除,第三分钟丢失9%,进入自我保护模式,累计丢失了15%的节点心跳,那么到达15分钟后,自动更新numberOfRenewsPerMinThreshold,重新计算,期望每分钟最小心跳数变小,自我保护模式可能就是关闭。