阅读前的思考
使用netflix eureka做服务管理时,若你只停留在对eureka的概念理解和使用层面,那么你面试时会得到面试官的灵魂拷问,例如:
1)eureka将服务注册信息存放在哪里?服务注册信息都有哪些内容?
2)eureka如何做到高可用?底层的通信机制是什么?
3)心跳机制到底发送些什么内容,有了解吗?
4)服务注册列表是存在客户端还是服务端?如果多复本数据不一致怎么处理?
5)若网络故障服务注册失败了,eureka是如何保证注册成功的?
6)注册,同步,下线,剔除分别是怎么实现的?
7)为什么刚启动的服务没有即时被eureka发现?对此你还遇到过哪些坑?
带着这些问题或疑惑,作者决定推出eureka源码解读系列,从众所周知的Eureka功能着手,对register,renew,heartbeat,fetch,剔除/关闭,数据复制等进行源码解读,意在深入理解eureka功能。
Tip:建议开篇从 Eureka源码解析(一) 开始,之后的文章是基于开篇的分析成果之上进行撰写的。
Renew client端处理流程
DiscoveryClient.renew()方法定义的服务续约的具体流程。
boolean renew(){
EurekaHttpResponse<InstanceInfo>httpResponse;
try{
//通过sendHeartBeat的方式完成
httpResponse=eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(),instanceInfo.getId(),instanceInfo,null);
if(httpResponse.getStatusCode()==Status.NOT_FOUND.getStatusCode()){
REREGISTER_COUNTER.increment();
long timestamp=instanceInfo.setIsDirtyWithTime();
boolean success=register();
if(success){
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode()==Status.OK.getStatusCode();
}catch(Throwablee){
}
}
sendHeartBeat一共有两个类实现了方法,AbstractJerseyEurekaHttpClient,JerseyReplicationClient无论是哪个发送内容者是一样的。
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName,String id,InstanceInfo info,InstanceStatus overriddenStatus){
String urlPath="apps/"+appName+'/'+id;
……
WebResourcewebResource=jerseyClient.resource(serviceUrl)
.path(urlPath)
.queryParam("status",info.getStatus().toString())
.queryParam("lastDirtyTimestamp",info.getLastDirtyTimestamp().toString());
if(overriddenStatus!=null){
webResource=webResource.queryParam("overriddenstatus",overriddenStatus.name());
}
……
//http put请求
response=requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class);
……
}
public enum InstanceStatus:
UP,//Ready to receive traffic
DOWN,//Do not send traffic-health check callback failed
STARTING,//Just about starting-initializations to be done-do not
//sendtraffic
OUT_OF_SERVICE,//Intentionally shut down for traffic
UNKNOWN;
心跳发送内容:appName+id(instanceId)作为url,参数:status即服务状态,lastDirtyTimestamp 即instance在client端最后被修改的时间戳,overriddenStatus 更新过的服务状态。
服务续约的调用
DiscoveryClient 构造方法中调用了initScheduledTasks方法初始化了heartbeatTask 用于执行心跳发送任务,具体任务实现在HeartbeatThread(Runnable)中
/**
*Theheartbeattaskthatrenewstheleaseinthegivenintervals.
*/
private class HeartbeatThread implements Runnable{
public void run(){
//调用renew
if(renew()){
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
Eureka 默认每隔30秒一次心跳检测具体现实如下:
//Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,//默认30秒
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs,TimeUnit.SECONDS);
总结:renew功能与心跳是绑定在一起发送的。定时心跳实现采用JDK的ScheduledExecutorService,执行任务是调用renew函数,发送的内容为appName+instanceId组成的url,参数是服务器状态status 和 最近更新状态的时间戳。以Http put 请求发送。
Server 端流程处理
跟register服务处理流程一样,服务器启动后扫描并创建ApplicationResource,根据@Path("{id}")创建InstanceResource并调用@Put修饰的方法。
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) StringisReplication,
@QueryParam("overriddenstatus") StringoverriddenStatus,
@QueryParam("status") Stringstatus,
@QueryParam("lastDirtyTimestamp") StringlastDirtyTimestamp){
booleanisFromReplicaNode="true".equals(isReplication);
booleanisSuccess=registry.renew(app.getName(),id,isFromReplicaNode);
……
register.renew指向AbstractInstanceRegistry的renew方法,代码如下:
public boolean renew(String appName,String id,boolean isReplication){
RENEW.increment(isReplication);
Map<String,Lease<InstanceInfo>>gMap=registry.get(appName);
Lease<InstanceInfo>leaseToRenew=null;
if(gMap!=null){
leaseToRenew=gMap.get(id);
}
//如果gMap中找不到服务实例返回false,则续租失败。
if(leaseToRenew==null){
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS:Registry:leasedoesn'texist,registeringresource:{}-{}",appName,id);
return false;
}else{
InstanceInfoinstanceInfo=leaseToRenew.getHolder();
if(instanceInfo!=null){
InstanceStatus overriddenInstanceStatus=this.getOverriddenInstanceStatus(instanceInfo,leaseToRenew,isReplication);
if(overriddenInstanceStatus==InstanceStatus.UNKNOWN){
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if(!instanceInfo.getStatus().equals(overriddenInstanceStatus)){
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
//更新时间戳 lastUpdateTimestamp=System.currentTimeMillis()+duration;
leaseToRenew.renew();
return true;
}
}
此处renew方法在concurrentHashMap<String,Map<String,Lease<InstanceInfo>>> registry中查找(appName,(instanceId,Lease<InstanceInfo>)).如果判断并更新时间Instance的时间戳。如果查询不到,则返回false.意味着被剔除的服务是无法得到续租的。
eureka没有服务超时一说,每隔30秒一次心跳,服务在map中存在则更新时间戳,90秒没有续租上则执行服务剔除。