- 直接上代码,共三个类:RxBus、RxBusObject、RxBusSubscriber
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
 * A minimal event bus built on an RxJava 1.x {@link Subject}.
 *
 * <p>Events are posted together with a string tag through {@link #send(Object, String)} and
 * observed, filtered by payload type and tag, through {@link #toObservable(Class, String)}.
 *
 * @author xzj
 * @date 2016/8/24 14:00.
 */
public class RxBus {
    /** Lazily created shared instance; see {@link #getInstance()}. */
    private static RxBus mRxBus = null;

    /**
     * Subject that carries every event posted on the bus.
     *
     * <p>A PublishSubject only emits to an observer the items that arrive after that observer
     * subscribed. It is wrapped in a SerializedSubject, which RxJava documents as making the
     * subject's {@code on*} methods safe to call from different threads.
     */
    private final Subject<Object, Object> mRxBusObserverable =
            new SerializedSubject<>(PublishSubject.create());

    /**
     * Returns the shared bus, creating it on first use.
     *
     * @return the single shared {@code RxBus} instance
     */
    public static synchronized RxBus getInstance() {
        if (mRxBus == null) {
            mRxBus = new RxBus();
        }
        return mRxBus;
    }

    /**
     * Posts an event on the bus.
     *
     * @param o   the event payload
     * @param tag the tag subscribers must match to receive this event
     */
    public void send(Object o, String tag) {
        mRxBusObserverable.onNext(new RxBusObject(tag, o));
    }

    /**
     * Returns an observable of the payloads that are instances of {@code eventType} and were
     * sent with a tag equal to {@code tag}.
     *
     * @param eventType the payload class to accept
     * @param tag       the tag to match; when {@code null} nothing is ever emitted
     * @param <T>       the payload type
     * @return an observable emitting the matching payloads
     */
    public <T> Observable<T> toObservable(final Class<T> eventType, final String tag) {
        return mRxBusObserverable.filter(new Func1<Object, Boolean>() {
            @Override
            public Boolean call(Object o) {
                if (!(o instanceof RxBusObject)) {
                    return false;
                }
                RxBusObject ro = (RxBusObject) o;
                return eventType.isInstance(ro.getObj())
                        && tag != null
                        && tag.equals(ro.getTag());
            }
        }).map(new Func1<Object, T>() {
            @Override
            public T call(Object o) {
                RxBusObject ro = (RxBusObject) o;
                // Checked cast: unlike an unchecked (T) cast this produces no compiler warning
                // and fails here, not at the subscriber, if the payload is not a T.
                return eventType.cast(ro.getObj());
            }
        });
    }

    /**
     * Reports whether the bus currently has subscribers.
     *
     * @return the result of {@code hasObservers()} on the underlying subject
     */
    public boolean hasObservers() {
        return mRxBusObserverable.hasObservers();
    }
}
/**
* @author xzj
* @date 2016/8/24 14:22.
*/
public class RxBusObject {
private String tag;
private Object obj;
public RxBusObject(String tag, Object obj) {
this.tag = tag;
this.obj = obj;
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public Object getObj() {
return obj;
}
public void setObj(Object obj) {
this.obj = obj;
}
public static RxBusObject newInstance(String tag, Object obj) {
return new RxBusObject(tag, obj);
}
}
/**
* @author xzj
* @date 2016/8/24 14:24.
*/
import rx.Subscriber;
/**
 * Subscriber adapter intended for the Observables returned by RxBus; it reduces the
 * boilerplate of implementing {@code onError} and {@code onCompleted}.
 *
 * <p>Subclasses must implement {@link #receive(Object)}; {@link #error(Throwable)} and
 * {@link #completed()} have default implementations that may be overridden.
 *
 * @param <T> the type of item received
 */
public abstract class RxBusSubscriber<T> extends Subscriber<T> {

    /**
     * Called for every item delivered to {@link #onNext(Object)}.
     *
     * @param data the item received
     */
    public abstract void receive(T data);

    /**
     * Called when {@link #onError(Throwable)} fires; by default prints the stack trace.
     *
     * @param e the error that was delivered
     */
    public void error(Throwable e) {
        e.printStackTrace();
    }

    /** Called when {@link #onCompleted()} fires; does nothing by default. */
    public void completed() {
        // Intentionally empty.
    }

    /** Forwards the item to {@link #receive(Object)}. */
    @Override
    public void onNext(T t) {
        this.receive(t);
    }

    /** Forwards the error to {@link #error(Throwable)}. */
    @Override
    public void onError(Throwable e) {
        this.error(e);
    }

    /** Forwards the completion signal to {@link #completed()}. */
    @Override
    public void onCompleted() {
        this.completed();
    }
}
- 用法
一个简单的例子:
界面由上下两个 fragment 组成。change 按钮位于上面的 MyFragment 中,点击后随机生成一个 color 发送给下面的 MyFragment2;MyFragment2 收到后更改自己的背景色。
/**
 * Fragment hosting a button; each click sends a randomly generated ARGB color on the
 * {@link RxBus} under the tag {@code "color"}.
 */
public class MyFragment extends Fragment {
    /** Shared generator so a new Random is not allocated on every click. */
    private final Random random = new Random();

    /**
     * Inflates the layout and wires the button to send a random color on each click.
     *
     * @return the inflated fragment view
     */
    @Nullable
    @Override
    public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
        // Inflate against the container without attaching, so the root view's layout_*
        // attributes are resolved instead of being discarded as they are with a null root.
        View view = inflater.inflate(R.layout.layout_fragment, container, false);
        Button btn = (Button) view.findViewById(R.id.btn);
        btn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View clicked) {
                // All four channels, alpha included, are random in [0, 255].
                int color = Color.argb(random.nextInt(256), random.nextInt(256), random.nextInt(256), random.nextInt(256));
                RxBus.getInstance().send(color, "color");
            }
        });
        return view;
    }
}
/**
 * Fragment that listens on the {@link RxBus} for {@code Integer} events tagged
 * {@code "color"} and applies each one as the background color of its view.
 */
public class MyFragment2 extends Fragment {
    /** Bus subscription created in onCreateView and released in onDestroyView. */
    private Subscription subscribe;

    /**
     * Inflates the layout and subscribes to color events.
     *
     * @return the inflated fragment view
     */
    @Nullable
    @Override
    public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
        // Inflate against the container without attaching, so the root view's layout_*
        // attributes are resolved instead of being discarded as they are with a null root.
        final View view = inflater.inflate(R.layout.layout_fragment2, container, false);
        // Expect an Integer payload tagged "color"; only events whose type and tag both
        // match the arguments passed to send() are delivered here.
        subscribe = RxBus.getInstance().toObservable(Integer.class, "color")
                .subscribe(new RxBusSubscriber<Integer>() {
                    @Override
                    public void receive(Integer data) {
                        view.setBackgroundColor(data);
                    }
                });
        return view;
    }

    /** Releases the bus subscription together with the view it updates. */
    @Override
    public void onDestroyView() {
        super.onDestroyView();
        // Guard against a missing subscription so teardown cannot throw a NullPointerException.
        if (subscribe != null && !subscribe.isUnsubscribed()) {
            subscribe.unsubscribe();
        }
        subscribe = null;
    }
}