此篇文章的优化是基于我的另一篇文章
RxBus 基于 RxJava1.x 实现消息传递机制
需要优化的是两个方面:
- 解决RxBus可能存在的Observe和context泄漏的问题
- 解决RxBus关于rxJava1.x背压的问题
关于Observe和context泄漏的问题,在我的另一篇文章中有简单介绍,想了解的话可以转码:
CompositeSubscription 解决RxJava1.x中Observable持有Context导致的内存泄露的问题
好了,说了这么多预备的东西,改进入专题了。
首先,还是给app 的 gradle添加依赖:
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.2.7'
然后,上RxBus的代码
package com.test.http.rxbus;
import com.test.http.model.RxData;
import com.test.util.LogUtil;
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
/**
* Created by Admin on 2017/5/19.
*
* RxBus 实现消息传递机制
*
* 需要引入RxJava1.x相关依赖
*
* //rxjava
* compile 'io.reactivex:rxjava:1.2.7'//rx对Android的支持
* compile 'io.reactivex:rxandroid:1.2.1'//rxjava
*
*/
public class RxBus {
private final Subject bus;
private CompositeSubscription mCompositeSubscription;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
/**
* 单例模式RxBus
*
* @return
*/
private static class RxBusHolder {
private static final RxBus instance = new RxBus();
}
public static synchronized RxBus getInstance() {
return RxBusHolder.instance;
}
/**
* 接收消息
*
* @param eventType
* @param <T>
* @return
*/
private <T> Observable<T> toObserverable(Class<T> eventType) {
return bus.ofType(eventType);
}
public interface CallBack{
void onCallBack(Object obj);
}
/**
* 发送消息,外部调用
*
* @param data
*/
public void post(RxData data) {
bus.onNext(data);
}
/***
* 接收消息,外部调用(一般放在setListener中,类似button监听的用法)
*/
public void register(final CallBack callBack){
if(mCompositeSubscription==null||mCompositeSubscription.isUnsubscribed()){
mCompositeSubscription=new CompositeSubscription();
}
Subscription subscription = RxBus.getInstance().toObserverable(RxData.class)
//背压
.onBackpressureBuffer()
//在io线程进行订阅,可以执行一些耗时操作
.subscribeOn(Schedulers.io())
//在主线程进行观察,可做UI更新操作
.observeOn(AndroidSchedulers.mainThread())
//观察的对象
.subscribe(new Action1<RxData>() {
@Override
public void call(RxData data) {
if(callBack!=null){
callBack.onCallBack(data);
}
}
});
mCompositeSubscription.add(subscription);
}
/***
* 注销,外部调用
*/
public void unRegister(){
if(mCompositeSubscription!=null&&!mCompositeSubscription.isUnsubscribed()){
mCompositeSubscription.unsubscribe();
mCompositeSubscription=null;
LogUtil.e("=====rxbus注销======");
}
}
}
其中涉及到一个消息传递的载体RxData,它是一个model,代码如下:
package com.test.http.model;
import java.io.Serializable;
/**
* Created by Admin on 2017/5/19.
*/
public class RxData<T> implements Serializable{
private int code;
private String message;
private T data;
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public T getData() {
return data;
}
public void setData(T data) {
this.data = data;
}
}
调用的话直接参考我的上篇文章
RxBus 基于 RxJava1.x 实现消息传递机制
这里就不做过多讲解了,好了,关于RxBus基于RxJava1.x今天就讲到这里。谢谢诶