// Documentation URL: https://mcxiaoke.gitbooks.io/rxdocs/content/
/**
 * Demonstrates {@code repeat}: the sequence produced by {@code range(1, 2)} is replayed
 * five times, and every item is logged together with the name of the current thread.
 *
 * <p>Output recorded alongside the original code (the pair appears five times):
 * <pre>
 * main 1
 * main 2
 * main 1
 * main 2
 * main 1
 * main 2
 * main 1
 * main 2
 * main 1
 * main 2
 * </pre>
 */
private void repeat() {
    Observable<Integer> replayed = Observable.range(1, 2).repeat(5);

    Observer<Integer> itemLogger = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Nothing to set up for this demo.
        }

        @Override
        public void onNext(Integer value) {
            String threadName = Thread.currentThread().getName();
            Log.e("qwer", threadName + " " + value);
        }

        @Override
        public void onError(Throwable e) {
            // Errors are not handled in this demo.
        }

        @Override
        public void onComplete() {
            // Completion is not reported in this demo.
        }
    };

    replayed.subscribe(itemLogger);
}
/**
 * Demonstrates {@code range}: subscribes to {@code Observable.range(1, 10)} and logs each
 * emitted integer prefixed with the name of the thread that delivered it.
 */
private void range() {
    Observable<Integer> numbers = Observable.range(1, 10);
    numbers.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            // No subscription bookkeeping needed here.
        }

        @Override
        public void onNext(Integer value) {
            String line = Thread.currentThread().getName() + " " + value;
            Log.e("qwer", line);
        }

        @Override
        public void onError(Throwable e) {
            // Errors are not handled in this demo.
        }

        @Override
        public void onComplete() {
            // Completion is not reported in this demo.
        }
    });
}
/**
 * Demonstrates {@code just} with items of mixed types: once with a callback-free
 * subscription, and once with an observer whose callbacks are all empty.
 */
private void just() {
    // Subscribe without supplying any callbacks.
    Observable.just(1, "df").subscribe();

    // Four items of unrelated types, consumed through an Observer<Object>.
    Observable<Object> mixedItems = Observable.just(1, "df", 99L, new Object());
    mixedItems.subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Intentionally empty.
        }

        @Override
        public void onNext(Object item) {
            // Intentionally empty.
        }

        @Override
        public void onError(Throwable e) {
            // Intentionally empty.
        }

        @Override
        public void onComplete() {
            // Intentionally empty.
        }
    });
}
/**
 * Demonstrates time-based sources: {@code timer}, several {@code interval} variants, and
 * {@code delay}. Every subscription below is given the same no-op observer instance.
 */
private void timer_interval() {
    final long periodMs = 1000;

    // A single observer with empty callbacks, reused by all subscriptions in this method.
    Observer<Long> noOpObserver = new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Intentionally empty.
        }

        @Override
        public void onNext(Long tick) {
            // Intentionally empty.
        }

        @Override
        public void onError(Throwable e) {
            // Intentionally empty.
        }

        @Override
        public void onComplete() {
            // Intentionally empty.
        }
    };

    // Run one task after a 1 s delay, then finish.
    // (An earlier variant used subscribeOn(Schedulers.io()) with an inline Observer<Long>.)
    Observable<Long> oneShot = Observable.timer(periodMs, TimeUnit.MILLISECONDS);
    oneShot.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(noOpObserver);

    // Run a task every 1 s; the first run happens after a 1 s delay; repeats without limit.
    Observable<Long> delayedTicker = Observable.interval(periodMs, TimeUnit.MILLISECONDS);
    delayedTicker.subscribeOn(Schedulers.io()).subscribe(noOpObserver);

    // Run a task every 1 s; the first run happens immediately; repeats without limit.
    Observable<Long> immediateTicker = Observable.interval(0, periodMs, TimeUnit.MILLISECONDS);
    immediateTicker.subscribeOn(Schedulers.io()).subscribe(noOpObserver);

    // Run a task every 1 s; the first run happens immediately; only five runs in total.
    Observable<Long> fiveTicks = Observable.interval(0, periodMs, TimeUnit.MILLISECONDS).take(5);
    fiveTicks.subscribe(noOpObserver);

    // Run a first task, wait 1 s, then run a second task, then finish.
    Consumer<Long> announceFirstTask = new Consumer<Long>() {
        @Override
        public void accept(Long ignored) throws Exception {
            Log.d("qwer", "执行第一个任务");
        }
    };
    Observable.just(0L)
            .doOnNext(announceFirstTask)
            .delay(periodMs, TimeUnit.MILLISECONDS)
            .subscribe(noOpObserver);
}
/**
 * Demonstrates {@code fromIterable} over a list holding "1".."4", subscribed in four ways:
 * with a no-op {@code doOnNext}, with a logging {@code Consumer} whose {@code Disposable}
 * is disposed right after {@code subscribe} returns, with a fully logging
 * {@code Observer}, and with {@code doOnNext} combined with an {@code Observer}.
 */
