demo 地址 https://github.com/YardWill/beginner-reactive-programming-with-rxjs
我们通过四个例子来看rxjs.
1. example-1
const promise = new Promise((resolve) => {
setTimeout(() => {
resolve('Hello from a Promise!');
}, 2000);
});
promise.then(value => console.log(value));
这是一个简单的promise实现2000ms之后的输出。我们接下来使用rxjs来试试如何实现这个功能。
2. example-2
import { Observable } from 'rxjs/Observable';
const observable = new Observable((observer) => {
setTimeout(() => {
observer.next('Hello from a Observable!');
}, 2000);
});
observable.subscribe(value => console.log(value));
我们先通过Observable的接口来注册一个可观测对象,(如果对观察者模式不太熟悉的话可以先看这篇文章观察者简单实现)
之后我们使用subscribe来订阅一个事件(subscribe函数内部执行在observer内注册的函数),当observer触发next的时候就执行subscribe内注册的函数。
我们来看看源码。
以下是Observable的构造函数,可以看出,除了绑定subscribe函数之外,什么都没有做。
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
下面是Observable类上的subscribe方法。简单来讲就是去执行了在constructor内注册的subscribe函数,并将observerOrNext push到subscriptions内。然后return的对象是一个subscriptions。
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Subscription {
const { operator } = this;
const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
operator.call(sink, this.source);
} else {
sink.add(
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
this._subscribe(sink) :
this._trySubscribe(sink)
);
}
if (config.useDeprecatedSynchronousErrorHandling) {
if (sink.syncErrorThrowable) {
sink.syncErrorThrowable = false;
if (sink.syncErrorThrown) {
throw sink.syncErrorValue;
}
}
}
return sink;
}
你一定觉得很疑惑Observable、subscribe、subscriptions这些名字都代表什么意思。我们下面来讲一个通俗的故事。
从前有一家鲜奶公司,提供不同种类的鲜奶,每天都会送货一次。有一天,小明花了20/天的钱去订了这家公司的进口鲜奶,小花花了10元/天的钱订了国产鲜奶。那么公司收到的订单列表就是这两个,公司会在固定时间去把鲜奶送到用户的手上。
我们看看这个故事里面的observable、subscribe、subscriptions分别是什么?
- observable: 毫无疑问,鲜奶公司是一个可订阅对象,我们可以向鲜奶公司订阅我们需要的鲜奶。
- subscribe: subscribe这是一个动作,小明和小花去定了这家鲜奶公司的牛奶。
- subscriptions: 订完牛奶后,小明和小花的订单就已经在鲜奶公司的订单列表上了,这个订单列表就是subscriptions。
- 另外,我们将setTimeout改成setInterval,这时我们就可以想象鲜奶公司每天都会触发发货的工作,也就是执行next方法。next方法可以当做是鲜奶公司对照着订单列表对小明和小花进行发货。
这样看起来,理解这几个对象应该不难了吧。我们把这个故事改编成代码。
import { Observable } from 'rxjs/Observable';
// 鲜奶公司
const interval$ = new Observable((observer) => {
let count = 0;
const interval = setInterval(() => {
console.log('鲜奶公司准时发货');
observer.next(count += 1);
}, 1000);
return () => {
clearInterval(interval);
};
});
// 小明订奶
const littleMing = count => console.log('小明收到', count, '瓶奶');
// 小花订奶
const littleHua = count => console.log('小花收到', count, '瓶奶');
const subscription1 = interval$.subscribe(littleMing);
const subscription2 = interval$.subscribe(littleHua);
subscriptions包含[subscription1, subscription2]。
那么接下来我们再来思考一个问题,如果小明不想继续订牛奶了,他应该怎么通知鲜奶公司不再发货?我们来看 example-3
3. example-3
我们通过上面的故事来修改一下我们的代码。
import { Observable } from 'rxjs/Observable';
// 鲜奶公司
const interval$ = new Observable((observer) => {
let count = 0;
const interval = setInterval(() => {
console.log('鲜奶公司准时发货');
observer.next(count += 1);
}, 1000);
return () => {
clearInterval(interval);
};
});
// 小明订奶
const littleMing = count => console.log('小明收到', count, '瓶奶');
// 小花订奶
const littleHua = count => console.log('小花收到', count, '瓶奶');
const subscription1 = interval$.subscribe(littleMing);
const subscription2 = interval$.subscribe(littleHua);
setTimeout(() => subscription1.unsubscribe(), 3000);
我们可以看到在最后我们把subscription1给unsubscribe了,我们看看unsubscribe函数内做了什么?
/**
* Disposes the resources held by the subscription. May, for instance, cancel
* an ongoing Observable execution or cancel any other type of work that
* started when the Subscription was created.
* @return {void}
*/
unsubscribe(): void {
let hasErrors = false;
let errors: any[];
if (this.closed) {
return;
}
let { _parent, _parents, _unsubscribe, _subscriptions } = (<any> this);
this.closed = true;
this._parent = null;
this._parents = null;
// null out _subscriptions first so any child subscriptions that attempt
// to remove themselves from this subscription will noop
this._subscriptions = null;
let index = -1;
let len = _parents ? _parents.length : 0;
// if this._parent is null, then so is this._parents, and we
// don't have to remove ourselves from any parent subscriptions.
// 移除subscription
while (_parent) {
_parent.remove(this);
// if this._parents is null or index >= len,
// then _parent is set to null, and the loop exits
_parent = ++index < len && _parents[index] || null;
}
if (isFunction(_unsubscribe)) {
let trial = tryCatch(_unsubscribe).call(this);
if (trial === errorObject) {
hasErrors = true;
errors = errors || (
errorObject.e instanceof UnsubscriptionError ?
flattenUnsubscriptionErrors(errorObject.e.errors) : [errorObject.e]
);
}
}
if (isArray(_subscriptions)) {
index = -1;
len = _subscriptions.length;
while (++index < len) {
const sub = _subscriptions[index];
if (isObject(sub)) {
let trial = tryCatch(sub.unsubscribe).call(sub);
if (trial === errorObject) {
hasErrors = true;
errors = errors || [];
let err = errorObject.e;
if (err instanceof UnsubscriptionError) {
errors = errors.concat(flattenUnsubscriptionErrors(err.errors));
} else {
errors.push(err);
}
}
}
}
}
if (hasErrors) {
throw new UnsubscriptionError(errors);
}
}
这里的代码理解起来也不难,最后去执行了clearInterval(interval)将定时器去掉,并把当前的订单(subscription)移除出订单列表(subscriptions)。
4. example-4
接下来小明和小花都有各自的需求改变,比如小明想要每天两瓶奶,而小花需要隔天收到一瓶奶,那么我们应该怎么做呢?看下面代码。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/filter';
// 鲜奶公司
const interval$ = new Observable<number>((observer) => {
let count = 0;
const interval = setInterval(() => {
console.log('鲜奶公司准时发货');
observer.next(count += 1);
}, 1000);
return () => {
clearInterval(interval);
};
});
// 小明订奶
const littleMing = count => console.log('小明收到', count, '瓶奶');
// 小花订奶
const littleHua = count => console.log('小花收到', count, '瓶奶');
// 小明打算每天多订一份的鲜奶
const subscription1 = interval$
.map(value => value * 2)
.subscribe(littleMing);
// ----1----2----3----4--->
// map => x * 2
// ----2----4----6----8--->
// 小花打算让鲜奶公司隔天送一瓶
const subscription2 = interval$
.filter(value => value % 2 === 0)
.map(value => value / 2)
.subscribe(littleHua);
// ----1----2----3----4--->
// filter & map
// ---------1---------2--->
在这里我们引入了map和filter这两个rx的操作符,来实现我们需要变更的需求,看起来是不是很简单。当然还有更多的操作符(我们就不一一介绍了),操作符也是rxjs内的很大一部分组成,可以把它比作是lodash内的工具类。
5. example-5
我们最后再来看看rxjs在前端事件监听上的用法。
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/fromEvent';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/scan';
import 'rxjs/add/operator/map';
const incrementClicks$ = Observable.fromEvent(document.getElementById('increment'), 'click');
const decrementClicks$ = Observable.fromEvent(document.getElementById('decrement'), 'click');
Observable
.merge(incrementClicks$, decrementClicks$)
.map((event: any) => parseInt(event.target.value, 10))
.scan((total, value) => total + value, 0)
.subscribe((total) => {
document.getElementById('counter').innerText = total.toString();
});
merge将两个Observable对象合成一个Observable对象,然后map对数据进行操作。
scan可以比作一个reducer函数,每一次click之后会对数据进行处理,并保留之前的数据。
最后我们去subscribe这个事件,然后做出相应修改。