本人小白,这个文章是本人的阅读笔记,不是权威解读,需要自己甄别对错
大致流程,具体代码没深入分析,大致跟读了下源码,具体的部分后面再起文章分析
一般我们使用负载均衡策略的话,这么使用的:
@RestController
@Configuration
public class ServiceBController {
@Bean
@LoadBalanced
public RestTemplate getRestTemplate() {
return new RestTemplate();
}
@RequestMapping(value = "/say/{name}", method = RequestMethod.GET)
public String greeting(@PathVariable("name") String name) {
RestTemplate restTemplate = getRestTemplate();
return restTemplate.getForObject("http://ServiceA/sayHello/" + name, String.class);
}
}
在使用RestTemplate的时候,我们在声明的Bean上面加上了@LoadBalanced,这个就是对RestTemplate进行封装,准确点说是加入了Ribbon的拦截器
直接肝吧,不逼逼……………………………………………………
先看下@LoadBalanced注解声明:
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}
可以看见他是继承了@Qualifier这个注解,在Spring当中是一般的理解就是两个作用:
- 在使用@Autowire自动注入的时候,加上@Qualifier(“test”)可以指定注入哪个对象
- 可以作为筛选的限定符,我们在做自定义注解时可以在其定义上增加@Qualifier,用来筛选需要的对象
在这里,@LoadBalanced就是使用第二作用,那么在哪进行筛选注入的呢???
首先,我么看看@LoadBalanced注解的源码位于哪个包,一般以SpringBoot的尿性,在相关类的包下会有AutoConfiguration的配置类,用来向SpringBoot容器注入相关的实例Bean
在这个包里面看见两个Configuration,AsyncLoadBalancerAutoConfiguration和LoadBalancerAutoConfiguration,猜猜都知道Async开头的是和异步相关的,不用管,直接干LoadBalancerAutoConfiguration
org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
配置类的注解就不扯了,老生常谈的东西,直接看这段代码,直接使用的@Qualifier指定类型注入特性,所以在LoadBalancerAutoConfiguration加载的时候,会将所有被@LoadBalanced修饰的RestTemplate的都注入到restTemplates这个集合当中。
那么这个集合在哪使用呢???我们找到下面这个Bean实例化
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
final List<RestTemplateCustomizer> customizers) {
return new SmartInitializingSingleton() {
@Override
public void afterSingletonsInstantiated() {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
}
};
}
这里使用customizers对所有的restTemplate进行包装,那么customizers怎么实例化的呢
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return new RestTemplateCustomizer() {
@Override
public void customize(RestTemplate restTemplate) {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
}
};
}
可以看出RestTemplateCustomizer的实例化需要一个LoadBalancerInterceptor,且实现的customize()方法就是在RestTemplate中加入loadBalancerInterceptor拦截器,接着找找拦截器的初始化
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
就是直接new LoadBalancerInterceptor()了,但是这个LoadBalancerClient这个很有讲究,到时候需要的时候再细讲
总的来说,在这个配置类里干了一件事,收集所有被@LoadBalanced注解修饰的RestTemplate,并且为所有的RestTemplate加上一个拦截器 LoadBalancerInterceptor
好了,核心类找到了,就是这个拦截器LoadBalancerInterceptor
接下来我们看看拦截器里面干了什么
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
核心方法就是这个intercept()方法,具体调用RestTemplate怎么使用拦截器的话,走断点,看下线程堆栈就行,跳转的太多,懒得说。
这个方法前两步就是获取serviceName,例如:http://ServiceB/sayHello/,它的serviceName就是ServiceB
接下来我们看看 execute()方法
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
serviceId), serverIntrospector(serviceId).getMetadata(server));
return execute(serviceId, ribbonServer, request);
}
追进第一步ILoadBalancer loadBalancer = getLoadBalancer(serviceId),获取该服务对应的ILoadBalancer
看看是怎么获取的?
经过一顿猛如虎的跳转 RibbonLoadBalancerClient → SpringClientFactory → NamedContextFactory,最后真正的方法啊是这样的:
public <T> T getInstance(String name, Class<T> type) {
AnnotationConfigApplicationContext context = getContext(name);
if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
type).length > 0) {
return context.getBean(type);
}
return null;
}
先看getContext(name)方法
protected AnnotationConfigApplicationContext getContext(String name) {
if (!this.contexts.containsKey(name)) {
synchronized (this.contexts) {
if (!this.contexts.containsKey(name)) {
this.contexts.put(name, createContext(name));
}
}
}
return this.contexts.get(name);
}
我必须说,看这个方法,我长见识了。
这个contexts是一个Map<String, AnnotationConfigApplicationContext>,存储的是 key:服务名称,value是这个服务对应的上下文。
意思是说每个服务对应的都是自己单独的上下文。
我们看这个方法,首先判断当前服务对应的上下文是否初始化了,就是Map中是否存了,如果有直接获取返回,没有就直接创建在放入contexts中,返回!
这里必须说下这个上下文的创建createContext(name),非常有意思,进入该方法:
protected AnnotationConfigApplicationContext createContext(String name) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
if (this.configurations.containsKey(name)) {
for (Class<?> configuration : this.configurations.get(name)
.getConfiguration()) {
context.register(configuration);
}
}
for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
if (entry.getKey().startsWith("default.")) {
for (Class<?> configuration : entry.getValue().getConfiguration()) {
context.register(configuration);
}
}
}
context.register(PropertyPlaceholderAutoConfiguration.class,
this.defaultConfigType);
context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
this.propertySourceName,
Collections.<String, Object> singletonMap(this.propertyName, name)));
if (this.parent != null) {
// Uses Environment from parent as well as beans
context.setParent(this.parent);
}
context.refresh();
return context;
}
先不说代码的好坏,这么干第一次见识!!!
我们分析下这个代码,如果看不懂,走断点是非常有效的:
- 首先创建一个基于注解配置的 AnnotationConfigApplicationContext上下文
- 接着读取配置,然后将配置类注册到context中
- 注册解析器
- 容器刷新
- 返回
我们看下configurations是哪些玩意?
居然是配置类!那么在其中声明的@Bean将会进行注册的,接着看看进入context.register(configuration)是哪个,
是org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration,那么去看看这个配置类里面声明了哪些Bean
- 实例IPing实现类 NIWSDiscoveryPing
- 实例ServerList实现类 DomainExtractingServerList
- 实例ServerIntrospector实现类 EurekaServerIntrospector
补充下,在context.register(PropertyPlaceholderAutoConfiguration.class,this.defaultConfigType) 除了注册解析器还注册了一个配置类
所以在新创建的context中有org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration的申明的Bean,所以我们看看这里面有啥?主要的实例有:
- ILoadBalancer 实现类ZoneAwareLoadBalancer
- RibbonLoadBalancerContext
- IRule 实现类ZoneAvoidanceRule
- ServerListUpdater 实现类 PollingServerListUpdater
所以创建的上下文中,主要的类有
- 实例IPing实现类 NIWSDiscoveryPing
- 实例ServerList实现类 DomainExtractingServerList
- 实例ServerIntrospector实现类 EurekaServerIntrospector
- ILoadBalancer 实现类ZoneAwareLoadBalancer
- RibbonLoadBalancerContext
- IRule 实现类ZoneAvoidanceRule
- ServerListUpdater 实现类 PollingServerListUpdater
按照上面我们注册的实例,那么获取的ILoadBalancer是ZoneAwareLoadBalancer,接下来接着看获取Server方法:getServer(loadBalancer),这个方法看起来就是获取服务的,一通追踪,我们发现最后调的是choose()方法,这个是选择服务节点的,那么ZoneAwareLoadBalancer中应该维持了服务实例列表。
我们进入RibbonClientConfiguration中,看下实例化的方法
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
直到找到她的父类构造方法:
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
super(clientConfig, rule, ping);
this.serverListImpl = serverList;
this.filter = filter;
this.serverListUpdater = serverListUpdater;
if (filter instanceof AbstractServerListFilter) {
((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
}
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
boolean primeConnection = this.isEnablePrimingConnections();
// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
this.setEnablePrimingConnections(false);
enableAndInitLearnNewServersFeature();
updateListOfServers();
if (primeConnection && this.getPrimeConnections() != null) {
this.getPrimeConnections()
.primeConnections(getReachableServers());
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
直接剧透,获取注册表的方法是updateListOfServers(),进入该方法:
servers = serverListImpl.getUpdatedListOfServers();
调用的是父类传入的serverList,由此可以直到子类的实现类就是 DomainExtractingServerList,那么就是调用该类的方法
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(this.list
.getUpdatedListOfServers());
return servers;
}
还是调用的别的类的同名方法啊,我们看看DomainExtractingServerList的构造Bean方法:
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
它的构造中传入了DiscoveryEnabledNIWSServerList,那么调用的就是它的同名方法了,最后发现执行的是这个方法:
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
//vipAddresses就是ServiceName
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
//核心方法
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
进入核心核心方法:
@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
@Nullable String region) {
if (vipAddress == null) {
throw new IllegalArgumentException(
"Supplied VIP Address cannot be null");
}
Applications applications;
//获取全量注册表
if (instanceRegionChecker.isLocalRegion(region)) {
applications = this.localRegionApps.get();
} else {
applications = remoteRegionVsApps.get(region);
if (null == applications) {
logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
+ "address {}.", region, vipAddress);
return Collections.emptyList();
}
}
//依据ServiceName进行筛选获取该服务对应的服务实例
if (!secure) {
return applications.getInstancesByVirtualHostName(vipAddress);
} else {
return applications.getInstancesBySecureVirtualHostName(vipAddress);
}
}
好了,这样就获取了服务名称对应的注册表,接下来执行updateAllServerList(servers)
protected void updateAllServerList(List<T> ls) {
// other threads might be doing this - in which case, we pass
if (serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
s.setAlive(true); // set so that clients can start using these
// servers right away instead
// of having to wait out the ping cycle.
}
setServersList(ls);
super.forceQuickPing();
} finally {
serverListUpdateInProgress.set(false);
}
}
}
代码嵌套很多,就直接说功能,实现的是将注册表赋值给内置的一个allServerList
现在ZoneAwareLoadBalancer中就有了该服务的列表,那么一个问题,Eureka Client的服务列表是30S更新一次,那么相应的LoadBalancer中的服务列表也要更新,还得存在定时更新得方法:
enableAndInitLearnNewServersFeature();
public void enableAndInitLearnNewServersFeature() {
LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
serverListUpdater.start(updateAction);
}
执行的是ServerListUpdater的实现类的,我们看看容器中该接口的实现类 PollingServerListUpdater。
那看看该类的方法
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
延迟1S,执行频率30S,执行方法updateAction.doUpdate();
updateAction是传入的:
com.netflix.loadbalancer.DynamicServerListLoadBalancer
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
从而得知,也是定时拉取本地注册表
好了,现在分析完ZoneAwareLoadBalancer的初始化的几个重要步骤的大概流程,我们接着看看下个方法的执行
Server server = getServer(loadBalancer);
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
一大坨,看的难受,不想看了,其实我们小点难度,分析下单机房部署的模式下是怎么玩的,上面那一坨执行的代码就是这么一点:
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
看下父类的实现吧
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
调用的是rule的核心choose()方法,老步骤,看看初始化的上下文中是哪个实现类 PredicateBasedRule
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//核心方法
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
来回跳,最后指向的是一个负载均衡算法:
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
自己琢磨下就知道这个就是轮询的算法,并发安全的,那么choose()就是轮询的获取服务实例
好了,主方法 execute(String serviceId, LoadBalancerRequest<T> request) throws IOException 接着分析
下一步就是封装了RibbonServer,不管,直接下一步
execute(serviceId, ribbonServer, request);
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if(serviceInstance instanceof RibbonServer) {
server = ((RibbonServer)serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try {
//核心方法
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}
上面的核心方法调用了传入的LoadBalancerRequest的apply()方法,那么我们回到拦截器的intercept()方法,传入的LoadBalancerRequest构造方法是requestFactory.createRequest(request, body, execution)
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request,
final byte[] body, final ClientHttpRequestExecution execution) {
return new LoadBalancerRequest<ClientHttpResponse>() {
@Override
public ClientHttpResponse apply(final ServiceInstance instance)
throws Exception {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, loadBalancer);
if (transformers != null) {
for (LoadBalancerRequestTransformer transformer : transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return execution.execute(serviceRequest, body);
}
};
}
使用的是一个匿名内部类,实现的apply()方法,那么最后调用的也是这个方法了,看看构造的ServiceRequestWrapper,内部重写了getURI()方法
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(
this.instance, getRequest().getURI());
return uri;
}
其实就是真实地址替换服务名
后面的execution.execute()是org.springframework.http.client.InterceptingClientHttpRequest执行了,Spring底层的Http通信组件,不关心了,不看了,Ribbon的大致流程就是这样,我觉很亮眼的地方就是那一套Server对应的上下文
最后上个流程图的简图便于理解: