在Dart库中,有两种实现异步编程的方式(Future和Stream),使用它们只需要在代码中引入dart:async即可。
Flutter 中,整个 Stream 主要包含了 StreamController、Sink 、Stream 、StreamSubscription 四个对象,通过这四个对象来操控整个Stream的运行。
- StreamController
有一个事件源叫 Stream,为了方便控制 Stream ,官方提供了使用 StreamController 作为管理;同时它对外提供了 StreamSink 对象作为事件输入口,可通过 sink 属性访问; 又提供 stream 属性提供 Stream 对象的监听和变换,最后得到的 StreamSubscription 可以管理事件的订阅。
可以说, StreamController就是如其名字所示一样用来管理其他三个对象的对象。
- StreamSink
sink英文的意思为水槽,我们可以将其理解为日常生活中的厨房的洗碗槽,洗碗槽(sink)中的水(data)会流进管子(stream)中。一般作为事件的入口,提供如 add , addStream 等。
- StreamSubscription
这是一个事件订阅后的对象,,Stream中有两种订阅模式,分别是单点订阅和广播。
表面上用于管理订阅过等各类操作,如 cacenl 、pause ,同时在内部也是事件的中转关键。
- Stream
事件源本身,一般可用于监听事件或者对事件进行转换,如 listen 、 where 。
什么是Stream?
为了将Stream的概念可视化与简单化,可以将它想成是管道(pipe)的两端,它只允许从一端插入数据并通过管道从另外一端流出数据。
在Flutter中,
- 我们将这样的管道称作Stream;
- 为了控制Stream,我们通常可以使用StreamController来进行管理;
- 为了向Stream中插入数据,StreamController提供了类型为StreamSink的属性sink作为入口;
- StreamController提供stream属性作为数据的出口。
class StreamDemo extends StatelessWidget {
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('StreamDemo'),
elevation: 0.0,
),
body: StreamTestDemo(),
);
}
}
class StreamTestDemo extends StatefulWidget {
@override
State<StatefulWidget> createState() {
// TODO: implement createState
return StreamTestDemoState();
}
}
class StreamTestDemoState extends State<StreamTestDemo> {
StreamSubscription streamSubscription;
StreamController<String> _streamDemo;
StreamSink _sinkDemo;
var _data = "--";
@override
void initState() {
super.initState();
//使用StreamController 管理Stream
_streamDemo = StreamController<String>();
//为了向Stream中插入数据,StreamController提供了类型为StreamSink的属性sink作为入口;
_sinkDemo = _streamDemo.sink;
//StreamSubscription 可以管理事件的订阅 取消 暫停 添加
streamSubscription =
_streamDemo.stream.listen(onData, onError: onError, onDone: onDone);
}
@override
void dispose() {
// TODO: implement dispose
super.dispose();
_streamDemo.close();
}
void onData(String data) {
print('---------${data}');
setState(() {
_data = data;
});
}
void onError(error) {
print('--------error:$error');
}
void onDone() {
print('--------Done');
}
Future<String> fetchData() async {
await Future.delayed(new Duration(seconds: 3));
// throw 'soming err';
return 'hello word';
}
void _addDataToStream() async {
print('------Add data to stream.');
String data = await fetchData();
// _streamDemo.add(data);
_sinkDemo.add(data);
}
void _pauseStream() {
print('------Pause subscription');
streamSubscription.pause();
}
void _resumeStream() {
print('------Resume subscription');
streamSubscription.resume();
}
void _cancelStream() {
print('------Cancel subscription');
streamSubscription.cancel();
}
@override
Widget build(BuildContext context) {
// TODO: implement build
return Container(
child: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
Text(_data),
Row(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
FlatButton(
child: Text('Add'),
onPressed: _addDataToStream,
),
FlatButton(
child: Text('Pause'),
onPressed: _pauseStream,
),
FlatButton(
child: Text('Resume'),
onPressed: _resumeStream,
),
FlatButton(
child: Text('Cancel'),
onPressed: _cancelStream,
),
],
),
],
),
),
);
}
}
如下圖 点击add 后3s后会显示hello word 同时也可以点击 暂停 恢复 取消等 查看结果
另外 Stream 还支持多次订阅
修改代码
@override
void initState() {
super.initState();
print('-----Create a stream.');
//支持多次订阅
_streamDemo = StreamController.broadcast();
_sinkDemo = _streamDemo.sink;
print('-----Start listening on a stream.');
_streamDemoSubscription =
_streamDemo.stream.listen(onData, onError: onError, onDone: onDone);
_streamDemo.stream.listen(onDataTwo, onError: onError, onDone: onDone);
print('-----Initialize completed.');
}
void onDataTwo(String data) {
print('-----onDataTwo: $data');
}
运行后便可以查看到两次的ondata 输出