private void from() {
    List<String> data = new ArrayList<>();
    for (int n = 1; n <= 4; n++) {
        data.add(String.valueOf(n));
    }

    // 1) Side-effect hook that does nothing, subscribed without callbacks.
    Consumer<String> doNothing = new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
            // Intentionally empty.
        }
    };
    Observable.fromIterable(data).doOnNext(doNothing).subscribe();

    // 2) Log every item through a Consumer, then dispose the returned handle.
    Disposable handle = Observable.fromIterable(data).subscribe(new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
            Log.e("qwer", "accept : " + item);
        }
    });
    handle.dispose();

    // 3) Log every callback of a full Observer.
    Observer<String> verboseObserver = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("qwer", "onSubscribe");
        }

        @Override
        public void onNext(String item) {
            Log.e("qwer", "onNext" + item);
        }

        @Override
        public void onError(Throwable e) {
            Log.e("qwer", "onError:" + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e("qwer", "onComplete");
        }
    };
    Observable.fromIterable(data).subscribe(verboseObserver);

    // 4) doOnNext followed by an Observer. Output recorded alongside the original code:
    //      Observable 1
    //      onNext 1
    //      Observable 2
    //      onNext 2
    //      Observable 3
    //      onNext 3
    //      Observable 4
    //      onNext 4
    Observable<String> traced = Observable.fromIterable(data).doOnNext(new Consumer<String>() {
        @Override
        public void accept(String item) throws Exception {
            Log.e("qwer", " Observable " + item);
        }
    });
    traced.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Intentionally empty.
        }

        @Override
        public void onNext(String item) {
            Log.e("qwer", " onNext " + item);
        }

        @Override
        public void onError(Throwable e) {
            // Intentionally empty.
        }

        @Override
        public void onComplete() {
            // Intentionally empty.
        }
    });
}
/**
 * Demonstrates {@code create} combined with {@code onErrorReturn}.
 *
 * <p>The source emits 0..9 and then signals an error in place of item 10.
 * {@code onErrorReturn} maps that error to the fallback value 100, so the observer below
 * logs the items, then the fallback, and is not handed the error itself.
 *
 * <p>Change from the previous version: the emitting loop now stops as soon as the error
 * has been signalled. Before, it went on to call {@code onNext(11)}, {@code onNext(12)}
 * and {@code onComplete()} after the terminal event, which the Observable contract
 * forbids.
 */
private void create() {
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            for (int i = 0; i < 13; i++) {
                if (i == 10) {
                    emitter.onError(new Throwable(""));
                    // onError is terminal: nothing may be emitted afterwards, so stop here.
                    return;
                }
                emitter.onNext(i);
            }
            // Only reached when the loop finishes without signalling an error.
            emitter.onComplete();
        }
    }).onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            // Fallback item emitted in place of the upstream error.
            return 100;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("qwer", "onSubscribe");
        }

        @Override
        public void onNext(Integer integer) {
            Log.e("qwer", "onNext" + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.e("qwer", "onError:" + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e("qwer", "onComplete");
        }
    });
}