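I've recently become interested in how load balancing is implemented, so I read through the relevant source code and recorded the whole flow in diagrams. When a module interests me I dig into it, and later I go back looking for answers to the questions I didn't understand at first. My earlier blog posts were written the same way, starting from questions and searching for answers, which gives the investigation a clear direction.
Q:
1. How does Ribbon load balancing obtain the service list?
2. How does the load balancer keep its data up to date?
3. What are the load-balancing strategies?
4. How is Spring RestTemplate used?
There is already plenty of thorough material online about how to use load balancing, so this post mainly records how Spring RestTemplate works together with load balancing.
Enabling load balancing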
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
Example: calling an endpoint of the EUREKACLIENT service
/**
* 沈敏杰
* 18/6/8 3:37 PM
* Test service
*/
@Service
public class HelloService {
@Autowired
RestTemplate restTemplate;
public String send(String name) {
return restTemplate.postForObject("http://EUREKACLIENT/helloController/hi?name=" + name, null, String.class);
}
}
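Declaring the bean and annotating it with @LoadBalanced enables load balancing. What actually lets RestTemplate balance load is an interceptor: it takes the service name in the URL and resolves it to the service's real address through ILoadBalancer. In other words, enabling load balancing with @LoadBalanced registers an interceptor on the RestTemplate.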
LoadBalancerAutoConfiguration.java
@Configuration
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
/**
* Declare the load-balancing interceptor bean
*/
@Bean
public LoadBalancerInterceptor ribbonInterceptor(
LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
/**
* Create the RestTemplate customizer,
* which adds the interceptor to the restTemplate
*/
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
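To make the customizer's list handling easier to see, here is a minimal plain-Java sketch with no Spring dependency. The `Template` class and the `addInterceptor` helper are hypothetical stand-ins I made up for illustration (interceptors are just strings here); the only thing the sketch mirrors is the copy, append, write-back sequence from the lambda above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CustomizerSketch {

    /** Hypothetical stand-in for RestTemplate: it only holds a list of interceptor names. */
    static class Template {
        private List<String> interceptors = new ArrayList<>();

        List<String> getInterceptors() {
            return interceptors;
        }

        void setInterceptors(List<String> interceptors) {
            this.interceptors = interceptors;
        }
    }

    /** Mirrors the customizer lambda: copy the current list, append, write it back. */
    static Consumer<Template> addInterceptor(String interceptor) {
        return template -> {
            List<String> list = new ArrayList<>(template.getInterceptors());
            list.add(interceptor);
            template.setInterceptors(list);
        };
    }

    public static void main(String[] args) {
        Template template = new Template();
        template.setInterceptors(new ArrayList<>(List.of("logging")));
        addInterceptor("loadBalancer").accept(template);
        System.out.println(template.getInterceptors()); // [logging, loadBalancer]
    }
}
```

Because the existing list is copied rather than replaced, interceptors that were already registered stay in place and the load-balancing one is appended after them.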
That covers enabling load balancing. With the interceptor added to the restTemplate, we send a request:
restTemplate.postForObject("http://EUREKACLIENT/helloController/hi?name=" + name, null, String.class);
Let's use this call as the entry point for reading the source.
RestTemplate.java:
@Override
@Nullable
public <T> T postForObject(String url, @Nullable Object request, Class<T> responseType,
Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = httpEntityCallback(request, responseType);
HttpMessageConverterExtractor<T> responseExtractor =
new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);
return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);
}
@Override
@Nullable
public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {
URI expanded = getUriTemplateHandler().expand(url, uriVariables);
return doExecute(expanded, method, requestCallback, responseExtractor);
}
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
// create the request object
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}
When we send a request, doExecute runs and creates an HTTP request object, ClientHttpRequest. ClientHttpRequest is an interface:
/**
* Represents a client-side HTTP request.
* Created via an implementation of the {@link ClientHttpRequestFactory}.
*
* <p>A {@code ClientHttpRequest} can be {@linkplain #execute() executed},
* receiving a {@link ClientHttpResponse} which can be read from.
*
* @author Arjen Poutsma
* @since 3.0
* @see ClientHttpRequestFactory#createRequest(java.net.URI, HttpMethod)
*/
public interface ClientHttpRequest extends HttpRequest, HttpOutputMessage {
/**
* Execute this request, resulting in a {@link ClientHttpResponse} that can be read.
* @return the response result of the execution
* @throws IOException in case of I/O errors
*/
ClientHttpResponse execute() throws IOException;
}
Note this line in the Javadoc:
Created via an implementation of the {@link ClientHttpRequestFactory}.
So a ClientHttpRequest is created by a ClientHttpRequestFactory. Let's go back and see how RestTemplate creates the ClientHttpRequest:
/**
* Create a new {@link ClientHttpRequest} via this template's {@link ClientHttpRequestFactory}.
* @param url the URL to connect to
* @param method the HTTP method to execute (GET, POST, etc)
* @return the created request
* @throws IOException in case of I/O errors
* @see #getRequestFactory()
* @see ClientHttpRequestFactory#createRequest(URI, HttpMethod)
*/
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("Created " + method.name() + " request for \"" + url + "\"");
}
return request;
}
/**
* Overridden to expose an {@link InterceptingClientHttpRequestFactory}
* if necessary.
* @see #getInterceptors()
*/
@Override
public ClientHttpRequestFactory getRequestFactory() {
// get the list of interceptors held by this object
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
// if the interceptor list is not empty, create and return an InterceptingClientHttpRequestFactory
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
When RestTemplate obtains its ClientHttpRequestFactory, it checks whether it currently holds any interceptors. If it does, it returns an InterceptingClientHttpRequestFactory:
/**
* {@link ClientHttpRequestFactory} wrapper with support for {@link ClientHttpRequestInterceptor}s.
*
* @author Arjen Poutsma
* @since 3.1
* @see ClientHttpRequestFactory
* @see ClientHttpRequestInterceptor
*/
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
private final List<ClientHttpRequestInterceptor> interceptors;
/**
* Create a new instance of the {@code InterceptingClientHttpRequestFactory} with the given parameters.
* @param requestFactory the request factory to wrap
* @param interceptors the interceptors that are to be applied (can be {@code null})
*/
public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,
@Nullable List<ClientHttpRequestInterceptor> interceptors) {
super(requestFactory);
this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());
}
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
// create a request object that carries the interceptors
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}
}
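The factory selection described above can be sketched in plain Java without Spring. Everything in this block is a simplified, hypothetical stand-in (`RequestFactory`, `SimpleFactory`, `InterceptingFactory`, and `Template` are my own names, and a "request" is just a string), so read it as an illustration of the wrap-and-cache idea rather than Spring's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class RequestFactorySketch {

    /** Hypothetical, simplified request factory: returns a description instead of a request. */
    interface RequestFactory {
        String createRequest(String url);
    }

    /** Plays the role of the default factory returned by super.getRequestFactory(). */
    static class SimpleFactory implements RequestFactory {
        @Override
        public String createRequest(String url) {
            return "plain request to " + url;
        }
    }

    /** Wrapper that remembers the interceptors, in the spirit of InterceptingClientHttpRequestFactory. */
    static class InterceptingFactory implements RequestFactory {
        private final RequestFactory delegate;
        private final List<String> interceptors;

        InterceptingFactory(RequestFactory delegate, List<String> interceptors) {
            this.delegate = delegate;
            this.interceptors = interceptors;
        }

        @Override
        public String createRequest(String url) {
            return "intercepted" + interceptors + " " + delegate.createRequest(url);
        }
    }

    /** Stand-in for the template: picks a factory the way getRequestFactory() does. */
    static class Template {
        final List<String> interceptors = new ArrayList<>();
        private final RequestFactory base = new SimpleFactory();
        private RequestFactory intercepting; // created lazily, then reused

        RequestFactory getRequestFactory() {
            if (interceptors.isEmpty()) {
                return base;
            }
            if (intercepting == null) {
                intercepting = new InterceptingFactory(base, interceptors);
            }
            return intercepting;
        }
    }

    public static void main(String[] args) {
        Template template = new Template();
        // No interceptors yet: the plain factory is used.
        System.out.println(template.getRequestFactory().createRequest("http://EUREKACLIENT/hi"));
        // With an interceptor registered, the wrapping factory takes over.
        template.interceptors.add("loadBalancer");
        System.out.println(template.getRequestFactory().createRequest("http://EUREKACLIENT/hi"));
    }
}
```

The second call prints `intercepted[loadBalancer] plain request to http://EUREKACLIENT/hi`, which shows that the wrapper still delegates to the original factory for the real work.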
Once we have the ClientHttpRequest object, sending the request comes down to calling execute():
response = request.execute();
InterceptingClientHttpRequest does not implement execute() directly. Tracing up the class hierarchy level by level, the class that implements execute() is AbstractClientHttpRequest:
public abstract class AbstractClientHttpRequest implements ClientHttpRequest {
private final HttpHeaders headers = new HttpHeaders();
private boolean executed = false;
@Override
public final HttpHeaders getHeaders() {
return (this.executed ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
@Override
public final OutputStream getBody() throws IOException {
assertNotExecuted();
return getBodyInternal(this.headers);
}
@Override
public final ClientHttpResponse execute() throws IOException {
assertNotExecuted();
ClientHttpResponse result = executeInternal(this.headers);
this.executed = true;
return result;
}
/**
* Assert that this request has not been {@linkplain #execute() executed} yet.
* @throws IllegalStateException if this request has been executed
*/
protected void assertNotExecuted() {
Assert.state(!this.executed, "ClientHttpRequest already executed");
}
/**
* Abstract template method that returns the body.
* @param headers the HTTP headers
* @return the body output stream
*/
protected abstract OutputStream getBodyInternal(HttpHeaders headers) throws IOException;
/**
* Abstract template method that writes the given headers and content to the HTTP request.
* @param headers the HTTP headers
* @return the response object for the executed request
*/
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException;
}
execute() calls executeInternal(HttpHeaders headers), an abstract method of AbstractClientHttpRequest itself. AbstractBufferingClientHttpRequest extends AbstractClientHttpRequest and implements executeInternal(HttpHeaders headers):
abstract class AbstractBufferingClientHttpRequest extends AbstractClientHttpRequest {
private ByteArrayOutputStream bufferedOutput = new ByteArrayOutputStream(1024);
@Override
protected OutputStream getBodyInternal(HttpHeaders headers) throws IOException {
return this.bufferedOutput;
}
@Override
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
byte[] bytes = this.bufferedOutput.toByteArray();
if (headers.getContentLength() < 0) {
headers.setContentLength(bytes.length);
}
ClientHttpResponse result = executeInternal(headers, bytes);
this.bufferedOutput = new ByteArrayOutputStream(0);
return result;
}
/**
* Abstract template method that writes the given headers and content to the HTTP request.
* @param headers the HTTP headers
* @param bufferedOutput the body content
* @return the response object for the executed request
*/
protected abstract ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput)
throws IOException;
}
Finally, executeInternal(HttpHeaders headers) in turn calls another abstract method, executeInternal(HttpHeaders headers, byte[] bufferedOutput), and this is the method that the intercepting request object ultimately implements:
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
....
@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
}
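The three-level call chain traced so far is a stacked template-method pattern. Below is a small plain-Java sketch of that shape, assuming simplified hypothetical classes (`BaseRequest`, `BufferingRequest`, `ConcreteRequest`) that return strings instead of HTTP responses and leave headers out entirely.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class TemplateMethodSketch {

    /** Level 1: owns execute() and the "already executed" guard. */
    abstract static class BaseRequest {
        private boolean executed = false;

        final String execute() {
            if (executed) {
                throw new IllegalStateException("request already executed");
            }
            String result = executeInternal();
            executed = true;
            return result;
        }

        protected abstract String executeInternal();
    }

    /** Level 2: buffers the body, then hands the bytes to the next abstract hook. */
    abstract static class BufferingRequest extends BaseRequest {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        void write(byte[] bytes) {
            buffer.write(bytes, 0, bytes.length);
        }

        @Override
        protected String executeInternal() {
            return executeInternal(buffer.toByteArray());
        }

        protected abstract String executeInternal(byte[] body);
    }

    /** Level 3: the concrete request; this is where an interceptor chain would start. */
    static class ConcreteRequest extends BufferingRequest {
        @Override
        protected String executeInternal(byte[] body) {
            return "sent " + body.length + " bytes";
        }
    }

    public static void main(String[] args) {
        ConcreteRequest request = new ConcreteRequest();
        request.write("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(request.execute()); // sent 5 bytes
    }
}
```

Each level fixes one concern (the single-execution guard, then body buffering) and leaves the next step abstract, which is why a caller only ever sees execute() while the concrete class implements just the last hook.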
Inside executeInternal(HttpHeaders headers, byte[] bufferedOutput) an executor, InterceptingRequestExecution, is created. InterceptingRequestExecution is an inner class of InterceptingClientHttpRequest:
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
public InterceptingRequestExecution() {
// create an iterator over the interceptor list
this.iterator = interceptors.iterator();
}
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
// fetch the next interceptor through the iterator
if (this.iterator.hasNext()) {
// let the interceptor carry out the request
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
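InterceptingRequestExecution takes the interceptor list of its outer class and creates an iterator over it to obtain each interceptor. As we saw at the beginning, enabling load balancing with @LoadBalanced really just adds an interceptor to the RestTemplate, so that interceptor runs whenever a request is sent: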
nextInterceptor.intercept(request, body, this);
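The iterator-driven chain is easier to follow in a stripped-down form. The sketch below is plain Java with hypothetical `Interceptor`, `Execution`, and `Chain` types of my own; requests and responses are strings, and the second interceptor only imitates a load balancer by swapping the service name for a made-up address.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class InterceptorChainSketch {

    /** Hypothetical, simplified counterpart of ClientHttpRequestInterceptor. */
    interface Interceptor {
        String intercept(String request, Execution execution);
    }

    /** Hypothetical, simplified counterpart of ClientHttpRequestExecution. */
    interface Execution {
        String execute(String request);
    }

    /** Walks the interceptor list once; when it is exhausted, "sends" the request. */
    static class Chain implements Execution {
        private final Iterator<Interceptor> iterator;

        Chain(List<Interceptor> interceptors) {
            this.iterator = interceptors.iterator();
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                // Pass "this" along so the interceptor can continue the chain.
                return iterator.next().intercept(request, this);
            }
            return "response to [" + request + "]";
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // First interceptor: decorates the request, then continues.
        interceptors.add((req, next) -> next.execute(req + " +trace-header"));
        // Second interceptor: imitates the load balancer with an assumed address.
        interceptors.add((req, next) -> next.execute(req.replace("EUREKACLIENT", "10.0.0.1:8080")));
        return new Chain(interceptors).execute(request);
    }

    public static void main(String[] args) {
        System.out.println(run("POST http://EUREKACLIENT/helloController/hi"));
    }
}
```

Each interceptor decides whether and how to call the chain again, so the request that is finally sent is whatever the last interceptor passed on. Here that is the request with the service name already replaced.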
The interceptor that actually runs here is LoadBalancerInterceptor. Let's look at its code:
/**
* @author Spencer Gibb
* @author Dave Syer
* @author Ryan Baxter
* @author William Tran
*/
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
}
}
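The interceptor treats the URI's host as the service name and leaves everything else to loadBalancer.execute. As a rough illustration of what "resolving a service name to a real address" means, here is a plain-Java sketch. The `RoundRobin` chooser, the `resolve` helper, and the server addresses are all assumptions of mine; Ribbon's real rules and server-list handling are more elaborate and are not shown here.

```java
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ServiceResolutionSketch {

    /** Hypothetical round-robin chooser over a fixed list of host:port entries. */
    static class RoundRobin {
        private final List<String> servers;
        private final AtomicInteger next = new AtomicInteger();

        RoundRobin(List<String> servers) {
            this.servers = servers;
        }

        String choose() {
            return servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
        }
    }

    /** Replaces the service name in the URI with a concrete host:port. */
    static URI resolve(URI original, RoundRobin balancer) {
        String serviceName = original.getHost();
        if (serviceName == null) {
            throw new IllegalStateException(
                    "Request URI does not contain a valid hostname: " + original);
        }
        String query = original.getRawQuery();
        return URI.create(original.getScheme() + "://" + balancer.choose()
                + original.getRawPath() + (query != null ? "?" + query : ""));
    }

    public static void main(String[] args) {
        RoundRobin balancer = new RoundRobin(List.of("10.0.0.1:8080", "10.0.0.2:8080"));
        URI original = URI.create("http://EUREKACLIENT/helloController/hi?name=world");
        System.out.println(resolve(original, balancer)); // http://10.0.0.1:8080/helloController/hi?name=world
        System.out.println(resolve(original, balancer)); // http://10.0.0.2:8080/helloController/hi?name=world
        // An underscore is not allowed in a hostname, so getHost() returns null here.
        System.out.println(URI.create("http://eureka_client/hi").getHost()); // null
    }
}
```

The last line hints at why the interceptor asserts that serviceName is not null: java.net.URI returns no host for names that are not valid hostnames, such as ones containing an underscore.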