前言
Eureka Client的应用启动时,在com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法中,会做以下几件事:
- 1、周期性更新服务列表;
- 3、周期性服务续约;
- 3、服务注册逻辑;
概览
以下图片来自Netflix官方,图中显示Eureka Client会发起Renew向注册中心做周期性续约,这样其他Eureka client通过Get Registry请求就能获取到新注册应用的相关信息:
来自官方文档的指导信息
最准确的说明信息来自Netflix的官方文档,地址:
https://github.com/Netflix/eureka/wiki/Understanding-eureka-client-server-communication#renew
关于续约的理解:
- 1、Eureka client每隔三十秒发送一次心跳到Eureka server,这就是续约;
- 2、Eureka client续约的目的是告诉Eureka server自己还活着;
- 3、Eureka server若90秒内未收到心跳,就从自己的服务列表中剔除该Eureka client;
- 4、建议不要改变心跳间隔,因为Eureka server是通过心跳来判断Eureka client是否正常;
服务续约执行简要流程图
下面这张图大致描述了服务续约从Client端到Server端的大致流程,详情如下:
Eureka 续约源码分析
1、Eureka Client发起续约
Eureka Client向Eureka Server发起注册应用实例成功后获得租约,Eureka Client固定间隔向Eureka Server发起续约(renew),避免租约过期。
默认情况下,租约有效期为90秒,续约频率为30秒。两者比例为1:3,保证在网络异常等情况下,有三次重试的机会。
1)、初始化定时任务
Eureka Client在初始化过程中,创建心跳线程,固定间隔向Eureka Server发起续约。实现代码如下
@Singleton
public class DiscoveryClient implements EurekaClient {
/**
* 初始化所有计划的任务
*/
private void initScheduledTasks() {
//获取注册信息的定时任务
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
//心跳定时任务
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
//服务实例同步定时任务
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
// 注册应用实例状态变更监听器
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
//初始化定时服务注册任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
}
2)、发起续约
@Singleton
public class DiscoveryClient implements EurekaClient {
//最后成功向Eureka Server心跳时间戳
private volatile long lastSuccessfulHeartbeatTimestamp = -1;
private class HeartbeatThread implements Runnable {
public void run() {
// 调用续约方法
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
//服务续约
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
//发Restful请求,即心跳
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
//404错误会触发注册逻辑
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
//返回码200表示心跳成功
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
}
AbstractJerseyEurekaHttpClient的renew()方法使用PUT请求调用Eureka Server的apps/${APP_NAME}/${INSTANCE_INFO_ID}
接口,参数为status、lastDirtyTimestamp、overriddenstatus,实现续约。
继续展开上面代码段中的 eurekaTransport.registrationClient.sendHeartBeat方法,源码在EurekaHttpClientDecorator类中:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
final String id,
final InstanceInfo info,
final InstanceStatus overriddenStatus) {
return execute(new RequestExecutor<InstanceInfo>() {
@Override
public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
//网络处理委托给代理类完成
return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public RequestType getRequestType() {
//请求类型为心跳
return RequestType.SendHeartBeat;
}
});
}
继续展开delegate.sendHeartBeat,多层调用一路展开,最终由JerseyApplicationClient类来完成操作,对应源码在父类AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey库的Restful Api将自身的信息PUT到Eureka server,注意:这里不是POST,也不是GET,而是PUT:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
//请求参数有两个:Eureka client自身状态、自身关键信息(状态、元数据等)最后一次变化的时间
WebResource webResource = jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status", info.getStatus().toString())
.queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
if (overriddenStatus != null) {
webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
}
Builder requestBuilder = webResource.getRequestBuilder();
addExtraHeaders(requestBuilder);
//注意:这里不是POST,也不是GET,而是PUT
response = requestBuilder.put(ClientResponse.class);
EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
if (response.hasEntity() &&
!HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
}
return eurekaResponseBuilder.build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
至此,Eureka client向服务续租的源码就分析完毕了,过程相对简单,DiscoveryClient、TimedSupervisorTask、JerseyApplicationClient等实例各司其职,定时发送PUT请求到Eureka server。
2、Eureka Server接收续约
Eureka Server接收续约核心流程如下图:
1)、接收续约请求
@Produces({"application/xml", "application/json"})
public class InstanceResource {
@PUT
public Response renewLease(
// 是否是Replication模式 复制,同步
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus, // 实例的覆盖状态
@QueryParam("status") String status, // 实例状态
// 实例信息在EurekClient端上次被修改的时间
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 续约
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// Not found in the registry, immediately ask for a register
// 续租失败,返回404,EurekaClient端收到404后会发起注册请求
if (!isSuccess) {
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
// 比较InstanceInfo的lastDirtyTimestamp属性
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
// 验证传入的lastDirtyTimestamp和EurekaServer端保存的lastDirtyTimestamp是否相同
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
// 续约成功,返回200
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}
}
PeerAwareInstanceRegistryImpl中调用了父类AbstractInstanceRegistry的renew(...)方法续约实例信息
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
public boolean renew(final String appName, final String id, final boolean isReplication) {
// 调用父类里的renew(appName, id, isReplication)方法续约
if (super.renew(appName, id, isReplication)) {
// 如果是续约请求则向其他EurekaServer节点同步续约信息
// 如果是同步信息请求则直接返回
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
}
2)、续约应用实例信息
调用了AbstractInstanceRegistry的renew(...)方法,续约实例信息,代码如下:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
public boolean renew(String appName, String id, boolean isReplication) {
// 增加续约次数到监控
RENEW.increment(isReplication);
// 获取应用名对应的租约,即根据实例名称取出实例信息集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
// 根据实例id取出具体实例租约信息
leaseToRenew = gMap.get(id);
}
// 租约不存在
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// 获得实例的覆盖状态
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// 实例覆盖状态为UNKNOWN,续租失败
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
// 实例状态与覆盖状态不一致
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
// 强行把实例的覆盖状态设为实例状态
// 即status = overriddenInstanceStatus
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
// 新增续租每分钟次数
renewsLastMin.increment();
// 续租(设置lastUpdateTimestamp(租约最后更新时间))
leaseToRenew.renew();
return true;
}
}
}
public class Lease<T> {
enum Action {
Register, Cancel, Renew
};
private volatile long lastUpdateTimestamp;
public void renew() {
// 设置租约最后更新时间戳
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
}
续约的整个过程修改租约的过期时间,即使并发请求,也不会对数据的一致性产生影响,因此不需要像注册操作一样加锁。
3)、eureka引入overriddenstatus用来解决状态被覆盖问题
客户端调用updateStatus方法时,同时更新server端实例的status和overriddenStatus状态。
客户端调用renew方法时,也要更新server端实例的status和overriddenstatus状态,但是有以下规则的
(1):如果客户端上传的实例状态是down或者starting,表明客户端是重启或者healthCheck失败。此时这个实例不能作为服务提供服务。因此即使客户端调用updateStatus把实例状态更新为up,也是没用的。此时客户端实例的准确状态就是down或者starting。
(2):如果客户端的实例是up或者out_of_service,此时是不可信的。就像第二大节介绍的那样。有可能client端的实例状态已被改变,此时要使用overriddenstatus状态作为当前实例的状态,避免被覆盖。
(3):(2)中的overriddenstatus有可能不存在,缓存失效,此时要使用server端已经存在的实例的状态。
参考:
https://xinchen.blog.csdn.net/article/details/82915355
https://blog.csdn.net/qq_40378034/article/details/119079180