异步
Dart中的异步操作主要是通过Future
和 Stream
来实现的。
Future
- 使用关键字
async await
实现一个异步函数,返回Future
。async await
的语法会让整个代码看起来像是同步操作,增加了代码可读性。
Future<String> loadData() async{
var result = await loadFromHttp();
return result; // result 是一个String类型
}
- 表示一个异步函数时,一定要在函数体前加上
async
关键字,await
只有在async
修饰的函数体内才有效- 异步函数返回类型一定是
Future
,函数体没有返回值时,函数的返回类型为Future<void>
- 异步函数的返回值不需要返回
Future
的实例,只需要返回真正关心的数据类型
- 在异步函数中,可以使用多次await
Future<void> loadData() async {
var user = await loadUserInfoFromHttp(user);
var info = await loadInfoFromHttp(info);
await storeInfoToDB();
}
await
后面的语句一定会等await
的语句执行完才会执行
- 异步函数的异常处理,使用try...catch
Future<void> loadData() async {
try{
await loadDataFromHttp();
}catch(e){
print('exception occur');
}finaly{
print('do something');
}
}
- Future 表示异步的结果,它有两种状态:未完成 和 已完成。
// 不等异步执行完
void main() {
print(getData()); // Instance of _Future<String>
}
// 等异步执行完
void main() async{
print(await getData()); // abc
}
Future<String> getData() async{
var result = await getFuture();
return result;
}
// 获取一个Future<String>的对象, 在3s后返回,字符串为 abc
Future<String> getFuture() => Future(Duration(seconds:3),()=>'abc');
await
方法会立即返回一个Future
的对象,而不是等异步完成后返回,因此在使用时要注意其是否时完成状态,未完成时是Future
对象,已完成时它是期待类型的值
- 使用
await for
来处理异步循环Future<void> testAwaitFor() async{ await for( var value in stream){ //do something; } }
- 循环内必须是一个
stream
类型 - 使用
break / return
可以跳出循环 - 一定不能在UI监听器上使用
awit for
- 循环内必须是一个
Stream
Stream
是一系列异步事件的序列。其类似于一个异步的 Iterable
,不同的是当向 Iterable
获取下一个事件时它会立即给你,但是 Stream
则不会立即给你而是在它准备好时告诉你。
创建Stream
创建一个Stream
通常有三种方式:
- 通过
async* yield
生成器创建Stream - 通过现有的
Stream/Future
转换 - 通过
StreamController
创建
- 通过
async*
生成器创建Stream
。主要语法async* yield/yield*
Stream<int> createStream() async*{
int i=0;
while(true){
await Future.delay(Duration(seconds:1));
yield i++; // 向Stream 发送数据
if(i==10){
break;
}
}
}
async*
异步生成器在函数调用时,生成Stream
;当Stream
被监听时,函数开始执行,函数执行完,Stream
关闭。如果调用StreamSubscription
(由Stream的listen()
返回)的cancel()
方法,那么函数执行到下一次yield
时即会终止,类似于return
效果。
通常很少使用这种方式创建Stream
,大部分情况下Stream都是由其他Stream转换而来的
-
通过现有的
Stream/Future
转换来- 通过
async* await for yield
将其他Stream
的值或Future
做处理后生成新的Stream
Stream<String> createStream(Stream<String> orginStream) async*{ await for(final value in originStream) { yield `new Stream value => ${value}`; } }
Stream<T> createStream<T>(Iterable<Future<T>> futures) async*{ for(final future: futures){ var result = await future; yield* result; } }
- 通过
Stream
的转换函数来转化
const newStream = Stream<int>.periodic(const Duration(seconds: 1), (x) => x).take(15); //每秒发射一个自增数,取前15个 const doubleStream = newStream.map((x)=> x * 2); // 将原stream中的每个数值*2
Dart 提供了大量的转换函数可以生成一个新的
Stream
,如果已有的函数不能满足需求,还可以自定义StreamTransformer
来实现。Dart:conver
中也实现了很多Stream
的转换:utf8.decoder
LineSplitter
.Stream<List<int>> content = File('someFile.txt').openRead(); List<String> lines = await content .transform(utf8.decoder) .transform(const LineSplitter()) .toList();
- 通过
通过
StreamController
创建
Stream<int> timedCounter(Duration interval, [int? maxCount]) {
late StreamController<int> controller;
Timer? timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter); // Ask stream to send counter values as event.
if (counter == maxCount) {
timer?.cancel();
controller.close(); // Ask stream to shut down and tell listeners.
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
timer?.cancel();
timer = null;
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
Stream
需要在有订阅之后才开始生产事件,否则StreamCtroller
从一开始就不断生产事件并缓存,就有可能造成内存泄漏。
构建StreamController
时,可以为其指定一个onListen
参数回调用以接收订阅通知。当Stream
获取到它的第一个订阅者时会触发调用onListen
回调。同样地,也可以指定一个onCancel
回调,该回调则会在控制器丢失它最后一个订阅者时触发调用。
Stream的使用
- 通过异步循环
await for
来接收stream
中的事件Future<int> sumStream(Stream<int> stream) async { var sum = 0; await for (final value in stream) { sum += value; } return sum; }
await for
最终只会返回一个处理结果。
- 通过
Stream
的函数返回符合条件的事件
Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object? needle);
Future<E> drain<E>([E? futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function()? orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = '']);
Future<T> lastWhere(bool Function(T element) test, {T Function()? orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();
除了
contians
和pipe
方法外,其他方法在Iterable
中都有类似的方法。
- 使用
listen
方法,listen
方法会返回一个StreamSubscription
,可以通过StreamSubcription
对象对Stream
进行接收数据,暂停,取消等操作final stream = Stream.periodic(const Duration(seconds: 1), (i) => i * i) .take(10); final subscription = stream.listen((value)=>print(value)); //0 1 4 9 16 25 36 49 64 81 subscription.pause(); print(subscription.isPaused); // true subscription.resume(); print(subscription.isPaused); // false subscription.cancel();
只有被调用了
listen
方法,Stream
才开始生产事件;自定义Stream
重新listen
方法时一锭要注意这点
Stream 的异常处理
- 当使用
await for
时,可以使用try catch
捕获
```
final stream = Stream.periodic(const Duration(seconds: 1), (i){
print('product event...');
if(i== 10){
throw Exception('wrong number');
}else {
return i *i;
}})
.take(20);
try{
await for(final value in stream){
print(value);
}
}catch(e){
print(e);
}
```
- 当使用
listen
订阅时,可以通过可选参数的onError
接受错误事件。
final stream = Stream.periodic(const Duration(seconds: 1), (i){
print('product event...');
if(i== 10){
throw Exception('wrong number');
}else {
return i *i;
}})
.take(20);
stream.listen((value)=>print(value), onError:(e)=>print(e));
两者区别在于,
await for
在捕获异常时,会停止接受事件,而listen
则会跳过继续执行下面的事件