基本API补充
1.不完整回调函数 Action1 中call() 方法
Observable<String> observable = Observable.just("hello", "world!");
// 我们之前的写法
// observable.subscribe(new Observer<String>() {
//
// @Override
// public void onCompleted() {
//
// }
//
// @Override
// public void onError(Throwable e) {
//
// }
//
// @Override
// public void onNext(String t) {
// Log.i("main", "值:" + t);
// }
// });
observable.subscribe(new Action1<String>() {
/**
* 相当于onNext
*/
@Override
public void call(String t) {
Log.e("main", "值:" + t);
}
});
// observable.subscribe(onNext, onError)
// observable.subscribe(onNext, onError, onCompleted);
结果输出:
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: 值:hello
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: 值:world!
call()相当于onNext方法
2.过滤函数
(1) filter
filter(Func1)用来过滤观测序列中我们不想要的值,只返回满足条件的值,我们看下原理图:
public class FilterActivity extends Activity {
private Observable<AppInfo> observable;
private AppInfoAdapter appInfoAdapter;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_simple9);
observable = getApps();
initView();
}
private void initView() {
ListView listView = (ListView) findViewById(R.id.lv_app_name);
appInfoAdapter = new AppInfoAdapter(this);
listView.setAdapter(appInfoAdapter);
}
/**
* 创建Observable
*
* @return
*/
private Observable<AppInfo> getApps() {
AppInfo appInfo1 = new AppInfo("Xiong", 0);
AppInfo appInfo2 = new AppInfo("Tony", 0);
AppInfo appInfo3 = new AppInfo("Tomcat", 0);
AppInfo appInfo4 = new AppInfo("Lucy", 0);
AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
return Observable
.just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5).filter(
new Func1<AppInfo, Boolean>() {
@Override
public Boolean call(AppInfo t) {
return t.getName().contains("Lucy");
}
});
}
public void click(View v) {
observable.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
//完成之后刷新UI
appInfoAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(AppInfo t) {
Log.e("main",t.getName());
//添加数据
appInfoAdapter.addAppInfo(t);
}
});
}
}
结果输出:
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy pioneer
filter相关源码:
public final class OnSubscribeFilter<T> implements OnSubscribe<T> {
final Observable<T> source;
final Func1<? super T, Boolean> predicate;
public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
this.source = source;
this.predicate = predicate;
}
@Override
public void call(final Subscriber<? super T> child) {
FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
child.add(parent);
source.unsafeSubscribe(parent);
}
static final class FilterSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> actual;
final Func1<? super T, Boolean> predicate;
boolean done;
public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
this.actual = actual;
this.predicate = predicate;
request(0);
}
@Override
public void onNext(T t) {
boolean result;
try {
result = predicate.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
if (result) {
actual.onNext(t);
} else {
request(1);
}
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
super.setProducer(p);
actual.setProducer(p);
}
}
}
(2) take(获取前几位或指定范围)
/**
* 创建Observable
*
* @return
*/
private Observable<AppInfo> getApps() {
AppInfo appInfo1 = new AppInfo("Xiong", 0);
AppInfo appInfo2 = new AppInfo("Tony", 0);
AppInfo appInfo3 = new AppInfo("Tomcat", 0);
AppInfo appInfo4 = new AppInfo("Lucy", 0);
AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
//获取当前数据前两条
return Observable
.just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5).take(2);
}
public void click(View v) {
observable.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
//完成之后刷新UI
appInfoAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(AppInfo t) {
Log.e("main",t.getName());
//添加数据
appInfoAdapter.addAppInfo(t);
}
});
}
结果输出:
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Xiong
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Tony
take相关源码
public final class OperatorTake<T> implements Operator<T, T> {
final int limit;
public OperatorTake(int limit) {
if (limit < 0) {
throw new IllegalArgumentException("limit >= 0 required but it was " + limit);
}
this.limit = limit;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Subscriber<T> parent = new Subscriber<T>() {
int count;
boolean completed;
@Override
public void onCompleted() {
if (!completed) {
completed = true;
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!completed) {
completed = true;
try {
child.onError(e);
} finally {
unsubscribe();
}
}
}
@Override
public void onNext(T i) {
if (!isUnsubscribed() && count++ < limit) {
boolean stop = count == limit;
child.onNext(i);
if (stop && !completed) {
completed = true;
try {
child.onCompleted();
} finally {
unsubscribe();
}
}
}
}
/**
* We want to adjust the requested values based on the `take` count.
*/
@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
// keeps track of requests up to maximum of `limit`
final AtomicLong requested = new AtomicLong(0);
@Override
public void request(long n) {
if (n > 0 && !completed) {
// because requests may happen concurrently use a CAS loop to
// ensure we only request as much as needed, no more no less
while (true) {
long r = requested.get();
long c = Math.min(n, limit - r);
if (c == 0) {
break;
} else if (requested.compareAndSet(r, r + c)) {
producer.request(c);
break;
}
}
}
}
});
}
};
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}
/*
* We decouple the parent and child subscription so there can be multiple take() in a chain such as for
* the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we
* register 'parent' with 'child'
*/
child.add(parent);
return parent;
}
}
(3) takeLast (获取最后几位)
/**
* 创建Observable
*
* @return
*/
private Observable<AppInfo> getApps() {
AppInfo appInfo1 = new AppInfo("Xiong", 0);
AppInfo appInfo2 = new AppInfo("Tony", 0);
AppInfo appInfo3 = new AppInfo("Tomcat", 0);
AppInfo appInfo4 = new AppInfo("Lucy", 0);
AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
//获取当前数据前两条
return Observable
.just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5).takeLast(2);
}
public void click(View v) {
observable.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
//完成之后刷新UI
appInfoAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(AppInfo t) {
Log.e("main",t.getName());
//添加数据
appInfoAdapter.addAppInfo(t);
}
});
}
结果输出:
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy pioneer
(4) distinct (去重)
private Observable<String> getApps() {
//获取当前数据前两条
return Observable.just("Tony","pioneer", "Tomcat","Tony","Lucy","Tomcat","Tony").distinct();
}
public void click(View v) {
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
//完成之后刷新UI
appInfoAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String t) {
Log.e("main",t);
}
});
}
结果输出:
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tony
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: pioneer
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tomcat
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Lucy
(5) distinctUntilChanged(去除位置相邻重复数据)
/**
* 创建Observable
*
* @return
*/
private Observable<String> getApps() {
list = new ArrayList<String>();
list.add("Michael");
list.add("Michael");
list.add("pioneer");
list.add("Michael");
list.add("Michael");
list.add("Huni");
list.add("Huni");
list.add("Huni");
list.add("King");
list.add("Huni");
return Observable.from(list).distinctUntilChanged();
}
public void click(View v) {
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String t) {
Log.e("main", "过滤后的值: " + t);
}
});
}
结果输出:
08-07 07:46:45.444 17378-17378/com.haocai.architect.rxjava E/main: 过滤后的值: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 过滤后的值: pioneer
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 过滤后的值: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 过滤后的值: Huni
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 过滤后的值: King
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 过滤后的值: Huni
(6) First
first()顾名思义,它是的Observable只发送观测序列中的第一个数据项。
private Observable<String> getApps() {
list = new ArrayList<String>();
list.add("Michael");
list.add("pioneer");
list.add("Huni");
list.add("King");
list.add("Cookie");
// first:发送序列中第一个值(内部调用了take(1).single())
// last:发送最后一个(内部调用了takeLast(1).single())
return Observable.from(list).first();
// return Observable.from(list).last();
}
public void click(View v) {
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String t) {
Log.e("main", "过滤后的值: " + t);
}
});
}
结果输出:
08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: 过滤后的值: Michael
(7) Last
last()只发射观测序列中的最后一个数据项。
private Observable<String> getApps() {
list = new ArrayList<String>();
list.add("Michael");
list.add("pioneer");
list.add("Huni");
list.add("King");
list.add("Cookie");
// first:发送序列中第一个值(内部调用了take(1).single())
// last:发送最后一个(内部调用了takeLast(1).single())
return Observable.from(list).last();
}
结果输出:
08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: 过滤后的值: Cookie
(8)Skip
skip(int)让我们可以忽略Observable发射的前n项数据。
/**
* 创建Observable
*
* @return
*/
private Observable<String> getApps() {
list = new ArrayList<String>();
list.add("Michael");
list.add("Pioneer");
list.add("Huni");
list.add("King");
list.add("Cookie");
list.add("Faker");
list.add("Gigi");
// skip:从头开始,跳过多少个,然后在发送
// skipLast:最后面的多少个我不需要
return Observable.from(list).skip(2);
}
public void click(View v) {
observable.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String t) {
Log.i("main", "过滤后的值: " + t);
}
});
}
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 过滤后的值: Huni
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 过滤后的值: King
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 过滤后的值: Cookie
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 过滤后的值: Faker
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 过滤后的值: Gigi
(9)SkipLast
skipLast(int)忽略Observable发射的后n项数据。
/**
* 创建Observable
*
* @return
*/
private Observable<String> getApps() {
list = new ArrayList<String>();
list.add("Michael");
list.add("Pioneer");
list.add("Huni");
list.add("King");
list.add("Cookie");
list.add("Faker");
list.add("Gigi");
// skip:从头开始,跳过多少个,然后在发送
// skipLast:最后面的多少个我不需要
return Observable.from(list).skipLast(2);
}
结果输出:
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 过滤后的值: Michael
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 过滤后的值: Pioneer
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 过滤后的值: Huni
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 过滤后的值: King
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 过滤后的值: Cookie
(10)SkipLast
elementAt(int)用来获取元素Observable发射的事件序列中的第n项数据,并当做唯一的数据发射出去。
private Observable<String> getApps() {
list = new ArrayList<String>();
list.add("Michael");
list.add("Pioneer");
list.add("Huni");
list.add("King");
list.add("Cookie");
list.add("Faker");
list.add("Gigi");
// skip:从头开始,跳过多少个,然后在发送
// skipLast:最后面的多少个我不需要
return Observable.from(list).elementAt(2);
}
08-07 09:01:17.495 20645-20645/com.haocai.architect.rxjava I/main: 过滤后的值: Huni
(11)Sample
sample操作符是定期扫描源Observable产生的结果,在指定的间隔周期内进行采样
获得定期发射Observable最近的数据
例一
observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(
new Observer<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long t) {
Log.i("main", "接收到的值: " + t);
}
});
08-07 09:36:08.478 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 0
08-07 09:36:10.477 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 2
08-07 09:36:12.478 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 4
08-07 09:36:14.479 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 6
08-07 09:36:16.478 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 8
......
例二
Observable.create(subscriber -> {
subscriber.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}).sample(999, TimeUnit.MILLISECONDS)//或者为throttleLast(1000, TimeUnit.MILLISECONDS)
.subscribe(item-> Log.d("JG",item.toString()));
//结果为2,3,5
(12)Timeout
timeout: 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable。
private Observable<String> getApps() {
observable = Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> observer) {
observer.onNext("Kpioneer");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
observer.onNext("Lucy");
observer.onCompleted();
}
});
return observable;
}
public void click(View v) {
observable.timeout(999, TimeUnit.MILLISECONDS,Observable.just("Michel","QQ")).subscribe(
new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String t) {
Log.i("main", "接收到的值: " + t);
}
});
}
结果输出:
08-07 10:02:30.806 11757-11757/com.haocai.architect.rxjava I/main: 接收到的值: Kpioneer
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: 接收到的值: Michel
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: 接收到的值: QQ
}
如果不指定备用Observable结果为Kpioneer, onError
3.变换操作
(1) Map
map()函数接受一个Func1类型的参数(就像这样map(Func1<? super T, ? extends R> func)),然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。这种狗屁定义我相信你也听不懂,我们来看一下官方给出的原理图:
userModelList = new ArrayList<UserModel>();
for (int i = 0; i < 3; i++) {
UserModel userModel = new UserModel("userId_" + i, "userName_" + i);
List<OrderModel> orderList = new ArrayList<OrderModel>();
for (int j = 0; j < 2; j++) {
OrderModel orderModel = new OrderModel("userId_" + i
+ "_orderId_" + j, "user_" + i + "_orderName_" + j);
orderList.add(orderModel);
}
userModel.setOrderList(orderList);
userModelList.add(userModel);
}
Observable.from(userModelList).map(new Func1<UserModel, String>() {
@Override
public String call(UserModel userModel) {
return userModel.getUserName();
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i("main", "转换之后的值:" +s);
}
});
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: 转换之后的值:userName_0
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: 转换之后的值:userName_1
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: 转换之后的值:userName_2
(2) flatmap
flatMap()的原理是这样的:
1.将传入的事件对象装换成一个Observable对象;
2.这是不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
3.每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。
这三个步骤,把事件拆成了两级,通过一组新创建的Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是flatMap()所谓的flat。
最后我们来看看flatMap的原理图:
从前面的例子中你坑定发现了,flatMap()和map()都是把传入的参数转化之后返回另一个对象。但和map()不同的是,flatMap()中返回的是Observable对象,并且这个Observable对象并不是被直接发送到 Subscriber的回调方法中。
userModelList = new ArrayList<UserModel>();
for (int i = 0; i < 3; i++) {
UserModel userModel = new UserModel("userId_" + i, "userName_" + i);
List<OrderModel> orderList = new ArrayList<OrderModel>();
for (int j = 0; j < 2; j++) {
OrderModel orderModel = new OrderModel("userId_" + i
+ "_orderId_" + j, "user_" + i + "_orderName_" + j);
orderList.add(orderModel);
}
userModel.setOrderList(orderList);
userModelList.add(userModel);
}
// flatmap提供这样的解决方案(权衡)
// 场景:解决会到接口嵌套问题(例如:授权认证成功之后,登录场景)
Observable.from(userModelList).flatMap(new Func1<UserModel, Observable<OrderModel>>() {
@Override
public Observable<OrderModel> call(UserModel userModel) {
return Observable.from(userModel.getOrderList());
}
}).subscribe(new Action1<OrderModel>() {
@Override
public void call(OrderModel orderModel) {
Log.i("main", "转换之后的值:" +orderModel.getOrderId());
}
});