Stream in Dart

大家应该都吃过转盘小火锅吧,情形是这样的的:好多个人坐在一块,围着一条传送带,每个人的位置上都会有一个小火锅,厨师将菜品放到传送带上,这些菜品会随着传送带经过每个人的位置,如果看到你想吃的菜品,则直接拿着放到自己的小火锅里;如果没碰到想吃的,则直接滤过,但传送带会继续将他们传递到下一个人的身边。

上述情况我们需要注意这样几个问题:

1. 我们只有坐在这条传送带旁边,才能吃到上面的菜。

2. 我们想吃的菜品不是立马就能出现到你面前的。如果你想吃肥牛,前提是需要厨师在传送带上面放一盘肥牛,肥牛才会被传送到你旁边,但是你并不知道肥牛什么时候才能到你身边。

3. 传送带上的菜品都是按照厨师放置的顺序依次被传送到你身边的。

Stream

那么什么是 Stream 呢?上面的传送带就是 Stream,传送带上面可以传递任何菜品,相应的 Stream 也就可以传递任何数据类型。

为了方便的控制 Stream ,我们通常使用 StreamControllerStreamController 中提供了两个属性,一个是用来往 Stream 中添加数据的 sink,一个是用于接收数据的 stream

上面说了,只有坐在传送带旁边才能吃到传送带上的菜品,这里,我们也只需要使用streamController.stream.listen(...) 就可以收到通知 (当 Stream上有数据的时候)。

只要我们坐在传送带前,我们就成了消费者,对应的,当我们 listen 了一个 Stream 的时候,我们就成了一个 StreamSubscription (订阅者)。

最后,当我们不需要这个 Stream 的时候,我们需要将其 close 掉,使用streamController.close()

下面我们用一个例子来演示下:

import 'dart:async';  
  
void main() {  
  // 声明一个 StreamController  
  StreamController controller = StreamController();  
  
  // 监听此 Stream  
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('$value'));  
  
  // 往 Stream 中添加数据  
  controller.sink.add(0);  
  controller.sink.add('a, b, c, d');  
  controller.sink.add(3.14);  
  
  // 关闭 StreamController  
  controller.close();  
}

输出:

0
a, b, c, d
3.14

上述代码中,我们声明了一个 StreamController,然后往往里面放置了三种不同的数据类型,当然,我们也可以使用泛型的方式来限制里面的数据类型:

StreamController<int> controller = StreamController<int>();

接下来我们看看 stream.listne(...) 这个方法,源码如下:

StreamSubscription<T> listen(void onData(T event),  
  {Function onError, void onDone(), bool cancelOnError});

可以看出 listen(...) 方法接受一个必选参数和三个可选参数,我们上一个例子中只是传递了必选参数 onData(...),其他额三个参数并没有传递,下面我们来举例说明 listen(...) 方法中各个参数的用途。

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .listen(onData, onError: onError, onDone: onDone, cancelOnError: true);  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  // 发送一个 Error  
  controller.sink.addError(-1);  
  controller.sink.add(2);  
  
  controller.close();  
}  
  
void onData(int data) {  
  print('The value is $data');  
}  
  
void onError(err) {  
  print('The err is $err');  
}  
  
void onDone() {  
  print('The stream is done !');  
}

输出:

The value is 0
The value is 1
The err is -1

从上述代码中,我们可以看到,stream.listen(...) ,接受 4 个参数,这 4 个参数的作用分别如下:

onData(T data) : 用来接收 Stream 中的每一个事件。

onError(...) : 注释上是这样说的 The [onError] callback must be of type void onError(error),也就是所他需要一个接受一个参数的方法。但它还支持接受两个参数的方法 void onError(error, StackTrace stackTrace)

onDone() : 当一个 Stream 关闭了,也就是执行了 stream.close() 方法并且发送了 done 事件,这个方法会被调用。

cancelOnError : 这是一个 bool 类型的值,意思也很简单,就是当 Stream 碰到 Error 事件的时候,是否关闭这个 Stream

我们上述代码中 cancelOnError 参数传递的是 true ,也就是说当Stream遇到 Error 的时候,Stream 就关闭了,下面的事件就不会再发送出去了。我们把上面代码中的 cancelOnError 参数改为 false 其他的代码不变,输出如下:

The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !

可以看出,这个情况下,即使 Stream 中遇到了 Error ,下面的事件依然会接着发送,并且最后的 done 事件也执行了。

Stream 类型

Stream 类型分为两种,分别为 Single-subscription StreamsBroadcast Streams

Single-subscription Streams

这种类型的 Stream 只允许一个订阅者,也就是只能 listen 一次。我们上一小节中的例子就是一个 Single-subscription Streams。接下来我们看看如果我们对这种类型的 Stream 订阅两次会发生什么情况。

import 'dart:async';  
  
void main() {  
  // Single-subscription Streams  
  StreamController<int> controller = StreamController<int>();  
  
  // 第一个订阅者  
  StreamSubscription subscription1 =  
      controller.stream.listen((value) => print('subscription1 $value'));  
  
  // 第二个订阅者  
  StreamSubscription subscription2 =  
      controller.stream.listen((value) => print('subscription2 $value'));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  controller.close();  
}

输出:

Unhandled exception:
Bad state: Stream has already been listened to.
...

Broadcast Streams

这种类型的 Stream 允许任意数量的订阅者,只是新的订阅者只能从它开始订阅的时候接收事件。也就是订阅之前 Stream 中的事件是接受不到的。

Broadcast Streams 的声明方式如下:

StreamController<int> controller = StreamController<int>.broadcast();

接下来我们写个示例看看:

import 'dart:async';  
  
