Eureka源码采用1.7.2版本
本人小白,此文为本人阅读源码笔记,如果您读到本文,您需要自己甄别是否正确,文中的说明只代表本人理解,不一定是正确的!!!
服务注册,Eureka Client在初始化的时候向注册中心发送注册请求,注册中心获取到该请求将会把该服务实例加入到注册表中,大概来说就是这样,注册的起始就是在Client初始化的时候
com.netflix.discovery.DiscoveryClient#initScheduledTasks
//注册服务状态变更监听
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//服务注册服务在其中
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
com.netflix.discovery.InstanceInfoReplicator#start
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
//40秒之后执行注册任务
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
//服务注册 初始注册
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
在客户端初始化的时候进行调度任务的初始化,当配置了registration.enabled为true时,进行注册任务初始化
最终调用的Jersey框架发送Http请求
com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
//防止循环注册
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
最终发送的Http请求是这样的:http://DESKTOP-1S6DCTA:8080/v2/apps/EUREKA
Eureka Server端接收该请求:
com.netflix.eureka.resources.ApplicationResource#addInstance
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
//集群节点注册事件同步
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
这里面做了下判断,当客户端租约过期时间没设置就采用服务端默认过期时间90S,如果客户端设置了,就采用客户端设置的时间,接下来就是注册主流程了
com.netflix.eureka.registry.AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
//如果是第一次注册,创建一个新的Map,加入到registry
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//通过服务ID获取服务租约实例
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
// 如果已经存在租约 , 保存最后的过期时间
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
//已存在的实例过期时间 大于 注册的过期时间 ,将用本地的副本
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder(); //用本地的副本
}
} else {
// 租约不存在,因此是新的注册
synchronized (lock) {
//重置期望每分钟心跳数量 有问题 和自我保护有关
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
//保存注册服务的队列
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
//加入到最近修改队列中
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
//缓存更新 读写缓存 如果读写缓存中有该服务的实例,则进行过期处理,下次请求直接获取注册表
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
注册的主流程:
- 根据服务名称从注册表中获取该服务的gMap,这是个Map结构 Map<String, Lease<InstanceInfo>>,这个说明下,我们在配置服务时,同一个服务可能配置多个,以便做负载均衡,在注册表中的结构就是<服务名称:<服务ID:租约集合>>,也就是 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>
- 如果gMap为NULL,那么说明当前为第一次注册,那么就会初始化一个ConcurrentHashMap<String, Lease<InstanceInfo>>加入到注册表中
- 从gMap中依据当前服务的ID获取租约集合
- 如果租约存在,就比较下两个租约的过期时间,保留最长的过期时间的租约
- 如果租约集合不存在,那么就将该实例加入到当前服务的租约集合中
- 将该实例加入到最近修改队列中recentRegisteredQueue
- 清空该服务读写缓存
注册表结构
<font color='red'>注册表在Eureka Server中是ConcurrentHashMap<String, Lease<InstanceInfo>>
第一层key,是服务名称
第二层key,是服务ID
Lease<InstanceInfo> 是该服务的租约
类似:{"serviceA :
001: Lease<InstanceInfo>
002: Lease<InstanceInfo>
003: Lease<InstanceInfo>
"}
</font>