部分参考了作者 飞奔的蚂蚁
初识rxAndroid
1.rxAndroid涉及对象:订阅者(观察者),被观察者以及订阅事件
被观察者可以理解为数据源,它可以向订阅者(观察者)发送数据消息和消息,而订阅者则通过被观察者发送的消息做出相应的动作。而rxAndroid本身也提供了线程切换的API,可以分别指定订阅者和被观察者的线程。
每一段rxAndroid代码都从被观察者开始
observables 被观察者(事件源)
subscribers 观察者 订阅者
schedulers 子线程、主线程切换的调度 器,
schedulers.newThread() 在子线程中执行,
schedulers.mainThread()在主线程中执行,
schedulers.io(),访问网络和数据操作的线程执行
核心代码
Observable.subscribe(Subcribe);
其中被观察者通过call回调subscriber.onNext(数据)和onComplete()传递数据,而subcscriber实现onNext(数据){....},加上线程控制从而实现异步任务处理。
下面举个栗子:从网络获取json数据:
package com.example.rxtest;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import butterknife.BindView;
import butterknife.ButterKnife;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
@BindView(R.id.btn_search)
Button btnSearch;
@BindView(R.id.tv_result)
TextView tvResult;
String requestUrl = "http://www.cnblogs.com/whoislcj/p/5520384.html";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ButterKnife.bind(this);
btnSearch.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View view) {
createSubscription();
}
});
}
private void createSubscription() {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String s = loadText();
subscriber.onNext(s);
subscriber.onCompleted();
}
})//被观察者
.subscribeOn(Schedulers.io())//指定上面的被观察者在io线程(阶级数据、访问网络)
.observeOn(AndroidSchedulers.mainThread())//指定下面的订阅者的线程(UI线程)
.subscribe(new Subscriber<String>() {//订阅者
@Override
public void onStart() {
tvResult.setText("开始加载..");
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
tvResult.setText("加载出错" + e.getMessage());
}
@Override
public void onNext(String s) {
//被观察者onnext传递过来的消息
tvResult.setText(s);
}
});
}
private String loadText() {
// 新建一个URL对象
URL url = null;
String result = null;
try {
url = new URL(requestUrl);
// 打开一个HttpURLConnection连接
HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
// 设置连接主机超时时间
urlConn.setConnectTimeout(5 * 1000);
//设置从主机读取数据超时
urlConn.setReadTimeout(5 * 1000);
// 设置是否使用缓存 默认是true
urlConn.setUseCaches(true);
// 设置为Get请求
urlConn.setRequestMethod("GET");
//urlConn设置请求头信息
//设置请求中的媒体类型信息。
urlConn.setRequestProperty("Content-Type", "application/json");
//设置客户端与服务连接类型
urlConn.addRequestProperty("Connection", "Keep-Alive");
// 开始连接
urlConn.connect();
// 判断请求是否成功
if (urlConn.getResponseCode() == 200) {
// 获取返回的数据
result = streamToString(urlConn.getInputStream());
Log.e(TAG, "Get方式请求成功,result--->" + result);
} else {
Log.e(TAG, "Get方式请求失败");
}
// 关闭连接
urlConn.disconnect();
} catch (Exception e) {
Log.e(TAG, e.toString());
}
return result;
}
private String streamToString(InputStream inputStream) {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
StringBuffer buffer = new StringBuffer();
String line;
try {
while ((line = bufferedReader.readLine()) != null){
buffer.append(line);
}
bufferedReader.close();
} catch (IOException e) {
e.printStackTrace();
}
return buffer.toString();
}
}
如图
rxAndroid操作符
map操作符
map 用来操作observable和最终的subscriber之间修改observable发出事件的类型,比如发出类型为int类型,最终为String类型,中间用map来转换
from操作符
将其他种类的对象和数据类型转换为Observable
当你使用Observable时,如果你要处理的数据都可以转换成展现为Observables,而不是需要使用Observables和其他类型的数据
/**
* 使用在被观察者,返回的对象一般都是数值类型
*/
public static void from(){
Integer [] items={1,2,3,4,5,6,7,8,9};
Observable observable = Observable.from(items);
observable.subscribe(new Action1() {
@Override public void call(Object o) {
Log.i("adu",o.toString());
}
});
}
interval操作符
interval操作符既可以延迟执行一段逻辑
/**
* 指定某一时刻进行数据发送
*/
public static void interval(){
Integer [] items = {1,2,3,4,5};
Observable observable = Observable.interval(items,1, TimeUnit.SECONDS);//每隔一秒发送数据
observable.subscribe(new Action1() {
@Override public void call(Object o) {
Log.i("adu",o.toString());
}
});
}
just
获取输入数据, 直接分发, 更加简洁, 省略其他回调.
/**
* 处理数组集合
*/
public static void just(){
Integer[] items1={1,2,3,4,5};
Integer[] items2={4,5,1,6,0};
Observable observable = Observable.just(items1,items2);
observable.subscribe(new Subscriber<Integer[]>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer[] integers) {
for (int i= 0;i<integers.length;i++){
Log.i("adu","onNext==》》"+integers[i]);//先打印items1再打印items2
}
}
});
}
range操作符
range操作符的作用根据出入的初始值n和数目m发射一系列大于等于n的m个值。
/**
* 指定输出数据的范围
*/
public static void range(){
Observable observable = Observable.range(23,4);
observable.subscribe(new Subscriber<Integer>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer integer) {
Log.i("adu","onNext==》》"+integer);
}
});
}
flter操作符
数据过滤器,比如得到大于或者小于某个数的值
/**
* 过滤某些条件
*/
public static void filter(){
Observable observable = Observable.just(1,2,3,4,5,6,7,8);
observable.filter(new Func1<Integer,Boolean>() {
@Override public Boolean call(Integer integer) {
return integer<5;
}
}).observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override public void onCompleted() {
Log.i("adu","onCompleted");
}
@Override public void onError(Throwable e) {
Log.i("adu","onError");
}
@Override public void onNext(Integer integer) {
Log.i("adu","inNext==》》"+integer);
}
});
}
take操作符
获取数据前几位值
takelast操作符
获取数据后几位值
deitinct
对一个值只处理一次
等等操作,详情参考rxAndroid操作
CompositeSubscription
用于订阅者的管理,一般在baseActivity中创建,将所有订阅者添加到CompositeSubcription中集中管理.
add()添加订阅者
clear()清理订阅者,可以再次使用
unsubscribe()取消订阅者,再也不能使用