本文使用eclipse编辑器,gradle依赖jar,如若未配置此环境,请转Java Eclipse配置gradle编译项目配置好环境后再查看此文
- 创建Gradle(STS) Project工程,并删除其他一些不需要的文件。
在build.gradle文件的dependencies中依赖,并刷新依赖。
compile "io.reactivex.rxjava2:rxjava:2.1.3"
创建一个Client.java类,实现main方法。接下来开始使用RxJava。
创建一个simple方法,该方法简单的使用RxJava.
/**
* 简单使用
*/
public static void simple() {
Flowable//流
.just("one") //数据
.subscribe(new Consumer<String>() {//订阅一个消费者
public void accept(String t) throws Exception {
System.out.println(t); // 打印数据
}
});
}
输出为:
one
- 不同线程的调度,切换线程,不同线程中传递数据。
/**
* 线程示例
* @throws InterruptedException
*/
public static void threadSimple() throws InterruptedException {
Flowable//流
.fromCallable(new Callable<String>() {//子线程调用
public String call() throws Exception {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
return "true";
}
})
.subscribeOn(Schedulers.io())//io线程
.observeOn(Schedulers.single())//单线程
.subscribe(new Consumer<String>() {//主线程订阅
public void accept(String t) throws Exception {
System.out.println(Thread.currentThread().getName());
System.out.println(t);
}
}, new Consumer<Throwable>() {
public void accept(Throwable t) throws Exception {
System.out.println(t);
}
});
Thread.sleep(2000);
}
打印结果:
RxCachedThreadScheduler-1
RxSingleScheduler-1
true
3.实现1-10数字的自乘。
/**
* map使用
*/
public static void mapSimple() {
Flowable//流
.range(1, 10)//从1到10
.observeOn(Schedulers.computation())//用于计算工作的实例
.map(new Function<Integer, Integer>() {//对每一项进行自乘
public Integer apply(Integer t) throws Exception {
return t*t;
}
})
.blockingSubscribe(new Consumer<Integer>() {//当前线程回调
public void accept(Integer t) throws Exception {
System.out.println(t);
}
});
}
打印结果:
1
4
9
16
25
36
49
64
81
100
- 实现1-10自乘,乱序打印。
/**
* flatMap使用
*/
public static void flatMapSimple() {
Flowable
.range(1, 10)
.flatMap(new Function<Integer, Publisher<? extends Integer>>() {
public Publisher<? extends Integer> apply(Integer t) throws Exception {
return Flowable
.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Integer>() {
public Integer apply(Integer t) throws Exception {
return t*t;
}
});
}
})
.blockingSubscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println(t);
}
});
}
打印结果:
9
16
25
36
49
64
1
4
81
100
- 从2.0.5开始,有一个“实验”操作符的并行和“平行流”,有助于实现相同的并行处理模式。
/**
* 平行调用map
*/
public static void parallelSimple() {
Flowable
.range(1, 10)
.parallel()//平行
.runOn(Schedulers.computation())
.map(new Function<Integer, Integer>() {
public Integer apply(Integer t) throws Exception {
return t*t;
}
})
.sequential()//顺序
.blockingSubscribe(new Consumer<Integer>() {
public void accept(Integer t) throws Exception {
System.out.println(t);
}
});
}
打印结果:
1
4
9
16
25
36
49
64
81
100