大家应该都吃过转盘小火锅吧,情形是这样的的:好多个人坐在一块,围着一条传送带,每个人的位置上都会有一个小火锅,厨师将菜品放到传送带上,这些菜品会随着传送带经过每个人的位置,如果看到你想吃的菜品,则直接拿着放到自己的小火锅里;如果没碰到想吃的,则直接滤过,但传送带会继续将他们传递到下一个人的身边。
上述情况我们需要注意这样几个问题:
1. 我们只有坐在这条传送带旁边,才能吃到上面的菜。
2. 我们想吃的菜品不是立马就能出现到你面前的。如果你想吃肥牛,前提是需要厨师在传送带上面放一盘肥牛,肥牛才会被传送到你旁边,但是你并不知道肥牛什么时候才能到你身边。
3. 传送带上的菜品都是按照厨师放置的顺序依次被传送到你身边的。
Stream
那么什么是 Stream 呢?上面的传送带就是 Stream,传送带上面可以传递任何菜品,相应的 Stream 也就可以传递任何数据类型。
为了方便的控制 Stream ,我们通常使用 StreamController,StreamController
中提供了两个属性,一个是用来往 Stream
中添加数据的 sink
,一个是用于接收数据的 stream
。
上面说了,只有坐在传送带旁边才能吃到传送带上的菜品,这里,我们也只需要使用streamController.stream.listen(...)
就可以收到通知 (当 Stream上有数据的时候)。
只要我们坐在传送带前,我们就成了消费者,对应的,当我们 listen
了一个 Stream
的时候,我们就成了一个 StreamSubscription (订阅者)。
最后,当我们不需要这个 Stream
的时候,我们需要将其 close
掉,使用streamController.close()
下面我们用一个例子来演示下:
import 'dart:async';
void main() {
// 声明一个 StreamController
StreamController controller = StreamController();
// 监听此 Stream
StreamSubscription subscription1 =
controller.stream.listen((value) => print('$value'));
// 往 Stream 中添加数据
controller.sink.add(0);
controller.sink.add('a, b, c, d');
controller.sink.add(3.14);
// 关闭 StreamController
controller.close();
}
输出:
0
a, b, c, d
3.14
上述代码中,我们声明了一个 StreamController,然后往往里面放置了三种不同的数据类型,当然,我们也可以使用泛型的方式来限制里面的数据类型:
StreamController<int> controller = StreamController<int>();
接下来我们看看 stream.listne(...)
这个方法,源码如下:
StreamSubscription<T> listen(void onData(T event),
{Function onError, void onDone(), bool cancelOnError});
可以看出 listen(...)
方法接受一个必选参数和三个可选参数,我们上一个例子中只是传递了必选参数 onData(...)
,其他额三个参数并没有传递,下面我们来举例说明 listen(...)
方法中各个参数的用途。
import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.listen(onData, onError: onError, onDone: onDone, cancelOnError: true);
controller.sink.add(0);
controller.sink.add(1);
// 发送一个 Error
controller.sink.addError(-1);
controller.sink.add(2);
controller.close();
}
void onData(int data) {
print('The value is $data');
}
void onError(err) {
print('The err is $err');
}
void onDone() {
print('The stream is done !');
}
输出:
The value is 0
The value is 1
The err is -1
从上述代码中,我们可以看到,stream.listen(...)
,接受 4 个参数,这 4 个参数的作用分别如下:
onData(T data)
: 用来接收 Stream
中的每一个事件。
onError(...)
: 注释上是这样说的 The [onError] callback must be of type void onError(error)
,也就是所他需要一个接受一个参数的方法。但它还支持接受两个参数的方法 void onError(error, StackTrace stackTrace)
。
onDone()
: 当一个 Stream
关闭了,也就是执行了 stream.close()
方法并且发送了 done
事件,这个方法会被调用。
cancelOnError
: 这是一个 bool
类型的值,意思也很简单,就是当 Stream
碰到 Error
事件的时候,是否关闭这个 Stream
。
我们上述代码中 cancelOnError
参数传递的是 true
,也就是说当Stream
遇到 Error
的时候,Stream
就关闭了,下面的事件就不会再发送出去了。我们把上面代码中的 cancelOnError
参数改为 false
其他的代码不变,输出如下:
The value is 0
The value is 1
The err is -1
The value is 2
The stream is done !
可以看出,这个情况下,即使 Stream
中遇到了 Error
,下面的事件依然会接着发送,并且最后的 done
事件也执行了。
Stream 类型
Stream 类型分为两种,分别为 Single-subscription Streams 和 Broadcast Streams
Single-subscription Streams
这种类型的 Stream
只允许一个订阅者,也就是只能 listen
一次。我们上一小节中的例子就是一个 Single-subscription Streams。接下来我们看看如果我们对这种类型的 Stream
订阅两次会发生什么情况。
import 'dart:async';
void main() {
// Single-subscription Streams
StreamController<int> controller = StreamController<int>();
// 第一个订阅者
StreamSubscription subscription1 =
controller.stream.listen((value) => print('subscription1 $value'));
// 第二个订阅者
StreamSubscription subscription2 =
controller.stream.listen((value) => print('subscription2 $value'));
controller.sink.add(0);
controller.sink.add(1);
controller.close();
}
输出:
Unhandled exception:
Bad state: Stream has already been listened to.
...
Broadcast Streams
这种类型的 Stream
允许任意数量的订阅者,只是新的订阅者只能从它开始订阅的时候接收事件。也就是订阅之前 Stream
中的事件是接受不到的。
Broadcast Streams 的声明方式如下:
StreamController<int> controller = StreamController<int>.broadcast();
接下来我们写个示例看看:
import 'dart:async';
void main() {
// Broadcast Streams
StreamController<int> controller = StreamController<int>.broadcast();
StreamSubscription sub1 = controller.stream.listen((value) => print('sub1 value is $value'));
controller.sink.add(0);
controller.sink.add(1);
StreamSubscription sub2 = controller.stream.listen((value) => print('sub2 value is $value'));
controller.sink.add(2);
controller.sink.add(3);
controller.close();
}
输出:
sub1 value is 0
sub2 value is 2
sub1 value is 1
sub2 value is 3
sub1 value is 2
sub1 value is 3
可以看出 sub1
可以接收到 Stream
中所有的数据,而 sub2
只能接收到从订阅这个 Stream
之后发送的数据。
StreamTransformer
当数据通过 Stream
传递的时候,我们可以按需来转换里面的数据,Dart 中给我们提供了 StreamTransformer
来对数据做出一些特定转换。
我们可以通过三种方式来实现数据转换:
-
Stream
自带的方法,如map
,where
等 - 通过
StreamTransformer.fromHandlers(...)
来转换 - 直接实现一个
StreamTransformer
来定义一个转换器
map、where ...
import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.where((value) => value % 2 == 0) // where
.map((value) => 'The value is $value') // map
.listen((value) => print(value));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
输出:
The value is 0
The value is 2
The value is 4
上述代码中,我们使用了 where
和 map
转换符。where
将满足条件的值过滤出来,然后 map
将整型的数字转换成字符串类型的值。
StreamTransformer.fromHandlers(...)
import 'dart:async';
// 转换方法
void handleData(data, EventSink sink) {
if (data % 2 == 0) {
sink.add(data);
}
}
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.transform(StreamTransformer.fromHandlers(handleData: handleData))
.listen((value) => print(value));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
输出:
0
2
4
自定义 StreamTransformer
在自定义我们自己的 Transformer 之前,我们先来看看 stream.transform(...)
做了什么事情,毕竟我们是通过 transform(...)
方法传入的 Transformer, transform(...)
源码如下:
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
可以看出 transform 方法返回的依然回一个 Stream ,只不过这个 Stream 是经过转换后的Stream。 类似于 Java8 Stream 中的中间流,就是不停的返回 Stream 的那种。然后该方法是一个泛型方法,泛型类型分别为 T,和 S,可以这样理解,T 为该方法的入参类型,S 为该方法的出参类型,类似于 Java8 中的 Function,我们可以看下 Java 中的 Function 接口的部分源码:
/**
* ...
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
* * @since 1.8
*/
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
...
}
最后,transform(...)
方法调用了 streamTransformer.bind(this);
该方法返回的是一个新的 Stream
,也就是转换后的 Stream
,当然,bind()
方法也是我们自定义 StreamTransformer
时需要实现的方法。
我们在之前的例子中声明一个 StreamController
的时候,都是没有传递参数的,其实,StreamController 的构造方法是这样的:
factory StreamController(
{void onListen(),
void onPause(),
void onResume(),
onCancel(),
bool sync: false}) {
return sync
? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
可以看出,StreamController 是有参数的,并且都是可选参数,在这些参数中,我们重点实现 onListen()
,关于 onListen
的源码解释如下:
A Stream should be inert until a subscriber starts listening on it (using
the [onListen] callback to start producing events). Streams should not
leak resources (like websockets) when no user ever listens on the stream.
重点就是 onListen
方法是用来生产事件的。
接下来,来实现一个自定义的 StreamTransformer:
/// 自定义一个 StreamTransformer ,
/// 泛型类型 S 为入参类型,T 为出参类型
/// 这些类型都是 Stream 中传递的数据类型
class MyTransformer<S, T> implements StreamTransformer<S, T> {
// 用来生成一个新的 Stream 并且控制符合条件的数据
StreamController _controller;
StreamSubscription _subscription;
bool cancelOrError;
// 转换之前的 Stream
Stream<S> _stream;
MyTransformer({bool sync: false, this.cancelOrError}) {
_controller = new StreamController<T>(
onListen: _onListen,
onCancel: _onCancel,
onPause: () {
_subscription.pause();
},
onResume: () {
_subscription.resume();
},
sync: sync);
}
MyTransformer.broadcast({bool sync: false, bool this.cancelOrError}) {
// 定义一个 StreamController,注意泛型类型为 T,也就是出参类型,因为
// 我们是使用该 _controller 生成一个用来返回的新的 Stream<T>
_controller = new StreamController<T>.broadcast(
onListen: _onListen, onCancel: _onCancel, sync: sync);
}
void _onListen() {
// _stream 为转换之前的 Stream<S>
_subscription = _stream.listen(onData,
onError: _controller.addError,
onDone: _controller.close,
cancelOnError: cancelOrError);
}
void _onCancel() {
_subscription.cancel();
_subscription = null;
}
// 数据转换
void onData(S data) {
if ((data as int) % 2 == 0) {
// 将符合条件的数据添加到新的 Stream 中
_controller.sink.add(data);
}
}
// 参数为转换之前的 Stream<S>
// 返回的是一个新的 Stream<T> (转换之后的 Stream)
@override
Stream<T> bind(Stream<S> stream) {
this._stream = stream;
return _controller.stream;
}
@override
StreamTransformer<RS, RT> cast<RS, RT>() {
// TODO: implement cast
return null;
}
}
使用如下:
void main() {
StreamController<int> controller = StreamController<int>();
controller.stream
.transform(new MyTransformer()) // 自定义的 StreamTransformer
.listen((value) => print('$value'));
controller.sink.add(0);
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
controller.sink.add(4);
controller.close();
}
输出:
0
2
4
如有错误,还请指出。谢谢!!!