1. debounce操作符
debounce:“抖动”,该操作符对Observable每产生一个结果后,如果在规定的间隔时间内没有别的结果产生,则把这个结果提交给订阅者处理,否则忽略该结果。
示例原理用法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
})
// 设置时间为0.5秒
.debounce(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
运行结果
“2”,“4”,“5”
分析
- 事件1发射后过了400毫秒后发射事件2,此时事件1不满足时间的条件被遗弃,然后重新计时;
- 事件2发出后休眠了505毫秒,超过了500毫秒,所以事件2被发射出来;
- 事件3发出来后又过了100毫秒事件4发出来,所以事件3被遗弃;
- 事件4重新计时,后又过了605毫秒下一个事件才发出,所以4被发射了出来;
- 同理,5之后的0.5秒内也没有再发出别的事件,所以最终5也被发射了出来。
2. switchMap操作符
当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项。如果都是在同一个线程里跑的话,那么该操作符与ContactMap无异;只有在不同的线程里跑的时候,即线程方案为newThread的时候,才会出现这种情况。
同一线程
Observable.just("A", "B", "C", "D", "E").switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Observable<String> ob = Observable.just(s);
return ob;
}
}).subscribeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d("------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.d("------>onError()" + e);
}
@Override
public void onNext(String s) {
Log.d("------>onNext:" + s);
}
});
输出结果:
------>onNext:A
------>onNext:B
------>onNext:C
------>onNext:D
------>onNext:E
------>onCompleted()
并发
Observable.just("A", "B", "C", "D", "E").switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s).subscribeOn(Schedulers.newThread());
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d("------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.d("------>onError()" + e);
}
@Override
public void onNext(String s) {
Log.d("------>onNext:" + s);
}
});
输出结果:
------>onNext:E
------>onCompleted()
3. 搜索功能
使用RxJava2提供的三个操作符进行了优化:
- 使用debounce操作符,当输入框发生变化时,不会立刻将事件发送给下游,而是等待200ms,只有在这段事件内,输入框没有发生变化,那么才发送该事件;
- 使用switchMap操作符,当发起了123的请求之后,即使12的结果返回了,也不会发送给下游。
public class SearchActivity extends AppCompatActivity {
private EditText mEtSearch;
private TextView mTvSearch;
private PublishSubject<String> mPublishSubject;
private DisposableObserver<String> mDisposableObserver;
private CompositeDisposable mCompositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_search);
mEtSearch = (EditText) findViewById(R.id.et_search);
mTvSearch = (TextView) findViewById(R.id.tv_search_result);
mEtSearch.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
startSearch(s.toString());
}
});
mPublishSubject = PublishSubject.create();
mDisposableObserver = new DisposableObserver<String>() {
@Override
public void onNext(String s) {
mPublishSubject.onNext(s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.debounce(200, TimeUnit.MILLISECONDS).filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.length() > 0;
}
}).switchMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String query) throws Exception {
return getSearchObservable(query);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(mDisposableObserver);
mCompositeDisposable = new CompositeDisposable();
mCompositeDisposable.add(mDisposableObserver);
}
private Observable<String> getSearchObservable(final String query) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
try {
Thread.sleep(100 + (long) (Math.random() * 500));
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
observableEmitter.onComplete();
}
}).subscribeOn(Schedulers.io());
}
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();
}
}