void main() {  
  // Broadcast Streams  
  StreamController<int> controller = StreamController<int>.broadcast();  
  
  StreamSubscription sub1 =  controller.stream.listen((value) => print('sub1 value is $value'));  
  controller.sink.add(0);  
  controller.sink.add(1);  
  
  StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));  
  controller.sink.add(2);  
  controller.sink.add(3);  
  
  controller.close();  
}

输出:

sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3

可以看出 sub1 可以接收到 Stream 中所有的数据,而 sub2 只能接收到从订阅这个 Stream 之后发送的数据。

StreamTransformer

当数据通过 Stream 传递的时候,我们可以按需来转换里面的数据,Dart 中给我们提供了 StreamTransformer 来对数据做出一些特定转换。

我们可以通过三种方式来实现数据转换:

  • Stream 自带的方法,如 mapwhere
  • 通过 StreamTransformer.fromHandlers(...) 来转换
  • 直接实现一个 StreamTransformer 来定义一个转换器

map、where ...

import 'dart:async';  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .where((value) => value % 2 == 0) // where  
  .map((value) => 'The value is $value') // map  
  .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}

输出:

The value is 0
The value is 2
The value is 4

上述代码中,我们使用了 wheremap 转换符。where 将满足条件的值过滤出来,然后 map 将整型的数字转换成字符串类型的值。

StreamTransformer.fromHandlers(...)

import 'dart:async';  
  
// 转换方法  
void handleData(data, EventSink sink) {  
  if (data % 2 == 0) {  
    sink.add(data);  
  }  
}  
  
void main() {  
  StreamController<int> controller = StreamController<int>();  
  
  controller.stream  
  .transform(StreamTransformer.fromHandlers(handleData: handleData))  
      .listen((value) => print(value));  
  
  controller.sink.add(0);  
  controller.sink.add(1);  
  controller.sink.add(2);  
  controller.sink.add(3);  
  controller.sink.add(4);  
  
  controller.close();  
}

输出:

0
2
4

自定义 StreamTransformer

在自定义我们自己的 Transformer 之前,我们先来看看 stream.transform(...) 做了什么事情,毕竟我们是通过 transform(...) 方法传入的 Transformer, transform(...) 源码如下:

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {  
  return streamTransformer.bind(this);  
}

可以看出 transform 方法返回的依然回一个 Stream ,只不过这个 Stream 是经过转换后的Stream。 类似于 Java8 Stream 中的中间流,就是不停的返回 Stream 的那种。然后该方法是一个泛型方法,泛型类型分别为 T,和 S,可以这样理解,T 为该方法的入参类型,S 为该方法的出参类型,类似于 Java8 中的 Function,我们可以看下 Java 中的 Function 接口的部分源码:

/**  
 * ...
 * @param <T> the type of the input to the function  
 * @param <R> the type of the result of the function  
 * * @since 1.8  
 */
@FunctionalInterface  
public interface Function<T, R> {  
 R apply(T t);
 ...
 }

最后,transform(...) 方法调用了 streamTransformer.bind(this); 该方法返回的是一个新的 Stream,也就是转换后的 Stream,当然,bind() 方法也是我们自定义 StreamTransformer 时需要实现的方法。

我们在之前的例子中声明一个 StreamController 的时候,都是没有传递参数的,其实,StreamController 的构造方法是这样的:

factory StreamController(  
    {void onListen(),  
  void onPause(),  
  void onResume(),  
  onCancel(),  
  bool sync: false}) {  
  return sync  
      ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)  
      : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);  
}

可以看出,StreamController 是有参数的,并且都是可选参数,在这些参数中,我们重点实现 onListen(),关于 onListen 的源码解释如下:

A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.

重点就是 onListen 方法是用来生产事件的。

接下来,来实现一个自定义的 StreamTransformer:

/// 自定义一个 StreamTransformer ,
/// 泛型类型 S 为入参类型,T 为出参类型
/// 这些类型都是 Stream 中传递的数据类型
class MyTransformer<S, T> implements StreamTransformer<S, T> {

  // 用来生成一个新的 Stream 并且控制符合条件的数据
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOrError;

  // 转换之前的 Stream
  Stream<S> _stream;

  MyTransformer({bool sync: false, this.cancelOrError}) {
    _controller = new StreamController<T>(
        onListen: _onListen,
        onCancel: _onCancel,
        onPause: () {
          _subscription.pause();
        },
        onResume: () {
          _subscription.resume();
        },
        sync: sync);
  }

  MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
    // 定义一个 StreamController,注意泛型类型为 T,也就是出参类型,因为
    // 我们是使用该 _controller 生成一个用来返回的新的 Stream<T>
    _controller = new StreamController<T>.broadcast(
        onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    // _stream 为转换之前的 Stream<S>
    _subscription = _stream.listen(onData,
        onError: _controller.addError,
        onDone: _controller.close,
        cancelOnError: cancelOrError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  // 数据转换
  void onData(S data) {
    if ((data as int) % 2 == 0) {
      // 将符合条件的数据添加到新的 Stream 中
      _controller.sink.add(data);
    }
  }

  // 参数为转换之前的 Stream<S>
  // 返回的是一个新的 Stream<T> (转换之后的 Stream)
  @override
  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    // TODO: implement cast
    return null;
  }
}

使用如下:

void main() {
  StreamController<int> controller = StreamController<int>();

  controller.stream
      .transform(new MyTransformer()) // 自定义的 StreamTransformer
      .listen((value) => print('$value'));

  controller.sink.add(0);
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);
  controller.sink.add(4);

  controller.close();
}

输出:

0
2
4

如有错误,还请指出。谢谢!!!

参考链接

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容