作者:Rafał Kowalski ,原文:Reactive Service-To-Service Communication With RSocket (Part 2): Load Balancing and Resumability
本文是《用 RSocket 解决响应式服务之间的通讯》微型系列的第二篇文章,它将帮助你熟悉 RSocket——一种可能会彻底改变机器之间通讯的新二进制协议。在以下段落中,我们将讨论在云环境中的负载平衡问题以及介绍可恢复性能力,可恢复性能力有助于解决网络问题,尤其是在 IOT 系统中。
如果您不了解 RSocket 基础知识,请查看该系列之前的文章。
高可用性和负载平衡是企业级系统的重要能力
类似银行和保险等许多业务领域,可用性和可靠性是应用程序的核心能力。在这些要求苛刻的行业中,即使在高流量、网络问题导致延迟增加或自然灾害期间,服务也必须 24*7 全天候提供服务。为了确保该软件始终能够提供用户使用,通常会在多个可用区域中对其进行冗余部署。
在这种情况下,每个微服务的至少两个实例部署在至少两个可用性区域中。多个可用区内冗余部署让系统具有弹性且增加了容量(更多个实例能够处理更高的负载)。那么这样会带来什么问题呢?冗余部署引入了额外的复杂性。作为研发人员,我们必须确保进入的的流量分布在所有可用实例中(如果流量打到不可用的实例上,会导致请求失败)。
有两种主要的方法可解决这个问题:服务端负载平衡和客户端负载平衡。
第一种方法(服务端负载平衡)基于以下假设,请求者不知道响应者的 IP 地址。取而代之的是,请求者与负载均衡器进行通信,负载均衡器负责在后端的微服务之间分配进入的请求,类似产品有 Ngnix。这种设计在云时代很容易采用。 IaaS 提供商通常具有内置的可靠解决方案,例如 Amazon Web Services 中提供的 Elastic Load Balancer。此外,这种设计有助于开发比普通轮询式负载均衡更复杂的路由策略(例如,自适应负载平衡或链式故障转移)。该服务端负载平衡的主要缺点是必须配置和部署额外的资源,如果我们的系统包含数百个微服务,这可能会很痛苦。此外,它可能会影响延迟-每个请求在负载均衡器上会增加了网络一跳。
第二种方法(客户端负载平衡)与第一种方法有些相反。这里请求者知道给定微服务的每个实例的 IP 地址,而不是通过连接到响应者的中心点来进行负载均衡。有了这些微服务的实例 IP 地址,客户端就可以自己选择响应者实例,然后向其发送请求。该方法好处是不需要任何额外资源,但是必须确保请求者具有响应者所有实例的 IP 地址(请参阅如何使用服务发现模式来处理它)。客户端负载平衡模式的主要优点是其性能(可以减少一个额外的“网络跃点”),进而可以显着减少延迟。这也是 RSocket 实现客户端负载平衡模式的关键原因之一。
RSocket 中的客户端负载平衡
在代码实现方面,RSocket 中客户端负载平衡的实现非常简单。该机制依赖于LoadBalancedRSocketMono
对象,根据该对象会从一组可用的 RSocket 实例中选择合适的 RSocket 实例。要访问 RSocket,我们必须订阅LoadBalancedRSocketMono
,onNext
该方法接收所有 RSocket 实例。而且,它为每个 RSocket 计算统计信息,以便能够估计每个实例的负载,并在特定时间点根据该负载选择性能最佳的实例。
该算法考虑了多个参数,例如延迟,保持的连接数以及未处理的请求数。每个 RSocket 的运行状况都由可用性参数反映出来,该参数的值从零到一,其中零表示给定实例无法处理任何请求,而一则表示分配的请求完全处理完成的 RSocket。下面的代码片段显示了 RSocket 负载平衡 的最基本示例,该示例连接到响应器的三个不同实例并执行 100 个请求。每次它从LoadBalancedRSocketMono
对象获取 RSocket 。
@Slf4j
public class LoadBalancedClient {
static final int[] PORTS = new int[]{7000, 7001, 7002};
public static void main(String[] args) {
List rsocketSuppliers = Arrays.stream(PORTS)
.mapToObj(port -> new RSocketSupplier(() -> RSocketFactory.connect()
.transport(TcpClientTransport.create(HOST, port))
.start()))
.collect(Collectors.toList());
LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono.create((Publisher>) s -> {
s.onNext(rsocketSuppliers);
s.onComplete();
});
Flux.range(0, 100)
.flatMap(i -> balancer)
.doOnNext(rSocket -> rSocket.requestResponse(DefaultPayload.create("test-request")).block())
.blockLast();
}
}
值得提到的是,RSocket 中的客户端负载均衡器还会处理无效连接。如果在LoadBalancedRSocketMono
中注册的任何 RSocket 实例停止响应,RSocket 将会自动尝试重新连接。默认情况下,它将在 25 秒内执行五次尝试。如果未成功,则将从可用连接池中删除给定的 RSocket。这种设计结合了服务器端负载平衡和低延迟的优点,并减少了客户端负载平衡的“网络跳数”。
无效连接和恢复机制
在云环境中,进行机器之间通信,实时流数据交互一般不会出现什么网络问题,但是试想一下,如果我们将物联网设备放置在无法稳定、可靠地通过网络连接访问的区域中,问题就比较复杂了。很容易想到,在这样的环境中可能面临的两个主要问题:“网络延迟”和“连接稳定性”。从软件的角度来看,我们对“网络延迟”可能没有太好的办法,但是对于“连接稳定性”我们还能够做些工作的。
我们来试试用 RSocket 解决问题,先来选择合适的交互模型开始。在这种情况下,最合适的是“请求流”方法,其中部署在云环境中的微服务是请求者,而温度传感器是响应者。选择好交互模型后,我们再应用可恢复性策略。在 RSocket 中,我们通过在RSocketFactory
上调用的resume()
方法来实现,如下例所示:
@Slf4j
public class ResumableRequester {
private static final int CLIENT_PORT = 7001;
public static void main(String[] args) {
RSocket socket = RSocketFactory.connect()
.resume()
.resumeSessionDuration(RESUME_SESSION_DURATION)
.transport(TcpClientTransport.create(HOST, CLIENT_PORT))
.start()
.block();
socket.requestStream(DefaultPayload.create("dummy"))
.map(payload -> {
log.info("Received data: [{}]", payload.getDataUtf8());
return payload;
})
.blockLast();
}
}
@Slf4j
public class ResumableResponder {
private static final int SERVER_PORT = 7000;
static final String HOST = "localhost";
static final Duration RESUME_SESSION_DURATION = Duration.ofSeconds(60);
public static void main(String[] args) throws InterruptedException {
RSocketFactory.receive()
.resume()
.resumeSessionDuration(RESUME_SESSION_DURATION)
.acceptor((setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
@Override
public Flux requestStream(Payload payload) {
log.info("Received 'requestStream' request with payload: [{}]", payload.getDataUtf8());
return Flux.interval(Duration.ofMillis(1000))
.map(t -> DefaultPayload.create(t.toString()));
}
}))
.transport(TcpServerTransport.create(HOST, SERVER_PORT))
.start()
.subscribe();
log.info("Server running");
Thread.currentThread().join();
}
}
请注意,要运行提供的示例,您需要在计算机上安装socat
,请参阅README文件以了解更多详细信息
请求方和响应方的机制类似,它基于一些组件。首先,有一个ResumableFramesStore
,它用作帧的缓冲区。按照默认实现,它会将它们存储在内存中,但是我们可以通过实现ResumableFramesStore
接口(例如,将帧存储在分布式缓存中,如 Redis)来轻松调整以满足业务的需求。这个缓冲区是用来保存在“keep-alive 帧”之间发出的数据,“keep-alive 帧”是定期来回发送,能够探测出交互双方之间的连接是否稳定;另外,“keep-alive 帧(保活帧)”还包含令牌,这个令牌是为了确定请求者和响应者的最后接收位置。当交互双方需要要恢复连接时,它将发送带有“隐含位置”的“resume 帧(恢复帧)”。隐含位置是根据上次接收到的位置(与“保活帧”中的值相同)加上该时刻接收到的帧的长度计算得出的。此算法适用于通信的双方,在恢复帧中会含有“最后接收的服务器位置”和“第一个客户端可用位置”信息的令牌。下图显示了恢复操作的整个流程:
通过采用 RSocket 协议中内置的可恢复性机制,我们可以用相对较少的精力来减少网络问题的影响。就像上面的示例所示,可恢复性在数据流应用程序中可能非常有用,尤其是在 IOT 设备与云环境中的服务通信的场景下。
总结
在本文中,我们讨论了 RSocket 协议的更多高级功能,这些功能有助于减少网络对系统可操作性的影响。我们介绍了客户端负载平衡模式和可恢复性机制的实现。这些功能与健壮的交互模型相结合,构成了协议的核心。
在本微型系列的最后一篇文章中,我们将介绍 RSocket 之上的构建可用抽象层。