通过 RxJava 实现RxBus
1.添加依赖:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
2. 新建 RxBus.java 文件:
public class RxBus {
private ConcurrentHashMap> subjectMaker=new ConcurrentHashMap<>();
private static RxBusinstanceRxBus;
public static synchronizedRxBusgetInstanceRxBus() {
if(null==instanceRxBus) {
instanceRxBus=newRxBus();
}
return instanceRxBus;
}
@SuppressWarnings("unchecked")
public Observable registerRxBus(@NonNullObject tag,@NonNullClass cls) {\
List subjectList =subjectMaker.get(tag);
if(null== subjectList) {
subjectList =newArrayList();
subjectMaker.put(tag,subjectList);
}
Subject subject =null;
subjectList.add(subject = PublishSubject.create());
return subject;
}
public void unRegisterRxBus(@NonNullObject tag,@NonNullObservable observable) {
List subjects =subjectMaker.get(tag);
if(null!= subjects) {
subjects.remove(observable);
if(!subjects.isEmpty()) {
subjectMaker.remove(tag);
}
}
}
public voidpost(@NonNullObject content) {
post(content.getClass().getName(),content);
}
@SuppressWarnings("unchecked")
public void post(@NonNullObject tag,Object content) {
List subjects =subjectMaker.get(tag);
if(!subjects.isEmpty()) {
for(Subject sub : subjects) {
sub.onNext(content);
}
}
}
}
3.通过这个 bus 发布一个事件:
// Sends "hello" to every subject currently registered under the "add" tag.
RxBus.getInstanceRxBus().post("add","hello");
4. 在 Fragment、Service 中接收一个 String 类型的值:
// Register under the "add" tag; keep the returned observable so it can be
// passed to unRegisterRxBus later.
Observable<String> addObservable = RxBus.getInstanceRxBus().registerRxBus("add", String.class);
// Deliver events to the callback on the Android main thread.
addObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        // TODO: handle the received string
    }
});
5. 取消 RxBus 注册:
// Removes addObservable from the subjects stored under the "add" tag.
RxBus.getInstanceRxBus().unRegisterRxBus("add",addObservable);