概述: EventBus用于不同的Activity之间或者Activity与Service之间进行通信,非常的方便,即使是不同线程之间的数据发送,我们定义的数据接收方法也能收到。
简单使用
public class MainActivity extend Activity{
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
EventBus.getDefault().register(this)
}
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this)
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEvent(Event event){//自定义Event实体类
Log.i("onEvent", "接收到消息");
}
}
//在其他地方直接发送Event对象
EventBus.getDefault().post(new Event());
只要我们页面内注册了好了EventBus,并且定义好了接收方法,不管我们从哪个线程发送消息都可以接收到。
onEvent()方法添加了@Subscribe注解;
Subscribe注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
/**
* If true, delivers the most recent sticky event (posted with
* {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
*/
boolean sticky() default false;
/** Subscriber priority to influence the order of event delivery.
* Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
* others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
* delivery among subscribers with different {@link ThreadMode}s! */
int priority() default 0;
}
- threadMode属性定义方法在哪个线程接收消息;
- sticky属性定义方法是否接收粘性消息,用于接收页面创建之前发送的粘性消息;
- priority属性定义方法的优先级,优先级高的先获取到消息回调;
ThreadMode
public enum ThreadMode {
//哪个线程发送的消息,就在哪个线程回调接收方法;
//接收方法不能过于耗时,会阻塞发送线程(可能为主线程)
POSTING,
//不管哪个线程发送,都在主线程回调接收方法;
MAIN,
//在主线程回调接收方法,消息被发送直接回放入队列,不一定会马上回调接收方法;
MAIN_ORDERED,
//在子线程回调接收方法,如果发送消息是在子线程,直接回调接收方法;
BACKGROUND,
//在一个独立的线程回调接收方法,不在主线程,也不在发送消息的线程;
ASYNC
}
接下来查看post方法是如何执行的:
1、post(Object event)
public class EventBus {
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();//往下看分析1和分析2
List<Object> eventQueue = postingState.eventQueue;//获取消息队列
eventQueue.add(event);//把事件添加到队列中
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();//记录是否在主线程
postingState.isPosting = true;//设置为正在发送中的状态
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循环发送消息队里中的消息
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
//分析1
//ThreadLocal 是一个线程内部的数据存储类,存储线程私有数据;
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
//分析2
//记录发送线程的信息,如发送的消息队里、是否主线程、是否正在发送,是否取消发送等
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
}
分析:
- post方法中,先从线程中获取PostingThreadState数据,PostingThreadState用于保存消息队列,发送状态,是否为主线程;
- 把消息添加到线程所在的消息队列中;
- 如果现在还没开始发送消息,循环发送消息队列中的消息,设置发送状态为true;
- 调用postSingleEvent(eventQueue.remove(0), postingState)发送单个消息;
2、postSingleEvent(eventQueue.remove(0), postingState)
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();//获取消息实体类对应的Class对象
boolean subscriptionFound = false;
if (eventInheritance) {//默认为true, 判断是否要查找消息实体类的父类或实现的接口类
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);//分析3,获取消息实体类的父类或者接口类
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {//遍历所有的event类对象
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
//分析3
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);//获取缓存
if (eventTypes == null) {//如果缓存为空
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());//添加class的接口类
clazz = clazz.getSuperclass();//获取class的父类
}
eventTypesCache.put(eventClass, eventTypes);//添加进集合中
}
return eventTypes;
}
}
分析:
- 获取消息实体类的类对象,判断是否需要获取这个类对象的父类和接口类的类对象;
- 获取这个类对象的所有父类和接口类的类对象, 然后遍历这些类对象,调用postSingleEventForEventType(event, postingState, clazz);
- 不需要获取消息类对象的父类和接口,直接调用postSingleEventForEventType(event, postingState, eventClass);
3、postSingleEventForEventType(event, postingState, clazz)
//subscriptionsByEventType记录了消息实体类的类对象,与消息订阅类和消息订阅方法的映射关系
//subscriptionsByEventType在EventBus初始化时被赋值为HashMap;
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;//Subscription记录了消息订阅类和订阅方法
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);//获取消息的所有订阅者信息
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {//遍历所有的消息订阅者
postingState.event = event;//配置发送线程配置的消息时间
postingState.subscription = subscription;//配置发送线程配置的消息订阅者
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
//消息订阅者
final class Subscription {
final Object subscriber;//消息订阅类
final SubscriberMethod subscriberMethod;//消息订阅方法
}
分析:
1.首先获取消息实体类对应的消息订阅者集合;这个集合是在我们调用EventBus的注册方法时,遍历注册方法所在类的所有方法,获取添加了@Subscribe的方法所有方法,再获取这些方法的参数,然后以参数的类对象为key,Subscription的集合为value,保存消息参数和消息订阅者的映射关系,消息和订阅类以及订阅方法的映射关系为1:N:N;有兴趣的童鞋可以自己查看register()方法的相关代码;
- 获取到消息订阅者的集合后,遍历这个集合,调用postToSubscription(subscription, event, postingState.isMainThread);
3、postToSubscription(subscription, event, postingState.isMainThread)
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {//判断消息订阅方法定义了要在回调线程模式
case POSTING:
invokeSubscriber(subscription, event);//直接在发送消息的线程回调
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);//发送消息是主线程,直接回调
} else {
mainThreadPoster.enqueue(subscription, event);//保存到主线程handler的消息队列
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
//保存到主线程handler的消息队列,然后发送handler消息
mainThreadPoster.enqueue(subscription, event);//分析4
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
//backgroundPoster是一个Runnable, 把消息保存到子线程的消息队列,然后调用线程池执行runnable
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//asyncPoster是一个Runnable, 把消息保存到子线程的消息队列,然后调用线程池执行runnable
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
//分析4, mainThreadPoster变量的初始化
private final Poster mainThreadPoster;
//消息发送接口,
interface Poster {
void enqueue(Subscription subscription, Object event);//消息入队方法
}
EventBus(EventBusBuilder builder) {
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
}
//mainThreadPoster靠MainThreadSupport来初始化
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
//返回HandlerPoster,赋值给mainThreadPoster, looper为主线程的looper
return new HandlerPoster(eventBus, looper, 10);
}
}
}
//mainThreadPoster实际上就是一个HandlerPoster,是一个Handler
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);//消息入队
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {//然后发送空消息通知handler处理队列中的消息
throw new EventBusException("Could not send handler message");
}
}
}
}
//主线程处理消息
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);//分析5
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
//分析5
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);通过反射,调用订阅方法
}
}
分析:
- 这里主要分析一下发送消息所在线程不是主线程时,调用mainThreadPoster.enqueue(subscription, event)是如何切换到主线程回调订阅方法的;
- 从上面的代码可以知道mainThreadPoster是一个handler,而looper是获取了主线程的looper后传递进来的;
- 调用完enqueue方法,让消息进入队列,然后调用sendMessage方法,让handleMessage来处理消息,也就切换到主线程了,然后直接通过反射,在主线程调用订阅方法;
- 如果是通过backgroundPoster.enqueue(subscription, event)或者asyncPoster.enqueue(subscription, event),会通过线程池来执行runnable,也就切换不同的线程来调用订阅方法。
总结:
1、调用EventBus发送消息后,会判断发送消息所在的线程是否为主线程;
2、获取消息的类对象,然后查找这个消息对应的所有订阅者;
3、调用消息的订阅方法时,会判断订阅方法设置的线程模式;如果需要在主线程回调,发送消息时不在主线程,那么通过主线程的handler来回调订阅方法;如果需要在子线程回调订阅方法,发送消息时是在主线程,那么通过继承了Runnable的类来调用订阅方法,通过线程池启动这个Runnable;这样就能实现子线程发送,切换成主线程回调订阅方法,以及主线程发送,子线程回调订阅方法。