一. 前言
本文主要分析eureka server端启动流程,包括启动入口、bean注入过程、初始化过程等。
client端注册逻辑请参见Spring Cloud Eureka 源码分析 —— Client端
server端服务治理请参见Spring Cloud Eureka 源码分析 —— Server端(服务治理)
二. 源码分析
1. 入口
我们知道开启eureka server端要加注解@EnableEurekaServer
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
这个enable注解引入了EurekaServerMarkerConfiguration
的import,内部只是一个Marker的标记,用于激活EurekaServerAutoConfiguration
2. 启动配置bean注入
EurekaServerAutoConfiguration
介绍下内部主要的bean定义
EurekaController
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
定义了eureka看板的controller
PeerAwareInstanceRegistry
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
eureka server端完成服务注册、续约、下线的核心处理类,所有的业务实现逻辑都在此类完成。
其中代码注释中写到force initialization,原因是eurekaClient是lazy的bean,eureka server虽然是服务端,但其也一个客户端进行自我注册。在初始化过程中,完成一系列的客户端侧的注册、拉取等。详情参考Spring Cloud Eureka 源码分析 —— Client端
PeerEurekaNodes
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
replicationClientAdditionalFilters);
}
这个bean的字面意思是平级的eureka server端node,内部会封装eureka各个server节点的信息。
EurekaServerContext
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
eureka上下文,封装了eureka的各种bean对象。
EurekaServerBootstrap
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
eureka server端的启动类,封装了server端的启动、销毁实现。在完成bean初始化后调用。
FilterRegistrationBean
@Bean
@ConditionalOnBean(name = "httpTraceFilter")
public FilterRegistrationBean<?> traceFilterRegistration(
@Qualifier("httpTraceFilter") Filter filter) {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
bean.setFilter(filter);
bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
return bean;
}
提供了指定bean名为httpTraceFilter过滤器的Registration,如果有这个bean会加入到FilterRegistrationBean中,在springboot actuator中有默认实现,也可以自定义bean的实现。
EurekaServerConfig
@Bean
@ConditionalOnMissingBean
public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
EurekaServerConfigBean server = new EurekaServerConfigBean();
if (clientConfig.shouldRegisterWithEureka()) {
// Set a sensible default if we are supposed to replicate
server.setRegistrySyncRetries(5);
}
return server;
}
保存server端配置属性的bean,前缀是eureka.server.xxx
常用配置:
参数 | 说明 |
---|---|
enableSelfPreservation | 是否允许开启自我保护,默认true |
renewalPercentThreshold | 开启自我保护的阈值乘数百分比,默认0.85 |
renewalThresholdUpdateIntervalMs | 定时任务重新计算自我保护阈值的时间周期,默认15min |
responseCacheUpdateIntervalMs | readOnlyCacheMap每次从readWriteCacheMap同步最新数据的时间周期,默认30s |
responseCacheAutoExpirationInSeconds | 注册表readWriteCacheMap的过期时间,默认180s |
useReadOnlyResponseCache | 是否开启注册表的readOnlyCacheMap,默认true |
expectedClientRenewalIntervalSeconds | 期望客户端发送续约的时间周期,默认30s |
leaseExpirationDurationInSeconds | 服务端接收到最后一次续约后,超过此时间未再次续约可被剔除,默认90s |
evictionIntervalTimerInMs | 剔除任务的执行周期,默认60s |
jerseyApplication
@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,
ResourceLoader resourceLoader) {
ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
false, environment);
// Filter to include only classes that have a particular annotation.
//
provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
// Find classes in Eureka packages (or subpackages)
//
Set<Class<?>> classes = new HashSet<>();
for (String basePackage : EUREKA_PACKAGES) {
Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
for (BeanDefinition bd : beans) {
Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
resourceLoader.getClassLoader());
classes.add(cls);
}
}
// Construct the Jersey ResourceConfig
Map<String, Object> propsAndFeatures = new HashMap<>();
propsAndFeatures.put(
// Skip static content used by the webapp
ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
DefaultResourceConfig rc = new DefaultResourceConfig(classes);
rc.setPropertiesAndFeatures(propsAndFeatures);
return rc;
}
扫描了一系列的Resource的bean,具体列表如下图: @Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
注册jerseyFilter的bean,所有的/eureka/*的请求都会经过这个filter,ServletContainer作为servlet来处理请求,而已经的请求资源规则就是上面的这些resource。
3. 启动流程
EurekaServerContext
eureka server端的上下文,涵盖了server端的各种bean的对象,并在内部完成。。。的初始化
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
@PreDestroy
@Override
public void shutdown() {
logger.info("Shutting down ...");
registry.shutdown();
peerEurekaNodes.shutdown();
logger.info("Shut down");
}
在bean完成构造之后,会基于@PostConstruct注解调用initialize
方法。首先会调用peerEurekaNodes.start();
public void start() {
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
/**
* Resolve peer URLs.
* 返回配置的service-url,如果配置了myUrl,则需要过滤一下,如 http://localhost:8761/eureka/,http://localhost:8762/eureka/
*/
protected List<String> resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
- 首先创建了一个task线程池;
- 根据给定的service-urls来更新
PeerEurekaNodes
的PeerEurekaNode
,删除或创建新的节点; - 开启了一个定时任务,每个一段事件,同样刷新
PeerEurekaNodes
。
介绍下更新PeerEurekaNodes的过程:
updatePeerEurekaNodes
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// 一种两个集合取差集的写法
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// 将不存在的url从集合中移除,并调用node.shutdown();
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers
// 创建新的PeerEurekaNode节点,这里createPeerEurekaNode会使用RefreshablePeerEurekaNodes实现类的方法,
//并在PeerEurekaNode创建了batchingDispatcher和nonBatchingDispatcher
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
/**
* Shuts down all resources used for peer replication.
*/
public void shutDown() {
batchingDispatcher.shutdown();
nonBatchingDispatcher.shutdown();
}
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
String targetHost = hostFromUrl(peerEurekaNodeUrl);
if (targetHost == null) {
targetHost = "host";
}
return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
- 比较当前url的集合和之前集合的差异,来决定删除还是新建
- 新建会给每个service-url创建一个PeerEurekaNode对象,封装了这个node的各种信息。
- 删除会调用node.shutDown, 来关闭任务执行器。
再回到EurekaServerContext
的initialize
方法,继续执行registry.init(peerEurekaNodes);
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 开启一个计数器
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// 初始化注册表的缓存
initializedResponseCache();
// 设置定时任务,用来更新自我保护的阈值
scheduleRenewalThresholdUpdateTask();
// 初始化远程region的Registry
initRemoteRegionRegistry();
try {
// JMX监控
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
这个方法主要完成eureka server的上下文关键功能的初始化。
initializedResponseCache
@Override
public synchronized void initializedResponseCache() {
if (responseCache == null) {
responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
}
}
// 只读cache
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// 读写cache
private final LoadingCache<Key, Value> readWriteCacheMap;
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
// 默认值30s
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// 初始化读写cache
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
// getResponseCacheAutoExpirationInSeconds 默认值180s
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// 这里的shouldUseReadOnlyResponseCache 默认值是true
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
ResponseCacheImpl 内部定义了两个内存级的cache,在构造方法中会初始化readWriteCacheMap,并默认开启只读缓存刷新任务。默认每隔30s,将读写cache中的数据同步到只读cache中,实现读写分离。
scheduleRenewalThresholdUpdateTask
开启一个schedule,用于更新续约的阈值,根据每分钟实际续约情况来决定server端是否开启自我保护。
自我保护的目的是防止因为server端和client端的各类连接问题,导致大量有效client端被下线,从而影响到client之间本身的相互调用。
/**
* Schedule the task that updates <em>renewal threshold</em> periodically.
* The renewal threshold would be used to determine if the renewals drop
* dramatically because of network partition and to protect expiring too
* many instances at a time.
*
*/
// renewalThresholdUpdateIntervalMs 默认值15分钟
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {@Override public void run() {
updateRenewalThreshold(); }
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
int count = 0;
// 获取当前所有客户端的总实例数
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold or if self preservation is disabled.
// renewalPercentThreshold 默认值0.85
// 如果当前总实例数 > 预计客户端总续约数 * 0.85
// 或者自我保护处于关闭状态
// 就更新阈值
if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
比较当前客户端的总实例数和(预计续约的总次数*0.85) ,以及自我保护当前的状态来决定是否更新阈值。
更新阈值的公式:
每分钟续约阈值 = 预计客户端续约数(客户端实例数) * 每分钟续约的次数 * 阈值百分比
initRemoteRegionRegistry
protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
protected void initRemoteRegionRegistry() throws MalformedURLException {
Map<String, String> remoteRegionUrlsWithName = serverConfig.getRemoteRegionUrlsWithName();
if (!remoteRegionUrlsWithName.isEmpty()) {
allKnownRemoteRegions = new String[remoteRegionUrlsWithName.size()];
int remoteRegionArrayIndex = 0;
for (Map.Entry<String, String> remoteRegionUrlWithName : remoteRegionUrlsWithName.entrySet()) {
RemoteRegionRegistry remoteRegionRegistry = new RemoteRegionRegistry(
serverConfig,
clientConfig,
serverCodecs,
remoteRegionUrlWithName.getKey(),
new URL(remoteRegionUrlWithName.getValue()));
regionNameVSRemoteRegistry.put(remoteRegionUrlWithName.getKey(), remoteRegionRegistry);
allKnownRemoteRegions[remoteRegionArrayIndex++] = remoteRegionUrlWithName.getKey();
}
}
logger.info("Finished initializing remote region registries. All known remote regions: {}",
(Object) allKnownRemoteRegions);
}
如果配置了远程region,则会将远程registry进行初始化。
至此EurekaServerContext
的初始化过程介绍完成。
EurekaServerInitializerConfiguration
在EurekaServerAutoConfiguration
中的主要bean都初始化完成后,可以看到在类上有个Import的注解,注入EurekaServerInitializerConfiguration
进行eureka server的启动。
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered { ... }
跟Client端起始点一样,都是以SmartLifecycle作为入口。直接看start()
方法的实现。
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
可以看到这个start完成如下几件事情:
- contextInitialized:通过启动类来完成上下文的初始化
- 发送EurekaRegistryAvailableEvent事件。
- 发送EurekaServerStartedEvent 事件。
eurekaServerBootstrap.
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
只有两个方法,分别是initEurekaEnvironment
,initEurekaServerContext
, 具体展开。
initEurekaEnvironment
protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");
// 配置数据中心
String dataCenter = ConfigurationManager.getConfigInstance()
.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
// 指定读取配置环境
String environment = ConfigurationManager.getConfigInstance()
.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
}
}
设置数据中心和部署环境,默认是default和test
initEurekaServerContext
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
// 设置一个可以用于获取上下文的Holder
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
这个方法主要完成两件事,通过syncUp
将注册表的信息同步到其他的eureka server节点上,openForTraffic
刷新自我保护阈值,并开启驱逐任务。
syncUp
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
// 重试逻辑等待时间,一旦有一个server同步成功,后续就不再重试
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
通过eurekaClient.getApplications()获取到client的注册表,遍历每个实例,并判断是否可在当前region中注册,满足则发起register
。
InstanceRegistry.register
// org.springframework.cloud.netflix.eureka.server.InstanceRegistry
@Override
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
handleRegistration(info, leaseDuration, isReplication);
super.register(info, leaseDuration, isReplication);
}
private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
log("register " + info.getAppName() + ", vip " + info.getVIPAddress() + ", leaseDuration " + leaseDuration + ", isReplication " + isReplication);
publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
}
先由子类InstanceRegistry发送EurekaInstanceRegisteredEvent事件,再由父类完成真正的register逻辑。
AbstractInstanceRegistry.register
//com.netflix.eureka.registry.AbstractInstanceRegistry
/**
* Registers a new instance with a given duration.
*
* @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
*/
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// registry缓存了实例注册的信息,注册实现本质上就是将实例信息添加到register属性中。
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
// 计数器+1
REGISTER.increment(isReplication);
//初始化gMap
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// 如果之前有缓存续约信息,比较两个对象的时间差
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
// 如果本次注册是更老的实例信息,就还使用上次缓存的对象
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// 如果本次是第一次注册,expectedNumberOfClientsSendingRenews+1,并更新自我保护阈值
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 使用之前的服务开启时间
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
// recentRegisteredQueue是一个记录注册操作的队列,key是注册时间,value是客户端实例id,主要用于debug或统计使用
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
client端注册到server端的核心实现逻辑:
三个参数分别是注册的实例信息、续约周期、是否是其他server的复制(true表示复制,false表示client发起的注册)
首先从register属性中根据appName来获取之前的续约信息,如果是第一次注册,则expectedNumberOfClientsSendingRenews加1;否则比较一下参数和缓存的instanceInfo哪个是最新的,按照最新的进行后续操作。
创建一个新的lease对象,保存到gMap中,并添加到recentRegisteredQueue中,(注册队列)用于debugging或者统计。
根据overridden status规则来设置registrant
的status,是否接受流量
更新各种registrant的属性,将本次变更加入到recentlyChangedQueue,失效readWriteCache
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
分别失效appName自身的cache和全局cache和增量cache。
注册完成之后,回到EurekaServerBootstrap
的initEurekaServerContext
,调用openForTraffic
。
openForTraffic
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
这个方法先重新计算了自我保护的阈值,设置实例状态为UP。
调用父类的postInit()完成驱逐任务的开启。
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs()); // 默认值60s
}
class EvictionTask extends TimerTask {
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
}
驱逐的具体分析请参见Spring Cloud Eureka 源码分析 — Server端(服务治理)
至此,server端完整启动流程分析完毕。