Stream: 超级抽象的一个XXX
Stream 的分类
(1) "Single-subscription" -- 单订阅流
(2) "broadcast" -- 广播式的流(可多订阅)
- 单订阅流只能被订阅一次,重复订阅会报错, 直到设置listen 后才会发送。单订阅流通常用于流式数据块较大的连续数据,如文件I/O。
- 广播式的 可以订阅多次,在listen之前的数据会丢失。
- 如果一个流是单订阅模式 却想多次订阅,可以通过asBroadcastStream ()方法来修改。
Stream 的创建
(1) 从集合中创建一个新的单订阅流, Stream.fromIterable
Stream stream1 = Stream.fromIterable([11, 22, 33]);
(2) 从Future中创建一个新的单订阅流, Stream.fromFuture
Stream stream = Stream.fromFuture(Future(()=> 1));
(3) 通过Stream.fromFutures创建
Stream stream3 = Stream.fromFutures([
Future(() => 111),
Future(() => 111),
]);
(4) 创建一个每隔自定义时间发送一个数据的流
Stream stream2 = Stream.periodic(Duration(seconds: 2), (a) {
print("a------>$a");
if (a < 3)
return a;
else
return a *10;
});
它有2 个参数, 第一个是间隔的时间, 第二个是每次发送数据前的回调方法,可以通过这个方法的返回值来修改流的值。 (返回值即向流中添加的内容)
上面创建的流是一个固定间隔时间无限发送的流,这就有问题了。 正常情况下谁会去搞一个无限发送数据流的功能呢? 怎样控制它的结束?-----------
用stream的take 方法。 stream2 = stream2.take(5); 这样就只会发送5次了。
升级版的takeWhile () : 可以用来做筛选
stream2 = stream2.takeWhile((data) {
return data < 10; // 设置一个上限为10
});
为了方操作 Stream ,官方提供了StreamController;如上图所示,他提供StreamSink来添加流 (入口),同时又提供 stream 属性用于对外的监听和变换。 stream.listen的返回时一个StreamSubscription,可以通过它的pause(),resume(),cancel()等方法来操作流的订阅。
StreamController:
StreamController controller = StreamController<String>(); // 创建一个单订阅流
StreamController controller = StreamController.broadcast(); // 创建一个广播式的订阅流
参数sync用来指定是同步还是异步。
listen : 用来设置监听, 它的返回值是 StreamSubscribe。
StreamSubscribe:
pause() : 暂停监听(是立即暂停),暂停后的事件流不会丢失,会在resume后一起回调
resume(): 唤醒pause的流
cancel(): 取消
举个栗子🌰
// 1. StreamControl
StreamController controller = StreamController<String>();
// 2. StreamSink
StreamSink sink = controller.sink;
// 3. Stream
Stream stream = controller.stream;
stream.transform(StreamTransformer<String, String>.fromHandlers(
handleData: (String data, EventSink<String> sink) {
// 在这里设置transform 是没有用的,不会走这里; 除非在stream.transform返回的stream上加listen监听。
if (!data.contains("数据2")) {
sink.add(data);
}
}));
sink.add("3秒后才设置监听。");
// 4. subscribe
Timer(Duration(seconds: 3), () {
StreamSubscription subscription = controller.stream.transform(
StreamTransformer<String, String>.fromHandlers(
handleData: (String data, EventSink<String> sink) {
print("transform");
if (!data.contains("数据3")) {
sink.add(data);
}
})).listen((event) {
print("接收到新的消息: " + event);
});
sink.add("我是一条新的数据");
Timer(Duration(milliseconds: 100), () {
sink.add("pause...");
subscription.pause(); // 暂停
sink.add("我是一条新的数据pause");
});
Timer(Duration(seconds: 5), () {
subscription.resume();
sink.add("我是一条新的数据2");
});
});
输出结果: 绿色的先输出,过5秒后黄色的才输出
那么问题来了, 为什么pause 之前的add的那一个流没有输出呢? 跟进去看源码就明白了,pause 期间是不会分发事件的。
schedule 的实现
这样也就明白了, Stream最终是 想microtask queue 中添加了一个microtask 来实现异步的功能。
说点题外的: flutter是单线程,他的异步实现是通过Event Looper 来实现的。Event looper 中包含2个队列: (1)MicorTask Queue (2) Event Queue , MicroTask 的优先级是大于Event Queue的,只有所有的MicroTask Queue中的任务都完成以后才会去执行Event Queue中的内容。
当然 StreamController 可以是同步的,只要在创建的时候将参数sync设置为true即可,sync: true
如何通过Stream来实现响应式的组件 ?
通过StreamBuilder
看个例子:
class StreamModel {
StreamController _controller;
StreamSink<List<BookResponseData>> _sink;
Stream<List<BookResponseData>> stream;
StreamModel() {
// 构造方法中初始化流相关的对象
_controller = StreamController<List<BookResponseData>>.broadcast();
_sink = _controller.sink;
stream = _controller.stream;
}
/// 获取书本列表
getBookList() async {
var httpClient = new HttpClient();
var uri = new Uri.https('www.apiopen.top', '/novelApi');
var request = await httpClient.getUrl(uri);
var response = await request.close();
var responseBody = await response.transform(utf8.decoder).join();
// 将获取到的字符串转换成定义好的Book实体类
BookResponseEntity entity =
BookResponseEntity.fromJson(json.decode(responseBody));
// 接口中拿到数据之后,通过sink.add 添加一条流即可, 这样在StreamBuild中就会有回调。
_sink.add(entity.data);
}
/// 资源
dispose() {
_sink.close();
_controller.close();
}
}
组件类:
class BookList extends StatefulWidget {
@override
State<StatefulWidget> createState() {
return _BookListState();
}
}
//https://api.apiopen.top/getSingleJoke?sid=28654780
//https://www.apiopen.top/novelApi
class _BookListState extends State<BookList> {
StreamModel streamModel;
@override
void initState() {
super.initState();
streamModel = StreamModel();
}
@override
Widget build(BuildContext context) {
// print("build");
return Scaffold(
appBar: AppBar(
title: Text("stream demo"),
),
body: Container(
child: StreamBuilder<List<BookResponseData>>(
stream: streamModel.stream, // 要监听的流
initialData: [], // 初始值,可以不设
builder: (context, a) { // sink.add 后,就会回调这个方法。
List<Widget> views = [];
if (a.data != null && a.data.length > 0) {
a.data.forEach((BookResponseData data) {
views.add(Container(
padding: EdgeInsets.all(10.0),
child: Column(
children: <Widget>[
Text(
data.bookname,
style: TextStyle(fontWeight: FontWeight.w600),
),
Text(data.bookInfo),
],
),
));
});
}
return ListView(
children: views,
);
},
),
),
floatingActionButton: FloatingActionButton(
child: Text("获取数据"),
onPressed: () {
streamModel.getBookList();
}),
);
}
@override
void dispose() {
streamModel.dispose();
super.dispose();
}
}
以上便可以实现基于Stream的响应式组件。
又有问题了。。。。 没办法问题就是这么多 (来打我... )
为什么StreamBuilder能够监听到Stream的变化来刷新UI?
跟进去看一下源码
void _subscribe() {
if (widget.stream != null) {
_subscription = widget.stream.listen((T data) {
setState(() {
_summary = widget.afterData(_summary, data);
});
}, onError: (Object error) {
setState(() {
_summary = widget.afterError(_summary, error);
});
}, onDone: () {
setState(() {
_summary = widget.afterDone(_summary);
});
});
_summary = widget.afterConnected(_summary);
}
}
你会发现 StreamBuilder是一个StatefulWidget, 本质还是在stream.listen中通过setState 来实现响应数据刷新View。
这样,对Stream及基于Stream的响应式组件就有个大致的了解了... ...
over。。。