本文作者:陈刚,叩丁狼高级讲师。原创文章,转载请注明出处。
对于一个优秀的程序员而言,一个技术不仅要会用,还要知道他的实现原理和思想,即不仅要知其然还要知其所以然,这样我们写代码才会特别自信,出现bug才能很快定位到问题所在。接下来我们就来简单探讨一下SpringCloud的实现原理,即:源码分析
一.服务注册
服务注册与发现是SpringCloud最基础的部分,我们就从这部分开始着手分析,我们来看一下我们第一章写的服务注册案例,图:
我们知道当服务(Eureka Client)启动会主动向 EurekaServer注册自己,而消费者服务需要调用提供者服务实现消费时是需要向EurekaServer获取目标服务的服务地址等信息,那么我们就先来分析一下EurekaClient是如何实现服务注册的。
回想一下我们构造一个EurekaClient服务实例的时候需要做哪些事情呢?
1.在主程序配置类打开 @EnableDiscoveryClient(或EnableEurekaClient)标签开启EurekaClient功能
@SpringBootApplication
@EnableDiscoveryClient
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
}
2.在applicatiton.properties中做一些服务的基础配置和注册中心地址配置
eureka:
client:
serviceUrl:
defaultZone: http://localhost:1111/eureka/ #服务注册地址
instance:
prefer-ip-address: true
server:
port: 3333
spring:
application:
name: consumer1
...
那么我们就沿着这个2个点来切入源码:
点开 @ EnableDiscoveryClient 源码如下:
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
/**
* If true, the ServiceRegistry will automatically register the local server.
*/
boolean autoRegister() default true;
}
EnableDiscoveryClient的文档注释:
Annotation to enable a DiscoveryClient implementation
告诉我们:这个EnableDiscoveryClient这个标签是用来开启 DiscoveryClient的,那么我们来看一下 org.springframework.cloud.client.discovery.DiscoveryClient 如下:
/**
* DiscoveryClient represents read operations commonly available to Discovery service such as
* Netflix Eureka or consul.io
* @author Spencer Gibb
*/
public interface DiscoveryClient {
/**
* A human readable description of the implementation, used in HealthIndicator
* @return the description
*/
String description();
/**
* Get all ServiceInstances associated with a particular serviceId
* @param serviceId the serviceId to query
* @return a List of ServiceInstance
*/
List<ServiceInstance> getInstances(String serviceId);
/**
* @return all known service ids
*/
List<String> getServices();
}
DiscoveryClient 的注释:
DiscoveryClient represents read operations commonly available to Discovery service such as Netflix Eureka or consul.io
告诉我们这个接口定义了服务通常的读取操作的抽象方法,分析这个接口下的三个方法,作用分别是:获取备注 ,根据服务id获取服务列表,获取所有的服务的id
这个接口貌似到顶了,我们看一下他的实现类你可以看到一个让你很亲切的名字 EurekaDiscoveryClient(Eureka的服务客户端) 如下:
public class EurekaDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
private final EurekaInstanceConfig config;
private final EurekaClient eurekaClient;
public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
this.config = config;
this.eurekaClient = eurekaClient;
}
@Override
public String description() {
return DESCRIPTION;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
...省略代码...
@Override
public List<String> getServices() {
Applications applications = this.eurekaClient.getApplications();
if (applications == null) {
return Collections.emptyList();
}
List<Application> registered = applications.getRegisteredApplications();
List<String> names = new ArrayList<>();
for (Application app : registered) {
if (app.getInstances().isEmpty()) {
continue;
}
names.add(app.getName().toLowerCase());
}
return names;
}
...
可以看到 EurekaDiscoveryClient 依赖了 EurekaClient 接口 ,而对于getServices和 getInstances方法的实现代码中都是调用了EurekaClient的方法,即:“this.eurekaClient…”
继续跟踪 EurekaClient 的源码:
@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {
EurekaClient 实现了 LookupService接口,继续跟踪 LookupService
/**
* Lookup service for finding active instances.
*
* @author Karthik Ranganathan, Greg Kim.
* @param <T> for backward compatibility
*/
public interface LookupService<T> {
/**
* Returns the corresponding {@link Application} object which is basically a
* container of all registered <code>appName</code> {@link InstanceInfo}s.
*
* @param appName
* @return a {@link Application} or null if we couldn't locate any app of
* the requested appName
*/
Application getApplication(String appName);
/**
* Returns the {@link Applications} object which is basically a container of
* all currently registered {@link Application}s.
*
* @return {@link Applications}
*/
Applications getApplications();
/**
* Returns the {@link List} of {@link InstanceInfo}s matching the the passed
* in id. A single {@link InstanceInfo} can possibly be registered w/ more
* than one {@link Application}s
*
* @param id
* @return {@link List} of {@link InstanceInfo}s or
* {@link java.util.Collections#emptyList()}
*/
List<InstanceInfo> getInstancesById(String id);
....省略代码....
从注释 :Lookup service for finding active instances.
可以知道,这个lookupService 的作用是用于查找活动实例的服务。并提供了一些查找方法
然而在 EurekaDiscoveryClient 中使用的到底是 EurekaClient 哪个实现类的实例呢?我们继续跟踪一下 EurekaClient的实现类 com.netflix.discovery.DiscoveryClient ,从包名可以知道这个类是netflix提供的:
/**
* The class that is instrumental for interactions with <tt>Eureka Server</tt>.
*
* <p>
* <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the
* instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with
* <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from
* <tt>Eureka Server</tt> during shutdown
* <p>
* d) <em>Querying</em> the list of services/instances registered with
* <tt>Eureka Server</tt>
* <p>
*
* <p>
* <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>
* {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips
* which do not change. All of the functions defined above fail-over to other
* {@link java.net.URL}s specified in the list in the case of failure.
* </p>
*
* @author Karthik Ranganathan, Greg Kim
* @author Spencer Gibb
*
*/
@Singleton
public class DiscoveryClient implements EurekaClient {
翻译一下这个类的文档注释,大致意思为:
DiscoveryClient 和 Eureka Server 类进行交互
向 Eureka Server 注册服务
向 Eureka Server租约续期
服务关闭,取消租约续期
获取服务列表
Eureka client需要配置 Eureka Server的url列表
看到这里我们大概知道,其实真正实现服务发现的Netflix包中的com.netflix.discovery.DiscoveryClient类,我们整理一下这几个类/接口的关系图如下:
接下来我们详细看一下DiscoveryClient是如何实现服务注册和发现等功能的,找到DiscoveryClient中的代码:
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
...省略代码...
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
...省略代码...
}
从方法名字就可以看出方法里面初始化了很多的定时任务,
instanceInfo:是根据服务配置创建出来的服务实例相关信息对象,是在EurekaClientAutoConfiguration类中的 eurekaApplicationInfoManager方法中被创建的
public ApplicationInfoManager eurekaApplicationInfoManager(
EurekaInstanceConfig config) {
InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
return new ApplicationInfoManager(config, instanceInfo);
}
而 EurekaInstanceConfig 就是application.properties配置的绑定
而instanceInfoReplicator 是一个线程对象,他的代码如下:
/**
* A task for updating and replicating the local instanceinfo to the remote server. Properties of this task are:
* - configured with a single update thread to guarantee sequential update to the remote server
* - update tasks can be scheduled on-demand via onDemandUpdate()
* - task processing is rate limited by burstSize
* - a new update task is always scheduled automatically after an earlier update task. However if an on-demand task
* is started, the scheduled automatic update task is discarded (and a new one will be scheduled after the new
* on-demand update).
*
* @author dliu
*/
class InstanceInfoReplicator implements Runnable {
翻译注释知道 InstanceInfoReplicator的作用是用于更新本地instanceinfo并将其复制到远程服务器的任务 ,其实就是把本地服务实例的相关配置信息(地址,端口,服务名等)发送到注册中心完成注册
我们来跟踪一下他的 run方法 :
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
定位关键代码: discoveryClient.register(); 这里就是在实现服务注册,继续跟踪进去:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
这里通过调用:eurekaTransport.registrationClient.register(instanceInfo);实现注册,而instanceInfo其实就是当前服务实例的元数据(配置信息),继续跟踪
/**
* Low level Eureka HTTP client API.
*
* @author Tomasz Bak
*/
public interface EurekaHttpClient {
EurekaHttpResponse<Void> register(InstanceInfo info);
翻译 Low level Eureka HTTP client API. 得知这里是一个HTTP客户端的API,那么我们可以大胆猜测,register方法的实现其实就是通过 rest 请求的方式。继续往下追踪该方法
public abstract class EurekaHttpClientDecorator implements EurekaHttpClient {
@Override
public EurekaHttpResponse<Void> register(final InstanceInfo info) {
return execute(new RequestExecutor<Void>() {
@Override
public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {
return delegate.register(info);
}
@Override
public RequestType getRequestType() {
return RequestType.Register;
}
});
}
看到这里我们应该就明白了,在register方法中获取到了 serviceUrl 即配置文件中的注册服务地址,,把InstanceInfo作为参数,底层通过EurekaHttpClient(Rest方式)来发请求请求,实现服务注册。
服务获取
继续看com.netflix.discovery.DiscoveryClient的initScheduledTasks()方法,里面还有两个定时任务
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
....省略代码....
我们先看第一个任务:服务获取 ,
clientConfig.getRegistryFetchIntervalSeconds()是从配置中获取服务清单获取时间间隔,他是执行线程是new HeartbeatThread(),我们跟踪进去
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
...省略代码...
boolean success = fetchRegistry(remoteRegionsModified);
...省略代码...
fetchRegistry:就是获取注册表(注册服务清单)的方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applicat
...省略代码...
etAndStoreFullRegistry(); 获得并存储完整的注册表,跟踪进去
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
我们可以看到 eurekaTransport.queryClient.这样的代码其实就是通过Rest方式去获取服务清单 最后通过 localRegionApps.set把服务存储到本地区域
服务续约
继续看 com.netflix.discovery.DiscoveryClient 中的 initScheduledTasks() 方法中的另一个定时任务
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
heartbeat :使用心跳机制实现服务续约,即每间隔多少秒去请求一下注册中心证明服务还在线,防止被剔除。
renewalIntervalInSecs :就是心跳时间间隔 对应的配置:
eureka.instance.lease-renewal-interval-in-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=90
我们来看一下他的执线程:HeartbeatThread
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
不难看出他是通过 eurekaTransport.registrationClient.sendHeartBeat:去发送心跳 ,依然是Rest方式
想获取更多技术视频,请前往叩丁狼官网:http://www.wolfcode.cn/openClassWeb_listDetail.html