Summary
gRPC Java 1.3.0 was released recently. The main changes:
- First, the keepalive mechanism
1) The client-side keepalive settings actually work now
2) With keepalive configured, once the connection is fully established the client keeps sending ping frames to the server
3) The server uses these keepalive pings to automatically detect which connections have gone dead
- The server can now limit how long a connection lives
1) Once a connection reaches its configured maximum age, it is closed (a configuration sketch for both sides follows this list)
- Some trace propagation was added
- The LoadBalancers were changed in several ways
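Before getting into the LoadBalancer changes, here is a minimal sketch of the client keepalive and the server connection max-age settings mentioned above, using the Netty transport builders. The exact builder methods appeared around the 1.3/1.4 releases and MyServiceImpl is a hypothetical service implementation, so treat the details as illustrative rather than exact.

import java.util.concurrent.TimeUnit;

import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;

public class KeepaliveConfigSketch {
    public static void main(String[] args) throws Exception {
        // Client side: after the connection is established, send keepalive pings
        // periodically so that dead connections get detected.
        ManagedChannel channel = NettyChannelBuilder.forAddress("localhost", 50051)
                .keepAliveTime(30, TimeUnit.SECONDS)     // interval between keepalive pings
                .keepAliveTimeout(10, TimeUnit.SECONDS)  // how long to wait for a ping ack
                .usePlaintext(true)
                .build();

        // Server side: limit how long a single connection may live; when the maximum
        // age is reached the connection is closed and the client has to reconnect.
        Server server = NettyServerBuilder.forPort(50051)
                .maxConnectionAge(30, TimeUnit.MINUTES)
                .addService(new MyServiceImpl())         // hypothetical service implementation
                .build()
                .start();

        server.awaitTermination();
        channel.shutdownNow();
    }
}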
Let me focus on the LoadBalancer changes.
In version 1.1.0, the LB factory class looked like this:
private static class RoundRobinLoadBalancer<T> extends LoadBalancer<T> {
  private final TransportManager<T> tm;

  private RoundRobinLoadBalancer(TransportManager<T> tm) {
    this.tm = tm;
  }
TransportManager is the component that manages transport creation, and the actual load balancing looked like this:
@Override
public T pickTransport(Attributes affinity) {
  final RoundRobinServerList<T> addressesCopy;
  synchronized (lock) {
    if (closed) {
      return tm.createFailingTransport(SHUTDOWN_STATUS);
    }
    if (addresses == null) {
      if (nameResolutionError != null) {
        return tm.createFailingTransport(nameResolutionError);
      }
      if (interimTransport == null) {
        interimTransport = tm.createInterimTransport();
      }
      return interimTransport.transport();
    }
    addressesCopy = addresses;
  }
  return addressesCopy.getTransportForNextServer();
}
All of that is gone now; a new abstraction called Subchannel does this job instead.
The Javadoc on Subchannel reads:
/**
* A logical connection to a server, or a group of equivalent servers represented by an {@link
* EquivalentAddressGroup}.
*
* <p>It maintains at most one physical connection (aka transport) for sending new RPCs, while
* also keeps track of previous transports that has been shut down but not terminated yet.
*
* <p>If there isn't an active transport yet, and an RPC is assigned to the Subchannel, it will
* create a new transport. It won't actively create transports otherwise. {@link
* #requestConnection requestConnection()} can be used to ask Subchannel to create a transport if
* there isn't any.
*/
Roughly: a Subchannel is a logical connection rather than an actual physical one, and it may represent one or more equivalent server addresses.
It maintains at most one physical connection (transport) for sending RPCs; if no transport is active when an RPC is assigned to it, it creates a new one.
The interesting part is this:
First, gRPC wraps the transport in one more layer and moves transport creation earlier: as soon as the name resolver returns the IP addresses, requestConnection is called.
@Override
public void requestConnection() {
  subchannel.obtainActiveTransport();
}
@Nullable
ClientTransport obtainActiveTransport() {
  ClientTransport savedTransport = activeTransport;
  if (savedTransport != null) {
    return savedTransport;
  }
  try {
    synchronized (lock) {
      savedTransport = activeTransport;
      // Check again, since it could have changed before acquiring the lock
      if (savedTransport != null) {
        return savedTransport;
      }
      if (state.getState() == IDLE) {
        gotoNonErrorState(CONNECTING);
        startNewTransport();
      }
    }
  } finally {
    channelExecutor.drain();
  }
  return null;
}
The actual load-balancing decision is made by SubchannelPicker:
/**
* The main balancing logic. It <strong>must be thread-safe</strong>. Typically it should only
* synchronize on its own state, and avoid synchronizing with the LoadBalancer's state.
*
* <p>Note: Implementations should override exactly one {@code pickSubchannel}.
*/
@ThreadSafe
public abstract static class SubchannelPicker {
  /**
   * Make a balancing decision for a new RPC.
   *
   * @param args the pick arguments
   */
  public abstract PickResult pickSubchannel(PickSubchannelArgs args);
}
So the load-balancing mechanism here is completely different from the 1.1.0 version. If you want to apply an extra layer of routing rules in front of the load balancer, you have to rewrite that part yourself.
Here is how I changed it:
I extended SubchannelPicker so that, when a pick is made for load balancing, the Subchannels that should not receive traffic are kicked out.
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
  Map<String, Object> affinity = args.getCallOptions().getOption(GrpcClientCall.CALLOPTIONS_CUSTOME_KEY);
  GrpcURL refUrl = (GrpcURL) affinity.get(GrpcClientCall.GRPC_REF_URL);
  if (size > 0) {
    Subchannel subchannel = nextSubchannel(refUrl);
    affinity.put(GrpcClientCall.GRPC_NAMERESOVER_ATTRIBUTES, nameResovleCache);
    return PickResult.withSubchannel(subchannel);
  }
  if (status != null) {
    return PickResult.withError(status);
  }
  return PickResult.withNoResult();
}
private Subchannel nextSubchannel(GrpcURL refUrl) {
  if (size == 0) {
    throw new NoSuchElementException();
  }
  synchronized (this) {
    Subchannel val = list.get(index);
    index++;
    if (index >= size) {
      index = 0;
    }
    // Kick out Subchannels that do not match the routing rule and try the next one
    boolean discard = discard(refUrl, val);
    if (discard && index != 0) {
      return nextSubchannel(refUrl);
    }
    return val;
  }
}
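For context on where the affinity map in pickSubchannel comes from: the caller attaches it to the RPC through a custom CallOptions key, and the picker reads it back via args.getCallOptions(). The sketch below uses names of its own (ROUTING_KEY, a plain HashMap payload, a generated MyServiceGrpc stub); GrpcClientCall.CALLOPTIONS_CUSTOME_KEY in the code above plays the same role, and newer gRPC versions also offer CallOptions.Key.createWithDefault instead of of.

import java.util.HashMap;
import java.util.Map;

import io.grpc.CallOptions;

public final class RoutingKeys {
  // Hypothetical stand-in for GrpcClientCall.CALLOPTIONS_CUSTOME_KEY: a CallOptions key
  // that carries routing attributes from the call site down to the SubchannelPicker.
  public static final CallOptions.Key<Map<String, Object>> ROUTING_KEY =
      CallOptions.Key.of("routing-attributes", new HashMap<String, Object>());
}

// On the caller side, attach the attributes to a single call through the stub,
// e.g. with a generated blocking stub (MyServiceGrpc is illustrative):
//
//   Map<String, Object> affinity = new HashMap<>();
//   affinity.put("refUrl", refUrl);
//   MyServiceGrpc.newBlockingStub(channel)
//       .withOption(RoutingKeys.ROUTING_KEY, affinity)
//       .someCall(request);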
There is one problem with this approach: a kicked-out Subchannel still has its transport alive underneath, so the client keeps wasting resources on it, and I have no good solution for that yet. If I shut the Subchannel down, the shutdown would trigger a refresh of the name resolver (the transport was already established from the resolved address list), the addresses would be fetched all over again, and the whole cycle would repeat; that is not worth it. So for now I accept wasting a little memory in exchange for service routing.
This post is a bit scattered. It mainly records the problems I ran into after upgrading gRPC and the workarounds I came up with; I hope it is understandable.
One more rant: a lot of people say they will just write their own RPC framework. RPC is not simply shoving a piece of data across the wire and calling it a day; maintaining connection state, handling the data in transit and many other aspects all have to be considered. There are plenty of solid RPC building blocks out there already, so do not reinvent the wheel for production use.
Among the HTTP/2-based options, gRPC and armeria are both reliable choices; reaching for them beats building your own RPC from scratch.