目的
- 了解 ReactiveX 是什么?
- 了解 ReactiveX 优劣势?
- ReactiveX 中常用的概念?
- ReactiveX 如何使用?
- ReactiveX 操作符?
- ReactiveX 使用场景是什么?
ReactiveX 简义
ReactiveX 的历史
ReactiveX 是 Reactive Extensions 的缩写,一般简写为 Rx,最初是 LINQ 的一个扩展,由微软的架构师 Erik Meijer 领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持 .NET、JavaScript 和 C++,Rx 近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由 ReactiveX 这个组织负责维护,比较流行的有 RxJava/RxJS/Rx.NET
下文 ReactiveX 简称 Rx
Rx 简义
什么是 Rx(简称:Rx)?
ReactiveX.io 官网对其自身的介绍:
An API for asynchronous programming with observable streams
一个专注于异步编程与控制可观察数据(或者事件)流的API.
我们可以通俗的理解为:
- An API: 它首先是一个编程接口规范,不同的语言提个不同的实现。像 RxJava、RxSwift、RxJs。
- For asynchronous programming:在异步编程中使用。比如子线程耗时网络请求。
- With observable streams: 基于可观察的事件流。比如观察者模式中的观察者对被观察着的监听。
其核心设计思想:观察者模式、Iterator 模式、函数式编程
观察者模式:即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。
Iterator 模式: 即迭代器模式。
函数式编程:即提供一系列函数样式的方法供快速开发。
Rx 工作模式图:http://reactivex.io/
开胃菜
再开始了解 Rx 之前,我们先了解一下以下几个基本概念。
- 什么是响应式编程(简)
A = B + C
有这么一段代码A
被赋值为B
和C
的值。
在传统的命令式编程中,如果我们改变B
的值,A
的值并不会随之改变。
而如果我们运用一种机制,当B
或者C
的值发现变化的时候,A
的值也随之改变,这样就实现了”响应式“。
或者说叫数据绑定
。
http://wiki.jikexueyuan.com/project/android-weekly/issue-145/introduction-to-RP.html 响应式编程(Reactive Programming)介绍
- 什么是函数式编程(简单)
函数式编程思维, 就是 用计算(函数)来表示程序, 用计算(函数)的组合来表达程序的组合的思维方式.
伪代码
// 传统
int abs(int i) {
return i * 2;
}
int add_abs(int a, int b) {
return abs(a) + abs(b)
}
int result = add_abs(2, 3);
// 函数式
def abs = {i -> i * 2} // 传入 i 返回 i*2
def add_abs = {(a, b) -> abs(a)+abs(b)} // 传入a,b ,返回经过 a*2 + b*2 的值
int a = add_adb(1, 2)
能看到这里就是通过组合函数来达到计算结果的过程
http://www.ruanyifeng.com/blog/2012/04/functional_programming.html
- 函数式相应编程
结合函数式编程
以及响应式编程
就得到了函数响应式编程
例子
我们如果有一个输入框,输入的文字达到预期时,弹出提示框
传统的话我们的话会给这个输入框添加一个监听事件,用于监听文字的输入,然后在监听方法中,去做判断是否需要弹出提示框。
input.setInputListener(inputListener) // 设置输入监听
void inputListener(input) {
if (!input.text.isEmpty()) {
if input.text.count >= 100 {
alert.show("不能输入更多了")
}
}
}
如果我们使用函数响应式编程的话,就可以用一下代码来表示
input.text
.isEmpty
.map {text.count >= 100} //
.bind {alert.show("不能输入更多了")}
这样就实现了输入与提示框的绑定。
而 Rx(RxJava) 的原理就是这种模式,在这个模式中有 4 个角色
角色 | 作用 |
---|---|
被观察者(Observable) | 产生事件 |
观察者(Observer) | 接收事件,并给出响应动作 |
订阅(Subscribe) | 连接 被观察者 & 观察者 |
事件(Event) | 被观察者 & 观察者 沟通的载体 |
在Rx 中还有一种特殊的存在 Subject,它既是可监听序列也是观察者。它同时充当了 Observer 和 Observable 的角色。因为它是一个 Observer,它可以订阅一个或多个 Observable ;又因为它是一个 Observable ,它可以转发它收到( Observe )的数据,也可以发射新的数据。
Rx N部曲
Rx 的使用步骤
被观察者 (Observable)
通过 订阅(Subscribe)
按顺序发送事件 给观察者 (Observer)
观察者(Observer)
按顺序接收事件 & 作出对应的响应动作。
- 初始化 Observable
- 初始化 Observer
- 建立订阅关系
- 取消订阅
// 初始化 Observable
Observable<Integer> observable = Observable.create(observer -> {
observer.onNext(1);
observer.onNext(2);
observer.onNext(3);
observer.onComplete();
observer.onError
});
Observer observer = new Observer<Integer>() { // 初始化 Observer
public void onNext(Integer integer) {
// 收到消息
}
public void onError(Error e) {
// 发生错误
}
public void onComplete() {
// 完成订阅
}
}
// 建立订阅关系
observable.subscribe(observer);
// 简化
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("接受完成");
}
});
// 再次使用强大的 Rx 操作符进行再次简化
Observable.just(1)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
?>
// 其实还可以更简单
Observable.just(1)
.subScribe(System.out::println)
onNext:用来发送数据,可多次调用,每调用一次发送一条数据
onError:用来发送异常通知,只发送一次,若多次调用只发送第一条
onComplete:用来发送完成通知,只发送一次,若多次调用只发送第一条
Consumer
Consumer 可以看做是对观察者Observer功能单一化之后的产物, 其函数accept只接收可观察对象发射的数据,不接收异常信息或完成信息。
如果想接收异常或完成信息需要使用
Observable.just("Hello World")
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
throwable.printStackTrace();
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("接收完成");
}
});
第二个参数Consumer规定泛型<Throwable>通过函数accept接收异常信息。
第三个参数Action也是对观察者Observer功能单一化之后的产物--行动,通过函数run接收完成信息,作出响应行动。
Disposable
public void onSubscribe(Disposable d) {
}
在观察者 Observer 与可观察对象Observable ,建立订阅关系后,回调这个方法,并且传过来一个 Disposable 类型的参数,可通过 Disposable 来控制 Observer 与 Observable 之间的订阅。
无论观察者 Observer 以何种方式订阅可观察对象 Observable,都会生成一个 Disposable。
Operator - 操作符
其实质是函数式编程中的高阶函数,是对响应式编程的各个过程拆分封装后的产物。以便于我们操作数据流。
常用操作符分类
创建:创建一个可观察对象 Observable 并发射数据
过滤:从 Observable 发射的数据中取出特定的值
变换:对 Observable 发射的数据执行变换操作
组合:组合多个 Observable ,例如:{1,2,3} + {4,5,6} --> {1,2,3,4,5,6}
聚合:聚合多个 Observable ,例如:{1,2,3} + {4,5,6} --> {[1,4],[2,5],[3,6]}
常用操作符
创建
create
-
just
just操作符可用来发送单条数据,数字,字符串,数组,对象,集合都可以当做单条数据发送。
Observable.just("hello world");//发送一个字符串"hello world" Observable.just(1,2,3,4);//逐一发送1,2,3,4这四个整数
-
fromArray
创建一个Observable,接受一个数组,并将数组中的数据逐一发送
过滤操作
-
filter
filter使用 Predicate 函数接口传入条件值,来判断Observable发射的每一个值是否满足这个条件,如果满足,则继续向下传递,如果不满足,则过滤掉。
-
distinct
过滤掉重复的数据项,过滤规则为:只允许还没有发射过的数据项通过。
变换
-
map
对Observable发射的每一项数据应用一个函数,执行变换操作
-
flatMap
将一个发射数据的 Observable 变换为多个 Observables ,然后将它们发射的数据合并后放进一个单独的 Observable
组合
- merge
使用Merge
操作符你可以将多个 Observables 的输出合并,就好像它们是一个单个的 Observable 一样。
Merge
可能会让合并的 Observables 发射的数据交错(有一个类似的操作符Concat
不会让数据交错,它会按顺序一个接着一个发射多个 Observables 的发射物)。
聚合
- zip
通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。
Schedulers - 调度器
如果你想给 Observable 操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable )在特定的调度器( Scheduler / 线程 )上执行。
当然在每个语言的实现或者平台下都有属于自己特殊的调度器,比如 Android 的AndroidSchedulers.mainThread(),Swift 上的 MainScheduler等。
subscribeOn
指定一个观察者在哪个调度器上观察这个 Observable
若多次设定,则只有一次起作用。
observeOn
指定 Observable 自身在哪个调度器上执行
若多次设定,每次均起作用。
Rx 常用的场景?
它有什么使用场景?
场景 1:单请求异步处理
在显示场景中 ui线程(主线程)不能做耗时操作,比如网络请求、大文件读取等。
伪代码:
// Api 封装类
public class API {
// 获取 token 的实现
Observable<String> token(username, passowrd) {
// 返回一个 Observable
return Observable.create({ observer
request.get(xxx)
.callback({
void success(Response response) {
String token = ....// 解析返回值,获得 token
observer.onNext(token)
observer.onComplete(token)
}
void error(error error) {
observer.onError(error)
}
})
})
}
}
//
API.token()
.subscribe{
void onNext()
void onError()
void onComplete()
}
Swift:
enum API {
/// 通过用户名密码取得一个 token
static func token(username: String, password: String) -> Observable<String> { ... }
}
RxMoyaProvider<API>()
.request(.token("song", "123"))
.subscribe{ event in {
switch event {
case .Next(let response):
// 获得请求结果
case .Error(let error):
// 发生错误
case .Complete():
// 完成请求
}
}}
Java:
public interface Api {
@GET("getToken")
Observable<ResponseBody> token(@Query("username") String username,
@Query("password") String passowrd);
}
retrofit.create(Api.class)
.token("song", "123")
.subscribenOn(Schedulers.io()) ///在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
.subscribe(new Observer<ResponseBody>() {
@Override
public void onSubscribe(Disposable d) {
// 开启订阅
}
@Override
public void onNext(ResponseBody responseBody) {
// 获得请求结果
}
@Override
public void onError(Throwable e) {
// 发生错误
}
@Override
public void onComplete() {
// 完成订阅
}
})
场景 2:多异步请求连续调用
比如先通过用户名密码取得 Token, 然后通过 Token 取得用户信息。
传统方式实现:
/// 用回调的方式封装接口
enum API {
/// 通过用户名密码取得一个 token
static func token(username: String, password: String,
success: (String) -> Void,
failure: (Error) -> Void) { ... }
/// 通过 token 取得用户信息
static func userinfo(token: String,
success: (UserInfo) -> Void,
failure: (Error) -> Void) { ... }
}
/// 通过用户名和密码获取用户信息
API.token(username: "123", password: "123",
success: { token in
API.userInfo(token: token,
success: { userInfo in
print("获取用户信息成功: \(userInfo)")
},
failure: { error in
print("获取用户信息失败: \(error)")
})
},
failure: { error in
print("获取用户信息失败: \(error)")
})
Rx 实现:
/// 用 Rx 封装接口
enum API {
/// 通过用户名密码取得一个 token
static func token(username: String, password: String) -> Observable<String> { ... }
/// 通过 token 取得用户信息
static func userInfo(token: String) -> Observable<UserInfo> { ... }
}
API.token(username: "song", password: "123")
.flatMapLatest(API.userInfo) // 将 Observable 的元素转换成其他的 Observable(将当前请求转换为另一请求)
.subscribe(onNext: { userInfo in
print("获取用户信息成功: \(userInfo)")
}, onError: { error in
print("获取用户信息失败: \(error)")
})
.disposed(by: disposeBag)
当然这样可能感觉不到,但是当你当前的业务 需要有关系的连续请求 4 个、5个的时候,你就会发现这样可以避免回调地狱,从而使得代码易读,易维护。
场景 3:多异步请求合并处理
有时候在项目中,我们会碰到组合多个请求的结果后,再更新UI的情况,比如同时取得当前商品信息和评论
Swift
/// 用 Rx 封装接口
enum API {
/// 取得老师的详细信息
static func info(id: Int) -> Observable<Teacher> { ... }
/// 取得老师的评论
static func comments(id: Int) -> Observable<[Comment]> { ... }
}
Observable.zip(
API.info(id: 1),
API.comments(id: 1)
).subscribe(onNext: { (teacher, comments) in
print("获得当前商品信息: \(teacher)")
print("获得当前商品评论: \(comments.count) 条")
}, onError: { error in
print("获取商品信息或评论失败: \(error)")
})
.disposed(by: disposeBag)
java:
MyService myService = retrofit.create(MyService.class);
Observable getInfo = myService.getInfo();
Observable getComments = myService.getComments();
Observable.zip(getInfo,getComments)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateUI, this::showError);
题外话
Swift: Combine (官方)
Combine 可以使代码更加简洁、易于维护,也免除了饱受诟病的嵌套闭包和回调地狱。Combine 是 Reactive Programming 在 Swift 中的一个实现,更确切的说是对 ReactiveX (Reactive Extensions, 简称 Rx) 的实现,而这个实现正是基于观察者模式的。
https://icodesign.me/posts/swift-combine/
Kotlin: 协程(线程切换框架)
https://space.bilibili.com/27559447?from=search&seid=9369351450622626138
https://juejin.im/post/5a0ab91451882533d0229556
参考文档:
ReactiveX/RxJava: https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html
RxSwift: https://beeth0ven.github.io/RxSwift-Chinese-Documentation/