RxJS官方教程(五) Subject

Subject 主题

什么是Subject(主题)?RxJS Subject是一种特殊类型的Observable,允许将值多播到多个观察者Observer。虽然普通的Observable是单播的(每个订阅的Observer都拥有Observable的独立执行),但Subject是多播的。

Subject类似于Observable,但可以多播到许多观察者。Subject就像EventEmitters:它们维护着许多观察者的注册表。

每个Subject都是一个Observable。给定一个主题,您可以通过一个Observer subscribe他,它将开始正常接收值。从Observer的角度来看,它无法判断Observable执行是来自简单的单播Observable还是Subject。

在Subject的内部,subscribe不会调用一个新的传递值的执行。它只是将给定的Observer注册到Observers列表中,类似于其他库和语言中的addListener的工作方式。

每个Subject都是一个Observer。它是一个含有next(v)error(e)complete()的对象。要向主题提供新值,只需调用next(theValue),它将被多播到已注册接受该主题的观察者。

在下面的示例中,我们有两个观察者订阅了主题,我们向主题提供一些值:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
})

subject.next(1);
subject.next(2);

在控制台上使用以下输出:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

由于Subject是Observer,这也意味着您可以提供Subject作为任何Observable subscribe的参数,如下例所示:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
})
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
})

var observable = Rx.Observable.from([1,2,3]);

observable.subscribe(subject); // You can subscribe providing a Subject

其执行如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

通过上面的方法,我们基本上只是通过Subject将单播Observable执行转换为多播。这演示了Subject是如何把Observable执行共享给多个Observer的唯一方法。

还有的几个Subject特例:BehaviorSubjectReplaySubject,和AsyncSubject

Multicasted Observable 多播的Observable

“多播Observable”通过可能有许多订阅者的Subject传递通知,而普通的“单播Observable”仅向单个Observer发送通知。

多播Observable使用一个Subject来使多个Observers看到相同的Observable执行。

这其实是multicast运算符的工作方式:观察者订阅基础主题,主题订阅源Observable。以下示例类似于上一个使用的示例observable.subscribe(subject)

var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast返回一个看起来像普通Observable的Observable,但在订阅时就像Subject一样工作。multicast返回一个ConnectableObservable,它只是一个带有connect()方法的Observable 。

connect()方法对于确定何时开始共享Observable执行非常重要。因为connect()执行了source.subscribe(subject),并且connect()返回一个订阅,你可以执行取消订阅来取消Observable的执行。

参考计数

手动调用connect()和处理订阅通常很麻烦。通常,我们希望在第一个Observer到达时自动连接,并在最后一个Observer取消订阅时自动取消共享执行。

请考虑以下示例,其中订阅按此列表所述进行:

  1. First Observer订阅了多播Observable
  2. 多播的Observable已连接
  3. next0将传递给第一个Observer
  4. Second Observer订阅了多播Observable
  5. next1将传递给第一个Observer
  6. next1将传递给第二个Observer
  7. First Observer取消订阅多播Observable
  8. next2将传递给第二个Observer
  9. Second Observer取消订阅了多播Observable
  10. 与多播Observable的连接已取消订阅

为了通过显式调用connect()实现这一点,我们编写以下代码:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
var subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

如果我们希望避免显式调用connect(),我们可以使用ConnectableObservable的refCount()方法(引用计数),该方法返回一个Observable,用于跟踪它拥有多少订阅者。当订阅者数量从0增加到时1,它将调用connect()我们,这将启动共享执行。只有当订阅者数量从减少10完全取消订阅时,才会停止进一步执行。

refCount 使多播Observable在第一个订阅者到达时自动开始执行,并在最后一个订阅者离开时停止执行。

以下是一个例子:

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

哪个执行输出:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed

refCount()方法仅存在于ConnectableObservable上,它返回一个Observable而不是另一个ConnectableObservable。

BehaviorSubject

Subject的变体之一是BehaviorSubject,具有“当前值”的概念。它存储发布给消费者的最新值,每当新的Observer订阅时,它将立即从BehaviorSubject中获得“当前值” 。

BehaviorSubject用于表示“随时间变化的值”。例如,生日的事件流是Subject,但是人的年龄的流将是BehaviorSubject。

在以下示例中,BehaviorSubject使用0初始化,第一个Observer在订阅时将接收到0。第二个Observer将接收到2,即使它在2发送后才订阅的。

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

带输出:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject类似于BehaviorSubject,它可以将旧值发送给新订阅者,但它也可以记录Observable执行的一部分。

ReplaySubject记录来自Observable执行的多个值,并将它们重放给新订阅者。

创建ReplaySubject时,您可以指定要重播的值的数量:

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

除了缓冲区大小之外,您还可以指定以毫秒为单位的窗口时间,以确定记录值的年龄。在下面的示例中,我们使用大缓冲区大小100,但窗口时间参数仅为500毫秒。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

输出如下,其中第二个Observer将获取发生在订阅之前最后500毫秒的事件345

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...

AsyncSubject

AsyncSubject只将Observable执行的最后一个值发送给它的观察者,并且只有在执行完成时才会发送。

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出:

observerA: 5
observerB: 5

AsyncSubject类似于last()运算符,因为它等待complete通知以便传递单个值。

小结

  • BehaviorSubject 缓存一个值的Subject
  • ReplaySubject 缓存多个值的Subject
  • AsyncSubject 只返回最后一个值的Subject

官网 http://reactivex.io/rxjs/manual/overview.html#subject

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 介绍 RxJS是一个异步编程的库,同时它通过observable序列来实现基于事件的编程。它提供了一个核心的类型:...
    泓荥阅读 16,581评论 0 12
  • 发现 关注 消息 RxSwift入坑解读-你所需要知道的各种概念 沸沸腾关注 2016.11.27 19:11*字...
    枫叶1234阅读 2,778评论 0 2
  • 本文章内部分图片资源来自RayWenderlich.com 本文结合自己的理解来总结介绍一下RxSwift最基本的...
    FKSky阅读 2,853评论 4 14
  • 创建Observable: Rx.Observable.create 是 Observable 构造函数的别名,它...
    柳源居士阅读 4,267评论 0 2
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    huqj阅读 1,822评论 0 21