背景
笔者接触到的一个项目里边有很多微服务,但受限于项目创立之初时间和团队技术条件所限,都是最基本的SpringBoot工程,没有使用Spring Cloud Netflix或alibaba这类“全家桶”来搭建,所有没有负载均衡、服务注册、服务发现、服务配置、熔断等这些微服务治理层面的东西。每个微服务采用多节点部署,使用nginx或阿里云slb来做负载均衡,大体上是"微服务A -> nginx/slb -> 微服务B"这样一种方式,每当重要线上活动来临、服务的扩容需要人工的方式去配置nginx来增加新节点的路由代理,开发运维人员一直这样初步的做着每个服务的流量在多节点进行分摊。除了运维方式比较原始之外,另外,当多节点中的某个节点出现故障时,由于缺乏自动故障转移机制仍然会有一部分请求进入到故障节点,对应的异常错误为前端用户所感知。更为糟糕的是,高并发时间由于节点故障异常导致的上游服务(这里上游服务指的是出现节点故障的服务的调用者)超时请求异常,通常微服务之间这类http调用超时设置为5-30秒之间,这就导致了上游服务的QPS性能骤减,请求层层积压,出现了传说中的级联故障。
针对以上种种,笔者给这个项目或者说这个公司同样类型架构的一众项目做架构升级方案,考虑到实施成本和开发团队的接受程度,原则是尽量减少架构升级的改造工作量,尽量减少团队的学习成本,同时又要解决上面所讲的所有痛点,方案不求高大上,但求使用为主。
本文是当时思路的一些整理,时间也有些远,加之笔者不是对Spring Cloud或netflix Ribbon所有的细节都了解,所以行文逻辑不是很有条理,或许还有错误的地方。如有不当,请大家指正。
分析
先定义问题,确定问题的边界,然后再解决问题。
1、我们需要的是一个能够有自动负载路由功能的东西,帮助服务之间rest调用进行多节点路由,路由可以采用轮询规则、这是负载均衡,同时要能及时的发现被调服务中的故障节点、并及时的下线节点使得调用者能够快速失败、当故障恢复之后及时上线节点,这属于熔断机制。
2、路由配置能过改变现状的人工去主机配置文件,要有一个统一的服务路由配置中心。
3、尽可能少的引入第三方的组件减少学习成本,尽可能少的减少改造工作量,方案力求使用为主。
笔者经过思考,和研究各种负载均衡方案(有服务端负载均衡,和客户端负载均衡),想到了一个方案:
如果利用Ribbon做客户端负载均衡,结合配置中心比如Apollo或者Spring Cloud Config,一定程度上也可以实现动态配置新节点上线,健康检查识别故障服务节点并下线等功能:
Ribbon本身是可以选择从本地配置文件中的或者从Eureka拉取的服务列表中选择节点进行路由的,默认的负载均衡规则是轮询,如果选择引入Eureka,那么就是Spring Cloud标准的解决方案了:服务注册中心Eureka负责收集各个应用启动的时候主动上报上来的自己的服务信息,比如服务名、地址、端口等,然后调用的应用就可以通过eureka获得所谓远端服务的服务列表了,调用端的Ribbon做客户端负载均衡从拉取的服务列表中选择可用的服务按照负载均衡策略选一个调用。
但这样一来需要团队进行Eureka的搭建和维护,同时需要每一个服务都引入Eureka相关依赖并进行代码和配置、使之能够进行服务注册,需要动员人力进行各个服务的代码进行改造。而经过了解,团队目前服务之间调用使用的是RestTemplate(实现配置的是Apache HttpClient),做的面向接口的开发,代码类似如下:
@Autowired
private RestTemplate restTemplate;
restTemplate.getForObject(serviceFullUrl, clazz, new Object[0]); //GET调用后端服务
restTemplate.postForObject(serviceFullUrl, requestBody, clazz, new Object[0]);//POST调用
托Spring RestTemplate接口与实现分离的福,那么如果能仅仅改造RestTemplate的实现层,使之具备客户端负载均衡、服务发现、被调服务故障下线、被调故障恢复上线等功能那便是极好的。而且由于少了调用服务与被调服务之间的nginx,也减少了运维人员的工作量。
解决方案思路
现在就是想办法要让Ribbon本地的服务列表动态化,Ribbon可以读取本地Spring文件的服务列表,而配置文件是可以通过Config配置中心进行统一管理、并且搭配Spring bus机制可以做到配置发生变化时主动通知各服务来实时更新本地的配置,这样一来就实现了本地服务列表的动态化。应用可以免去各个服务主动上报给注册中心这个动作(改为使用配置中心去配置)。然后节点健康原来是由Eureka告诉Ribbon的,现在要ribbon主动根据服务列表去心跳检查。一图胜千言,逻辑架构如下所示:
进入正题,下面结合源代码分析一下Ribbon的原理,同时说明一下需要对Ribbon源码进行哪些改造、扩展和封装,使之满足我们上面的需求。
使用的主要开源项目的版本:
spring boot
2.1.13.RELEASE
spring-cloud-netflix-ribbon
2.1.5.RELEASE
spring cloud
Greenwich.SR6
Ribbon主要代码分析与改造
Ribbon的自动装配
spring-cloud-netflix-ribbon-2.1.5.RELEASE.jar包里的spring.factories文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
是spring cloud为了继承ribbon而开发的自动装配类,Ribbon自动装配的Bean都在这。
同时,
/**
* 所有RibbonClient的默认配置
* author: linyang
* */
@Configuration
public class DefaultRibbonConfig {
private Logger logger = LoggerFactory.getLogger(DefaultRibbonConfig.class);
//在可用服务中轮询选择
@Bean
public IRule ribbonRule() {
logger.info("全局IRule实现为AvailabilityFilteringRule,实例化...");
return new AvailabilityFilteringRule();
}
//定时健康检查服务列表中的应用,置可用状态
@Bean
public IPing ribbonPing() {
logger.info("全局IPing实现为HealthCheckPing,实例化...");
return new HealthCheckPing();
}
}
然后
/**
* 统一配置所有ribbon client
* author: linyang
* */
@RibbonClients(defaultConfiguration = DefaultRibbonConfig.class)
public class DefaultRibbonClientsConfig {
public static class BazServiceList extends ConfigurationBasedServerList {
public BazServiceList(IClientConfig config) {
super.initWithNiwsConfig(config);
}
}
}
上面三个类结合Spring自动装配,会进到org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration这个配置类里边,然后Spring对里边的Bean进行自动注入:
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
Ok,我们看到默认的ILoadBalancer是ZoneAwareLoadBalancer
这个实现,它的父类是DynamicServerListLoadBalancer
,再上一级父类是BaseLoadBalancer
,同时通过源码可以看到,这三个类实例化的时候都会先super()自己父类的构造方法。可以看到,我们之前提到的Ribbon的“服务列表”就缓存在BaseLoadBalancer里:
protected volatile List<Server> allServerList;
protected volatile List<Server> upServerList;
protected ReadWriteLock allServerLock;
protected ReadWriteLock upServerLock;
除了上面的3个类之外,还有两个定时任务也很关键:
PollingServerListUpdater 定时更新缓存的server list
PingTask 定时执行ping来确定server节点的状态
下面分“Ribbon的启动流程”和“LoadBalancerClient根据服务名自动根据Rule选择服务”两条线来进行分析:
Ribbon的启动流程
前文提到ZoneAwareLoadBalancer实例化
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater){
//...
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
构造是传入的参数ServerListUpdater也是在这个类配置注入的:
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
然后new ZoneAwareLoadBalancer会调用父类DynamicServerListLoadBalancer的构造方法:
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
this.updateAction = new NamelessClass_1();
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter)filter).setLoadBalancerStats(this.getLoadBalancerStats());
}
this.restOfInit(clientConfig);
this.setIntValue();
}
其中this.restOfInit(clientConfig)
会调用this.enableAndInitLearnNewServersFeature()
:
public void enableAndInitLearnNewServersFeature() {
this.serverListUpdater.start(this.updateAction);
}
所以这里执行的就是PollingServerListUpdater的start方法了:
public synchronized void start(final UpdateAction updateAction) {
if (this.isActive.compareAndSet(false, true)) {
Runnable wrapperRunnable = new Runnable() {
public void run() {
if (!PollingServerListUpdater.this.isActive.get()) {
if (PollingServerListUpdater.this.scheduledFuture != null) {
PollingServerListUpdater.this.scheduledFuture.cancel(true);
}
} else {
try {
updateAction.doUpdate();
PollingServerListUpdater.this.lastUpdated = System.currentTimeMillis();
} catch (Exception var2) {
PollingServerListUpdater.logger.warn("Failed one update cycle", var2);
}
}
}
};
this.scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable, this.initialDelayMs, this.refreshIntervalMs, TimeUnit.MILLISECONDS);
//logger.info("polling定时任务启动,首次延迟" + this.initialDelayMs + "ms执行,之后每" + this.refreshIntervalMs + "ms执行一次");
} else {
logger.info("Already active, no-op");
}
}
其中getRefreshExecutor()会得到一个ScheduledThreadPoolExecutor
,用来定时执行wrapperRunnable,Runnable里边业务逻辑主要是执行updateAction.doUpdate()
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
public void updateListOfServers() {
...
servers = serverListImpl.getUpdatedListOfServers(); //得到服务列表
...
updateAllServerList(servers); //更新服务列表
}
关于Ribbon如何从远程获取配置信息:
从DynamicServerListLoadBalancer里获取服务列表配置serverListImpl.getUpdatedListOfServers()开始跟,最后找到package com.netflix.config.sources , URLConfigurationSource类:
@Override public PollResult poll(boolean initial, Object checkPoint) throws IOException { if (configUrls == null || configUrls.length == 0) { return PollResult.createFull(null); } Map<String, Object> map = new HashMap<String, Object>(); for (URL url: configUrls) { InputStream fin = url.openStream(); Properties props = ConfigurationUtils.loadPropertiesFromInputStream(fin); //对远程url的流中加载Properties for (Entry<Object, Object> entry: props.entrySet()) { map.put((String) entry.getKey(), entry.getValue()); } } return PollResult.createFull(map); }
package com.netflix.config.util;
ConfigurationUtils:public static Properties loadPropertiesFromInputStream(InputStream fin) throws IOException { Properties props = new Properties(); InputStreamReader reader = new InputStreamReader(fin, "UTF-8"); try { props.load(reader); //加载java Properties return props; } finally { if (reader != null) { reader.close(); } if (fin != null) { fin.close(); } } }
之后,把获得的服务列表里的服务alive全置为true,调用setServersList(ls),然后做forceQuickPing:
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
调用父类BaseLoadBalancer的setServersList,然后setServerListForZones(serversInZones)
@Override
public void setServersList(List lsrv) {
super.setServersList(lsrv); //第一次BaseLoadBalancer.setServersList(lsrv)
//...
setServerListForZones(serversInZones);//有几个zone执行几次BaseLoadBalancer.setServersList(List lsrv)
}
这里插一句,笔者之前通过jmap观察JVM堆里发现居然有两个BaseLoadBalancer实例,这让笔者很疑惑,因为按照之前的理解,BaseLoadBalancer应该是只有一个实例才对,仔细分析源码发现了原因:
setServerListForZones这个方法是被override的,所以最终会执行回ZoneAwareLoadBalancer的setServerListForZones逻辑,在这里边会通过getLoadBalancer方法完成对成员变量balancers的填充:
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats()); BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
也就是说,在父类DynamicServerListLoadBalancer的构造方法完成的时候,是会实例化一个new BaseLoadBalancer并且放到balancers里的(上面的setServerListForZones)。但是,当从父类构造方法返回,子类ZoneAwareLoadBalancer的构造方法接下来会去执行子类的成员变量的初始化,类似这样一个关系:
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> { private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap(); //构造方法结束,成员变量balancers初始化,置为new ConcurrentHashMap() public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { //父类DynamicServerListLoadBalancer的构造方法,最后会通过setServerListForZones调用回子类的setServerListForZones方法,然后填充了balancers super(clientConfig, rule, ping, serverList, filter, serverListUpdater); } }
所以尽管在父类里对balancers进行了填充,但这时候因为子类构造方法返回前又相当于执行了一次balancers = new ConcurrentHashMap,所以balancers又变空了。当下一次polling任务执行的时候,会认为balancer未初始化,所以还会new一个BaseLoadBalancer填充进去。 这就是两个BaseLoadBalancer的由来。
其实可以认为,第一个BaseLoadBalancer实例这时候已经成为了垃圾对象、失去了引用,会在下一次ygc的时候被gc回收。
DynamicServerListLoadBalancer.java
//这个方法在初始化和之后的每次polling任务都会执行
public void setServersList(List lsrv) {
super.setServersList(lsrv); //第一次BaseLoadBalancer.setServersList(List lsrv)
setServerListForZones(serversInZones); //有几个zone执行几次BaseLoadBalancer.setServersList(List lsrv)
}
其次,上述setServerListForZones逻辑new出来的BaseLoadBalancer用的构造方法最后ping都是null,如下:
public BaseLoadBalancer(String lbName, IRule rule, LoadBalancerStats lbStats) {
this(lbName, rule, lbStats, (IPing)null);
}
所以执行下一次polling任务,BaseLoadBalancer.setServersList(lsrv)的时候会进到canSkipPing()、把全部server都置成alive!笔者的解决办法是把这段逻辑注释掉了的。
两个BaseLoadBalancer实例其中一个成为了垃圾对象,交给gc去处理就好了。zone对应的BaseLoadBalancer执行setServersList时因为ping=null会默认把所有server的状态置为alive,需要去掉这部分逻辑。
最后,看下BaseLoadBalancer.setServersList(lsrv)方法,也就是设置服务列表的核心方法:
public void setServersList(List lsrv) {
Lock writeLock = allServerLock.writeLock();
logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);
ArrayList<Server> newServers = new ArrayList<Server>();
writeLock.lock();
try {
ArrayList<Server> allServers = new ArrayList<Server>();
for (Object server : lsrv) {
if (server == null) {
continue;
}
if (server instanceof String) {
server = new Server((String) server);
}
if (server instanceof Server) {
logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId());
/*
*
allServers里边新加载的服务列表原来默认都是alive的,
这里改为如果服务在当前的allServerList里存在、先以当前的为准;如果不存在则先置为alive,我们认为这样更合理。
这样当pinger超时阻塞而暂未返回的时候,不会得出错误的服务状态
*/
for(Server s : allServerList) {
if(((Server) server).getId().equals(s.getId())) {
((Server) server).setAlive(s.isAlive());
}
}
allServers.add((Server) server);
} else {
throw new IllegalArgumentException(
"Type String or Server expected, instead found:"
+ server.getClass());
}
}
boolean listChanged = false;
if (!allServerList.equals(allServers)) {
listChanged = true;
if (changeListeners != null && changeListeners.size() > 0) {
List<Server> oldList = ImmutableList.copyOf(allServerList);
List<Server> newList = ImmutableList.copyOf(allServers);
for (ServerListChangeListener l: changeListeners) {
try {
l.serverListChanged(oldList, newList);
} catch (Exception e) {
logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);
}
}
}
}
if (isEnablePrimingConnections()) {
for (Server server : allServers) {
if (!allServerList.contains(server)) {
server.setReadyToServe(false);
newServers.add((Server) server);
}
}
if (primeConnections != null) {
primeConnections.primeConnectionsAsync(newServers, this);
}
}
// This will reset readyToServe flag to true on all servers
// regardless whether
// previous priming connections are success or not
allServerList = allServers; //这时候重置了allServerList的引用
if (canSkipPing()) {
/**
* 这段注释掉的原因:
* 每个zone对应的BaseLoadBalancer的ping=null,将会把所有server都再置为alive,
* 与笔者认为的“已经存在的server先默认认为与原来状态一样、新server才默认置为alive”的原则相左
*
* */
/*
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
*/
} else if (listChanged) {
forceQuickPing();
}
logger.debug("服务列表已更新BaseLoadBalancer.setServersList() ,allServerList :" + JSON.toJSONString(allServerList));
} finally {
writeLock.unlock();
}
}
LoadBalancerClient根据服务名自动根据Rule选择服务
前面花了很多时间分析Ribbon本地内存里的服务列表是怎么更新的,有了服务列表那么就可以按照一定的Rule规则来选择1个服务来执行调用了,所以接下来关键是规则。笔者这里使用Ribbon的AvailabilityFilteringRule,即在当前可用服务中轮询选择,所以关键就在于如何判断各服务的可用状态Alive,Ribbon使用IPing
接口来判断每个服务节点的可用状态,其默认实现是使用的DummyPing
,就是每个服务isAlive都返回true,因为Ribbon一般是搭配Eureka使用的,由后者负责维护各服务节点的可用状态,Ribbon默认认为从Eureka获得的节点就都是可用的。所以我们这里要自己实现一个自定义的Ping,来判断每个节点的状态:
@Configuration
public class DefaultRibbonConfig {
private Logger logger = LoggerFactory.getLogger(DefaultRibbonConfig.class);
public DefaultRibbonConfig() {
}
@Bean
public IRule ribbonRule() {
this.logger.info("全局IRule实现为AvailabilityFilteringRule,实例化...");
return new AvailabilityFilteringRule();
}
@Bean
public IPing ribbonPing() {
this.logger.info("全局IPing实现为HealthCheckPing,实例化...");
return new HealthCheckPing();
}
}
HealthCheckPing:
/**
自定义的Ping,向各服务节点统一的一个健康检查接口发送Http HEAD请求来判断节点的可用状态
*/
public class HealthCheckPing implements IPing {
private Logger logger = LoggerFactory.getLogger(HealthCheckPing.class);
private static CloseableHttpClient httpClient;
public HealthCheckPing() {
}
private CloseableHttpClient getHttpClient() {
if (httpClient == null) {
httpClient = HttpClientBuilder.create().build();
}
return httpClient;
}
public boolean isAlive(Server server) {
boolean isAlive = true;
String url = "http://" + server.getId() + "/" + server.getServiceName() + "/healthcheck/checkAlive";
HttpHead headRequest = new HttpHead(url);
RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(2000).setSocketTimeout(2000).setConnectionRequestTimeout(2000).build();
headRequest.setConfig(requestConfig);
try {
HttpResponse response = this.getHttpClient().execute(headRequest);
isAlive = response.getStatusLine().getStatusCode() == 200;
} catch (Exception var10) {
this.logger.error(var10.getMessage());
isAlive = false;
} finally {
this.logger.info("心跳检测结果:节点" + url + " ,状态:[" + isAlive + "] ");
headRequest.abort();
}
return isAlive;
}
}
在BaseLoadBalancer实例化的时候会启动定时任务PingTask,默认10秒一次:
/**
ShutdownEnabledTimer是个java Timer,Runntime.shutdown的时候会调用cancel()结束定时,
而定时执行的逻辑就是PingTask的run()方法了
*/
void setupPingTask() {
if (!this.canSkipPing()) {
if (this.lbTimer != null) {
this.lbTimer.cancel();
}
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
this.lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + uuid + "-" + this.name, true);
this.lbTimer.schedule(new BaseLoadBalancer.PingTask(), 0L, (long)(this.pingIntervalSeconds * 1000));
this.forceQuickPing();
}
}
定时执行PingTask,task里边逻辑是new Pinger(pingStrategy).runPinger();
public void runPinger() throws Exception {
if (!pingInProgress.compareAndSet(false, true)) {
logger.debug("runPinger pingInProgress " + pingInProgress.get());
return; // Ping in progress - nothing to do
}
// we are "in" - we get to Ping
Server[] allServers = null;
boolean[] results = null;
Lock allLock = null;
Lock upLock = null;
String uuid = UUID.randomUUID().toString().replaceAll("-","");
try {
/*
* The readLock should be free unless an addServer operation is
* going on...
*/
allLock = allServerLock.readLock();
allLock.lock();
allServers = allServerList.toArray(new Server[allServerList.size()]);
allLock.unlock();
int numCandidates = allServers.length;
results = pingerStrategy.pingServers(ping, allServers);
final List<Server> newUpList = new ArrayList<Server>();
final List<Server> changedServers = new ArrayList<Server>();
final Map<String, Server> allServersMap = new HashMap<String, Server>(); //暂存allServers,里边的Server状态是正确的
//根据ping结果,设置allServers里Server的状态
for (int i = 0; i < numCandidates; i++) {
boolean isAlive = results[i];
Server svr = allServers[i];
boolean oldIsAlive = svr.isAlive();
svr.setAlive(isAlive);
allServersMap.put(svr.getId(), svr);
if (oldIsAlive != isAlive) {
changedServers.add(svr); //状态发生变化的
logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
}
if (isAlive) {
newUpList.add(svr);
}
}
upLock = upServerLock.writeLock();
upLock.lock();
upServerList = newUpList;
upLock.unlock();
changeServerStatusInAllServerList(allServersMap); //补偿逻辑
notifyServerStatusChangeListener(changedServers);
} finally {
pingInProgress.set(false);
}
}
runPinger()的关键代码是results = pingerStrategy.pingServers(ping, allServers);
里边就是对每个server都用iPing接口实现的ping逻辑进行验证一遍,获得每个server的最新状态。
记录一下本次changedServers都有哪些。注意,这里会去持有一个真正的allServerList的引用并去更新状态,upServerList也会去更新。 allServers = allServerList.toArray(new Server[allServerList.size()]); 所以定时线程PingTask的作用是用iPing里的逻辑验证服务器的状态,然后更新服务器列表状态。
但是,坑爹的是ping持有的这个allServerList的引用会“失效”!
ping接口持有的allServerList的引用为什么会失效,原因是PollingServerListUpdater线程每次加载配置的serverlist的时候,默认都是alive的、且用的是allServerList = serverList
这种方式,这样就算allServerList定义为volatile类型也无济于事,毕竟整个引用指向了另一个对象(这时候之前的对象正在被Ping接口持有)。
而这时候Ping接口里边的逻辑因为在阻塞超时2秒,没有执行完,所以相当于PIng接口里边持有的allServerList指向的还是之前的那个对象。所以这时候Ping接口根据当前检查结果去更新服务列表也是没用的了,因为持有的服务列表是过期的对象。
简单来说就是polling和ping两个定时任务同时去修改服务列表、加上polling默认服务alive、且直接修改allServerList引用,导致的并发问题。(源码里allServerList有个读写锁,但是对这个场景来说并没有用)
我的解决办法是在BaseLoadBalancer里边在runPinger最后加了补偿逻辑,最后再更新一遍allServerList;
/**
* 在runPinger之后,再修改一次allServerList,防止runPinger的过程中allServerList已经被修改了引用
* 这时候runPinger持有的已经是过期引用,修改的server状态也是没法更新到真正的allServerList了。
* */
private void changeServerStatusInAllServerList(Map<String, Server> allServersMap) {
Lock allLock = null;
allLock = allServerLock.writeLock();
allLock.lock();
try {
for(Server s : allServerList) {
Server serverWithRightStatus = allServersMap.get(s.getId());
if(null != serverWithRightStatus) {
s.setAlive(serverWithRightStatus.isAlive());
logger.debug("修改了Server {}的状态,最新状态为alive={}" , s.getId(), s.isAlive());
}
}
}finally {
allLock.unlock();
}
}
但这样还不够,如果一个节点down掉,在下一次ping循环时,超时2秒之内pinger还没返回的时候,如果正好这2s的时候发生了调用,本该down的节点仍然被设置的是alive,所以关键问题是要改一下polling线程每次“都设置为alive”的逻辑:回顾一下前文中BaseLoadBalancer的setServersList(List lsrv)
方法中的两段注释:
/*
*
allServers里边新加载的服务列表原来默认都是alive的,
这里改为如果服务在当前的allServerList里存在、先以当前的为准;
如果不存在则先置为alive,我们认为这样更合理。
这样当pinger超时阻塞而暂未返回的时候,不会得出错误的服务状态
*/
for(Server s : allServerList) {
if(((Server) server).getId().equals(s.getId())) {
((Server) server).setAlive(s.isAlive());
}
}
然后又发现了新的问题:down的节点在2秒的时候又被设置回alive了!
检查代码发现是第二次setServerList的时候又给改回来了,也就是zone的setServerList的时候;(见上文,setServerList会被掉两次,后面也会有解释)
if (canSkipPing()) {
/**
* 这段注释掉的原因:
* 每个zone对应的BaseLoadBalancer的ping=null,将会把所有server都再置为alive,
* 与笔者认为的“已经存在的server先默认认为与原来状态一样、新server才默认置为alive”的原则相左
* */
/*
for (Server s : allServerList) {
s.setAlive(true);
}
upServerList = allServerList;
*/
} else if (listChanged) {
forceQuickPing();
}
上面的代码会走到注释的地方,所以还是zone对应的balancer有两个的原因,其中一个ping为null
暂时注释掉这段解决了问题。
BaseLoadBalancer的setServersList方法,执行两次
PollingServerListUpdater-0或者PollingServerListUpdater-1线程组成个线程池,负责执行30秒一次的定时任务。这个定时任务执行BaseLoadBalancer
的setServersList方法,也执行了两次。
原因是在DynamicServerListLoadBalancer
初始化的时候会初始化PollingServerListUpdater
并调用其start()启动定时任务,定时执行updateAction.doUpdate(),里边也就是setServersList(List lsrv),它接连调用了BaseLoadBalancer
的setServersList方法,和一个setServerListForZones方法,而后者的实现是在子类ZoneAwareLoadBalancer
里,getLoadBalancer(zone).setServersList(entry.getValue());就又会调用一次setServersList方法了。这就是为啥日志里看是PollingServerListUpdater线程执行了两次BaseLoadBalancer的setServersList的逻辑。
这个执行两次,感觉是一种业务逻辑:服务列表更新,相当于是先更新当前banancer的,然后更新当前所属zone的;也就是说每30秒只执行一次polling线程的逻辑,然后setServersList执行了两次。
对com.netflix.loadbalancer.Server类的字段进行扩充,增加应用名serviceName,Ribbon配置服务列表的时候是IP+Port,我们这次要求加上服务名,即ip:port/serviceName
package com.netflix.loadbalancer;
import com.netflix.util.Pair;
/**
* Class that represents a typical Server (or an addressable Node) i.e. a
* Host:port identifier
*
* @author stonse
*
* 在原来的基础上增加了应用名serviceName
* 刷新服务列表的同时保存应用名,给自定义Ping做心跳检测用
*/
public class Server {
//...
public static final String UNKNOWN_ZONE = "UNKNOWN";
private String host;
private int port = 80;
private String scheme;
private volatile String id;
private volatile boolean isAliveFlag;
private String zone = UNKNOWN_ZONE;
private volatile boolean readyToServe = true;
private String serviceName; //服务名,与serviceName.ribbon.xxx中的serviceName一样,是服务提供者的实际应用名称
// ...
public void setId(String id) {
Pair<String, Integer> hostPort = getHostPort(id);
if (hostPort != null) {
this.id = hostPort.first() + ":" + hostPort.second();
this.host = hostPort.first();
this.port = hostPort.second();
this.scheme = getScheme(id);
} else {
this.id = null;
}
this.serviceName = parseServiceName(id); //add by liny
}
//获取服务名
public String getServiceName() {
return this.serviceName;
}
//解析服务名
private String parseServiceName(String id) {
if (id != null) {
if (id.toLowerCase().startsWith("http://")) {
id = id.substring(7);
} else if (id.toLowerCase().startsWith("https://")) {
id = id.substring(8);
}
if (id.contains("/")) {
int slash_idx = id.indexOf("/");
id = id.substring(slash_idx);
if(id.length()>1)
return id.substring(1);
else
return "";
}else {
return "";
}
}else {
return null;
}
}
// ...
}
到此,我们了解了定时更新服务列表与定时Ping来判断服务节点的可用性。接下来看下组件提供的工具类的代码:
/**
* 创建一个有负载均衡功能的restTemplate,最终提供出去的工具类
*
* */
public class LbRestTemplate {
private Logger logger = LoggerFactory.getLogger(LbRestTemplate.class);
// @Autowired
private RestTemplate restTemplate;
// @Autowired
private LoadBalancerClient loadBalancer;
public LbRestTemplate(RestTemplate restTemplate, LoadBalancerClient loadBalancer){
this.restTemplate = restTemplate;
this.loadBalancer = loadBalancer;
}
public <T extends Object> T getForBean(String serviceName, String interfaceUrl, Class<T> clazz){
ServiceInstance instance = loadBalancer.choose(serviceName);
String contextPath ;
if(null == instance)
throw new RuntimeException(serviceName + "当前无可用节点");
URI serviceAddress = instance.getUri();
if(instance instanceof RibbonServer) {
contextPath = ((RibbonServer) instance).getServer().getServiceName();
}else {
contextPath = serviceName;
}
String serviceFullUrl = serviceAddress + "/" + contextPath + interfaceUrl;
logger.debug("serviceFullUrl:{}", serviceFullUrl);
return restTemplate.getForObject(serviceFullUrl, clazz);
}
public <T extends Object> T postForBean(String serviceName, String interfaceUrl, Object requestBody, Class<T> clazz){
ServiceInstance instance = loadBalancer.choose(serviceName);
String contextPath ;
if(null == instance)
throw new RuntimeException(serviceName + "当前无可用节点");
URI serviceAddress = instance.getUri();
if(instance instanceof RibbonServer) {
contextPath = ((RibbonServer) instance).getServer().getServiceName();
}else {
contextPath = serviceName;
}
String serviceFullUrl = serviceAddress + "/" + contextPath + interfaceUrl;
logger.debug("serviceFullUrl:{}", serviceFullUrl);
return restTemplate.postForObject(serviceFullUrl, requestBody, clazz);
}
}
LbRestTemplate通过Spring SPI机制注入到Spring IOC容器后,LoadBalancerClient会Ribbon的RibbonAutoConfiguration也通过SPI自动注入:
@Bean
@ConditionalOnMissingBean(LoadBalancerClient.class)
public LoadBalancerClient loadBalancerClient() {
return new RibbonLoadBalancerClient(springClientFactory());
}
RibbonLoadBalancerClient.choose(serviceName)最终会走到ZoneAwareLoadBalancer以及BaseLoadBalancer的chooseServer方法里,后者掉用this.rule.choose()方法来根据Rule来选服务。
总结:
至此,Ribbon的整个流程就串起来了:polling动态更新服务列表、服务状态判定ping、动态路由根据rule选取服务节点。
Ribbon一般是配合Eureka使用,节点健康状态从Eureka获取到本地然后直接取用、不会发生阻塞,我们使用了自定义的Ping根据心跳检查来判定对端服务节点的可用状态,当节点故障时会有超时阻塞。所以针对阻塞的情况我们对Ribbon源代码中节点状态逻辑做了一系列的优化。