问题
熟悉Spring Cloud的同学都知道, 在Zuul内部进行Routing和Load Balancing的时候, 为了保证HA, 不受Eureka掉线的影响, 内存中会有一个Server List缓存. 进行路由和LB的时候并不是每次都实时的去Eureka拉取新的注册信息. 而我们知道有缓存就会有延迟, 会给整个系统的运行带来一些不良影响, 尤为明显的一点:
服务状态变更, 不能及时剔除/增加节点
为了解决服务状态延迟, 通过搜索和文档很容易找到下面两个配置:
ribbon:
ServerListRefreshInterval: 30000
eureka:
client:
registryFetchIntervalSeconds: 30
ribbon和eureka默认缓存刷新频率都是30s, 对系统影响太大,
- 经常服务上线半天客户端调用不到, 尤其在测试的时候服务往往只有单节点, 要等20多秒才能调通, 严重影响效率.
- 而对于线上多节点环境一样会造成影响, 某一个节点进行升级部署的时候, 虽然服务已经从Eureka下线, 但是Zuul仍然会有30s时间会把请求路由到下线的节点上. 对部分用户造成影响, 当遇到QPS较大的接口时甚至造成没必要的熔断.
对策
为了解决上面两个问题, 一般会把两个配置都改成5s甚至更短, 虽然情况有了很大的缓解, 但是问题依旧.
没办法, 只有深入代码了
代码分析
刚开始一头雾水, 可能想看代码也不知道从哪里开始. 其实找到规律就好了, 代码的入口可以是Zuul的route filter. 也可以是Ribbon的AutoConfiguration. 我从route filter开始.
RibbonRoutingFilter.java
public Object run() {
RibbonCommandContext commandContext = buildCommandContext(context);
ClientHttpResponse response = forward(commandContext);
}
很明显, forward是进行请求转发, 继续跟进
RibbonRoutingFilter.java
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
ClientHttpResponse response = command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}
}
这段代码构建了一个RibbonCommand, 实际的请求发送都是通过这个Command来完成的
HttpClientRibbonCommandFactory.java
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
final String serviceId = context.getServiceId();
final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
serviceId, RibbonLoadBalancingHttpClient.class);
client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));
return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider,
clientFactory.getClientConfig(serviceId));
}
跟到最后会发现, 负载均衡的请求发送都是通过client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));设置的负载均衡器来完成, 这个负载均衡器是从Spring Context中取得的ILoadBalancer类型的Bean
先看一下这个接口的定义
ILoadBalancer.java
public interface ILoadBalancer {
public void addServers(List<Server> newServers);
public Server chooseServer(Object key);
public void markServerDown(Server server);
public List<Server> getReachableServers();
public List<Server> getAllServers();
}
再看一下取到的具体实现类:
是一个ZoneAwareLoadBalancer, 最终挑选服务节点的逻辑:
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
然后发现这个lb.getAllServers()并不是去从Eureka Server拉取注册信息, 使用的是一个内存中的缓存, 所以只要知道这个ServerList的刷新机制就好了.
然后在ZoneAwareLoadBalancer的父类DynamicServerListLoadBalancer中会有一个ServerListUpdater
DynamicServerListLoadBalancer.java
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
PollingServerListUpdater.java
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
定时刷新. 这个refreshIntervalMs就是上面说到的ribbon.ServerListRefreshInterval, 最终发现ServerList是通过DiscoveryClient的InstanceInfos来获取的
DiscoveryClient.java
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
@Nullable String region) {
applications = this.localRegionApps.get();
if (!secure) {
return applications.getInstancesByVirtualHostName(vipAddress);
} else {
return applications.getInstancesBySecureVirtualHostName(vipAddress);
}
}
这个InstanceInfo仍然是一个缓存.. 再找这个缓存的刷新机制
DiscoveryClient.java
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
}
这里的registryFetchIntervalSeconds对应上面说到的配置项:eureka.client.registryFetchIntervalSeconds, 然后进去CacheRefreshThread才算彻底搞清楚ServerList的缓存刷新机制是怎样的, 真正的从Eureka拉取注册信息就是在这个CacheRefreshThread里, 每registryFetchIntervalSeconds会从Eureka进行一次delta更新
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
现在缓存更新策略已经研究透彻, 如何在服务状态变化的时候实时触发Server List的更新?
终极对策
目前的想法是配合Bus, 服务上下线的时候通过Bus来触发Zuul的refresh, 但是Server List的更新schedule并不会更新, 考虑自己实现一套可随Context refresh的DiscoveryClient
To Be Continued...
续:
自定义DiscoveryClient比较复杂.
通过代码分析可以知道只需要能调到DiscoveryClient的refreshRegistry就可以实时刷新了, 但它是个private方法, 那就反射嘛, 配合Bus的refresh功能同时刷新注册信息
@Component
@Slf4j
public class RefreshListener implements ApplicationListener<RefreshScopeRefreshedEvent> {
@Override
public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
try {
Method method = DiscoveryClient.class.getDeclaredMethod("refreshRegistry");
method.setAccessible(true);
method.invoke(SpringUtils.getBean(DiscoveryClient.class));
} catch (Exception e) {
log.error("Failed to refreshRegistry.", e);
}
}
}
完.