响应式编程
本文是对Reactor官方文档Introduction to Reactive Programming部分内容的翻译,希望对理解响应式编程的概念和原理有所帮助。
响应式编程是一种异步编程范式,它关注数据流和变化的传播。这意味着可以通过使用编程语言轻松地表示静态(例如数组)和动态(例如事件发射器)数据流。
Reactive Streams为基于JVM的响应库提供了规范,它定义了一组接口和交互规则。在Java 9中,这些接口已经集成到
java.util.concurrent.Flow
类之下。
在面向对象编程语言中,响应式编程通常以观察者模式的扩展呈现。还可以将响应式流模式和迭代器模式比较,一个主要的区别是,迭代器基于”拉“,而响应式流基于”推“。
使用迭代器是一种命令式编程,由开发者决定何时去访问序列中的next()
元素。而在响应式流中,与Iterable-Iterator
对应的是Publisher-Subscriber
。当新的可用元素出现时,发布者通知订阅者,这种”推“正是响应的关键。此外,应用于推入元素上的操作是声明式的而不是命令式的:程序员要做的是表达计算的逻辑,而不是描述精准的控制流程。
除了推送元素,响应式编程还定义了良好的错误处理和完成通知方式。发布者可以通过调用next()
方法推送新的元素给订阅者,也可以通过调用onError()
方法发送一个错误信号或者调用onComplete()
发送一个完成信号。错误信号和完成信号都会终止序列。
响应式编程非常灵活,它支持没有值、一个值或n个值的用例(包括无限序列,例如时钟的连续滴答声)。
但是让我们首先考虑,为什么我们需要响应式编程?
阻塞可能是浪费的
现代的应用需要满足大量的用户并发访问,尽管硬件的能力依然在不断提高,软件的性能仍然是一个关键问题。
通常有两种方法可以提升程序的性能:
- 并行化:使用更多的线程和更多的硬件资源。
- 在如何使用现有资源方面寻求更高的效率。
通常,Java开发者使用阻塞代码编程。在出现性能瓶颈之前,这种做法没有问题,此时就需要引入额外的线程,来运行相似的阻塞代码。但是,这种资源利用率的扩展可能很快引来争用和并发问题。
更糟糕的是,阻塞会浪费资源。如果仔细观察,只要程序涉及一些延迟(特别是IO,比如数据库请求或网络调用),资源就会被浪费,因为一个(或多个线程)正处于空闲状态,等待数据。
所以,并行化方法并不是什么灵丹妙药。为了提高硬件资源的利用率,响应式编程是必要的。但是它也很复杂,因为容易造成资源浪费。
使用异步来解决?
上文提到的第二种方法是寻求更高的效率,可以解决资源浪费问题。通过编写异步非阻塞代码,你可以将执行切换到另一个使用相同底层资源的活动任务上,在异步执行完成后返回到当前程序。
如何在JVM上编写异步代码呢?Java提供了两种异步编程模型:
- Callbacks:异步方法没有返回值,但是提供一个额外的回调参数(一个lambda或者匿名类对象),当结果可用时调用该参数。
-
Futures:异步方法立即返回一个
Future<T>
对象。异步线程计算一个T
值,Future
对象封装对它的访问。该值不是立即可用的,但可以轮询Future
对象,直到该值可用为止。
这些技术足够好吗?并不是每个场景都适用,而且两种方式都有限制。
回调是很难组合在一起的,很快就会导致难以阅读和维护的代码(称为”回调地狱”)。
Futures
比回调稍微好一点,但是在组合方面依然做的不够好,即使Java 8中引入了CompletableFuture
。把多个Future
编排到一起虽然可行,但是并不容易。而且,Future
还有另外的问题:通过调用get()
方法,很容易让Future
对象进入到另一种阻塞情景;它们不支持延时计算;不支持多值和高级错误处理。
从命令式编程到响应式编程
响应式编程库Reactor旨在解决JVM上这些”经典“的异步编程方式的缺点,同时关注几个额外的方面:
- 可组合性和可读性;
- 使用丰富的操作符词汇操作数据流;
- 在订阅数据流之前什么也不会发生(延时计算);
- 背压(backpressure)或消费者向生产者发送发射速率过快的信号的能力;
- 与并发无关的高级抽象;
可组合性和可读性
可组合性指的是编排、协调多个异步任务的能力。包括使用先前任务的结果为后续任务提供输入,或者以fork-join格式执行多个任务以及在更高级别的系统中将异步任务作为独立组件重用。
编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理的数量和复杂性的增加,编写和阅读代码变得越来越困难。正如我们所看到的,回调模式很简单,但是它的一个主要缺点是,对于复杂的任务,你需要从回调中执行回调,回调本身嵌套在另一个回调中,等等。这种混乱被称为回调地狱。可以想象,这样的代码很难回头进行推理。
装配线类比
你可以将在响应式应用中处理数据想象成(数据)在装配线上移动。响应式编程既是传送带,又是工作站。原材料从一个数据源(最初的Publisher
)中倾泻而出,最终成为准备推送给消费者(Subscriber
)的成品。
原材料可以经过各种转换和中间步骤,或者成为将中间部件组装在一起的大型装配线的一部分。如果在某一点发生了小故障或者阻塞,受影响的工作站可以向上游发信号限制原材料的流动。
操作符
操作符就是装配线中的工作站。每一个操作符都向发布者(Publisher
)添加新的行为,并将上一步中的发布者包装到一个新的实例中。整个链条就是这样连接在一起的,数据源自第一个发布者(Publisher
),并沿着链条向下移动,在每个链接处被转换。最终,一个订阅者(Subscriber
)结束这个流程。
订阅前什么也不会发生
在Reactor中,当你编写一个发布者(Publiser
)链时,默认情况下不会开始向其中注入数据。相反,你创建的是一个异步处理的抽象描述(这有助于重用性和组合)。
通过订阅操作(Subscribe()
),你将发布者绑定到一个订阅者,从而触发整个链中的数据流。这是通过订阅者(Subscriber
)发出一个请求(request()
)信号在内部实现的,该信号向上游传播,一直到源头发布者(Publisher
)。
背压
向上游传播信号也被用来实现背压。在装配线类比中,我们将其描述为当工作站处理速度比上游工作站慢时,沿装配线向上游发送的反馈信号。
响应式流规范中定义的真正的机制与装配线类比非常相似:订阅者可以在无限制的模式下工作,并让数据源以最快的速度推送数据;或者它可以用request()
机制向数据源发出信号,表明自己最多可以处理n
个数据。
中间操作符还可以在传输过程中更改请求。假设有一个buffer
操作符以10个元素为一个批次对元素进行分组。如果订阅者请求1个缓冲区,数据源就要生成10个元素。一些操作符还实现预取(prefetching
)策略,这避免了每次请求一个元素的往返开销,如果在被请求之前生成元素的成本不是太高,那么这样做是有益的。
预取策略将推模型转换为推-拉混合模型,在这种混合模型中,下游可以从上游拉出n个元素(如果它们已经可用)。但是,如果元素还没有准备好,它们会在生产好之后由上游推给下游。
热响应式序列和冷响应式序列
在响应式编程库的Rx家族中,我们可以区分两大类响应式序列:热响应式序列和冷响应式序列 。这种区别主要与响应式流如何对订阅者做出响应有关:
- 冷响应式序列对每个订阅者(包括数据源)会重新启动一个全新的响应序列。
- 热响应式序列对每个订阅者不会重零开始。相反,后来的订阅者只能接收到他们订阅后发出的数据。但是,请注意,一些热响应式序列可以缓存或回放全部或部分数据发布历史。从一般的角度来看,热序列甚至可以在没有订阅者在监听时发射数据(“订阅前什么都不会发生”规则的一个例外)。