初始化配置
在Eureka-Server启动的时候,会启动一个定时任务,用来清理过期的客户端
protected void initEurekaServerContext() throws Exception {
// ....省略N多代码
// 服务刚刚启动的时候,去其他服务节点同步客户端的数量。
int registryCount = this.registry.syncUp();
// 这个方法里面计算expectedNumberOfRenewsPerMin的值 , 重点在这里面,这里启动了清理任务的定时器
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// ...省略N多代码
// 开启定时清理过期客户端的定时器
super.postInit();
}
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
// 设置定时器
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
renewsLastMin.start() : 在每个Eureka-Server端都维护着,每分钟的续约数量,续约数量是有一个Long类型的变量
来存储的,每过一分钟就需要对这个变量进行清0 , 因此这个地方是为了启动这个线程
public synchronized void start() {
if (!isActive) {
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// 进行清0
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
EvictionTask
是用来清理过期客户端的任务类
serverConfig.getEvictionIntervalTimerInMs() : 默认为60秒 , 可配置。
//EvictionTask
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
// 获取延迟秒数,就是延迟几秒下线
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// 判断是否开启自我保护机制
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
// 循环遍历本地CurrentHashMap中的实例信息
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 判断是否过期,此处为重点,里面有判断实例过期的依据
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// 获取注册的实例数量
int registrySize = (int) getLocalRegistrySize();
// serverConfig.getRenewalPercentThreshold() 为0.85 , 主要是为了避免开启自动保护机制。 所以会逐步过期
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// 可以过期的数量
int evictionLimit = registrySize - registrySizeThreshold;
// 取最小值,在过期数量和可以过期的数量中间取最小值。
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
// 随机过期
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
// 写入过期监控
EXPIRED.increment();
// 服务下线
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
Lease.isExpire()
//判断是否过期
public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
// 续约的时候会调用,用来更新最后更新时间
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
duration : 过期间隔,默认为90秒
evictionTimestamp : 实例下线时间,当客户端下线时,会更新这个时间。
lastUpdateTimestamp : 为最后更新时间 , 这里有个错误,因为续约的时候,更新这个时间的时候,加上了duration , 但是在最终做判断的时候
lastUpdateTimestamp + duration + additionalLeaseMs , 这个地方还加了一遍,也就导致了,当前时间必须要大于实际最后更新时间180秒,才会认为他过期 (撇开additionalLeaseMs这个因素不谈)
分批过期机制
从上面可以得知 , 这里有个分批过期的概念,每次最多过期15%的机器,超过15%则不会自动过期
假如检测到过期的实例数量为4台 , 总数量为10
执行过程过下:
第一个60秒到来,执行任务
int registrySize = 10 ;
int registrySizeThreshold = (int) (registrySize * 0.85) ;// 值为8
// 可以过期的数量
int evictionLimit = registrySize - registrySizeThreshold; //值为2
也就说仅仅只可以过期两台,那么另外两台怎么办,只能等待下一次任务执行的时候
第二个60秒到来,执行任务 , 计算是否开启保护机制时,这个时候呢,
numberOfRenewsPerMinThreshold还是原来的值,也就是 1020.85 = 17 , 但是由于
存活的机器数量只有6台,则每秒最大续约数为12 , 12>17 = false , 所以会开启自动保护机制
(如果在一分钟之类,另外两台机器恢复了心跳,16>17 , 依旧会开启),只能等待15分钟
之后,定时任务重新计算这两个参数的值。
@Override
public boolean isLeaseExpirationEnabled() {
// 是否开启自我保护机制,这是个配置,默认为true
if (!isSelfPreservationModeEnabled()) {
return true;
}
// 计算是否下线
return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
开启自我保护机制之后,则不会继续往下执行了。。
int registrySize = 8 ;
int registrySizeThreshold = (int) (registrySize * 0.85) ; //值为6
// 可以过期的数量
int evictionLimit = registrySize - registrySizeThreshold;// 值为2
总结: 由上可知,客户端具体的过期时间,是不确定的,但是必须大于180秒, 如果加上定时任务的时间间隔,240秒,
如果第一轮任务执行不到的话,可能会等到第二轮的时候执行,但是如果开启了自我保护机制,则没有第二轮的说法了。
如果不想启用这种机制,那么可以关闭自我保护机制,同时设置registrySizeThreshold = 0; 就可以一次性过期。