EventBus 是 Android 和 Java 的发布/订阅事件总线。
可以在任意一个地方抛出事件,所有注册这个事件的地方(这里指标记了@Subscribe注解的订阅方法)都能收到事件(被触发调用),EventBus同时支持线程控制,可以在注解上直接声明ThreadMode,指名该方法最后期望执行在哪个线程。用法这里不在赘述,网上教程都很详细,这里主要从源码角度分析EventBus。
一.EventBus创建
EventBus提供了一个getDefault() 方法,懒汉式的单例模式获取EventBus对象。
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
同时EventBus提供了EventBusBuilder去构建特殊配置的对象,EventBusBuilder是一个建造者,可以直接使用build()创建一个EventBus也可以构建出EventBusBuilder对象再传给EventBus的构造方法。
getDefault()中构造的EventBus对象是使用DEFAULT_BUILDER去创建的:
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//subscriberMethodFinder是在这里创建的,查找注解修饰的方法都是通过这个对象处理的
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
DEFAULT_BUILDER的属性都是EventBusBuilder中的默认值,一般我们都是通过getDefault()去拿默认的EventBus,这里EventBusBuilder我们可以先忽略。
上面代码会创建一个subscriberMethodFinder对象,SubscriberMethodFinder这个类是负责去查找注解标注的方法的,后面会说到。
二.注册流程
如果一个类对象需要接收Event事件处理,我们称该对象为『订阅者』,除了要对其接收事件的方法加上@Subscribe注解外,还需要对该对象进行主动的注册:调用EventBus.register方法,该方法传入当前的订阅者对象进行注册,register方法如下:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
//查找订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//维护映射关系
subscribe(subscriber, subscriberMethod);
}
}
}
注册时,我们传入了订阅者的对象subscriber:
传入的subscriber主要用于以下两点:
-
1.通过subscriber对象可以很方便的找到subscriber类型中需要监听事件的方法(@Subscribe注解标注的方法,后面直接称『订阅方法』)。
所以如果事件抛出(EventBus.post)在注册之前,订阅方法就不会被触发了,但是EventBus也设计了sticky的概念,可以通过postSticky抛出,可以在注册后收到之前已经抛出的事件,这个原理后面也会说。 - 2.事件触发后,订阅者的方法之所以能执行,就是通过此对象去调用的订阅方法(订阅方法需要是public的)。
register的代码分析:
register主要是调用了下面两个方法
- findSubscriberMethods():通过subscriberMethodFinder查找订阅者Subscribe注解修饰的方法(含有"祖先类"的)。
-
subscribe():主要是对查找到的方法做一个存放;维护了两个map,方便后面事件抛出后,找到需要调用的SubscriberMethod(subscriberMethodFinder查找到的结果)
subscriptionsByEventType:事件类型和方法订阅方法Subscription的映射;
typesBySubscriber:以及订阅者对象subscriber和这个订阅者对象关心的所有事件类型的映射。
我们具体看下这两个方法做的事情:
-
1.findSubscriberMethods(Class<?> subscriberClass):
findSubscriberMethods方法返回了SubscriberMethod集合,SubscriberMethod这个对象存放的就是订阅者被@Subscribe注解修饰的方法相关信息:
//SubscriberMethod.java
class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
}
method是Subscribe注解所修饰的方法;threadMode是声明此方法最后期望运行线程模式;eventType对应定义的Event事件类型;priority也比较重要,优先级,同一个事件被多个方法订阅时,遵循这个优先级顺序执行;sticky是一个粘性标记,需要搭配EventBus.postSticky方法发出事件。
findSubscriberMethods:
//SubscriberMethodFinder.java
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
METHOD_CACHE是个cache,ignoreGeneratedIndex由于EventBusBuilder是默认实现,所以是false,所以直接看findUsingInfo就行了。
//SubscriberMethodFinder.java
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//FindState使用了一个池(FIND_STATE_POOL)避免在查找过程中的过多开销
FindState findState = prepareFindState();
//赋值class对象
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//getSubscriberInfo这里返回faluse,直接看false的分支
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//此处是最终的订阅方法的查找,结果会存放到findState对象里面
findUsingReflectionInSingleClass(findState);
}
//这里向上查找,向订阅者的父类查找,所以只需要在子类中注册即可
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
//返回查找结果并且recycle findState对象,重新放回FIND_STATE_POOL池
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
//与findUsingInfo的while (findState.clazz != null) 一起实现了向上查找
void moveToSuperclass() {
if (skipSuperClasses) {
clazz = null;
} else {
clazz = clazz.getSuperclass();
String clazzName = clazz.getName();
/** Skip system classes, this just degrades performance. */
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
clazz = null;
}
}
}
上诉代码中关键处我已经加上了注释,很容易理解。while (findState.clazz != null) 和findState.moveToSuperclass()相结合实现了一个向上查找,最后可以找到包含当前订阅者subscriber以及其向上的父类、"爷爷类"等所有"祖先类"中被@Subscribe修饰的方法。
查找SubscriberMethod的关键方法findUsingReflectionInSingleClass:
//SubscriberMethodFinder.java
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
//方法必须是public并且不是abstract和static的
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//读取注解信息组装SubscriberMethod
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
由于上面moveToSuperclass逻辑的存在,这个方法会被调用多次直到没有向上的父类为止,通过反射拿到订阅者类以及其父类中的所有方法,通过筛选,首先要满足以下条件:
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
(modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0
即需要是public的并且不是abstract和static的,因为只有这些方法才能被外部通过对象调用。
其次,肯定是含有Subscribe注解修饰的方法:
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
找到这些方法,从Subscribe注解中什么的配置信息,组装成SubscriberMethod对象返回,至此,我们在register时得到订阅者subscriber所有的SubscriberMethod。
到这里注册过程就完成了一半,拿到了subscriberMethods后还需要按一个比较好查找的数据结构存储下来方便我们后续查找,这就是subscribe()方法做的事情。
-
2.subscribe(Object subscriber, SubscriberMethod subscriberMethod):
//EventBus.java
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//注册的事件类型
Class<?> eventType = subscriberMethod.eventType;
//组装Subscription
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
//根据优先级写入到subscriptions
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//触发粘性事件对应的订阅方法
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
此方法做的事情主要就是将查找的方法存放到EventBus中对应的数据集里面,注意添加时是按优先级顺序添加到对应顺序的,注解的priority属性在这里生效:
Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType
存放的是Event事件的类型->对应的订阅方法Subscription的映射,Subscription会以注解中声明的优先级顺序放入 CopyOnWriteArrayList<Subscription>集合中。Map<Object, List<Class<?>>> typesBySubscriber;
存放的是订阅者对象subscriber->其关心的Event事件类型映射。另外,如果这个subscriberMethod是sticky的,那么在注册时,会去缓存的粘性事件stickyEvents中取处事件,主动触发订阅方法(这里调用的postToSubscription()方法,后面说post流程的时候再细说)。这里解答了为什么sticky可以收到注册之前已经发送过的sticky事件。
Map<Class<?>, Object> stickyEvents
存放的是postSticky抛出的事件类型以及对象。
三.取消注册流程
清除了注册流程,取消流程也就是对subscriptionsByEventType和typesBySubscriber中的元素做一些删除操作。
除此以外,取消注册时可能事件已经抛出了,所以取消注册的过程中会对订阅方法Subscription的active属性置为false(EventBus.unsubscribeByEventType()方法中)。
#EventBus.java
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
//此处将subscription.active设置成false
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
所以后续在执行订阅消息前,需要先检查active的状态,这个检查的逻辑可以在后面三种XXXPoster类在调用EventBus的invokeSubscriber()时可以看到,见下文HandlerPoster章节。
四.发送事件
如果不需要额外处理线程、中间过程取消等控制的话,从之前的注册流程我们已经可以很轻松的获取到所有事件的订阅者及方法,直接通过反射调用订阅者的方法,我们已经可以很轻松触发订阅者的方法。
但是显然,EventBus给我们提供了一些管控方法、线程管理等。
post(Object event) :
//EventBus.java
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
currentPostingThreadState是一个ThreadLocal对象,通过ThreadLocal维护了多个线程内单例的PostingThreadState对象。
ThreadLocal
关于ThreadLocal,如果看过Handle的实现的话对这个应该很熟悉,ThreadLocal<T>内部通过维护了一个key为线程名的Map,保证线程内单例的T对象,ThreadLocal的设计比较简单,感兴趣的可以自行研究,这里不多赘述。
这里只要知道:调用currentPostingThreadState.get()会去获取跟当前线程绑定的一个PostingThreadState对象,并且是当前线程类单例的,如果没创建的话,会调用initialValue方法创建一个PostingThreadState对象。
PostingThreadState
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
内部维护了一个『队列』eventQueue,事实上是个ArrayList。post会将Event事件加入到事件队列中(这里可以类比Handle的消息队列MessageQueue,MessageQueue是通过Looper间接性实现的线程内单例,Looper也是通过ThreadLocal维护的)。
如果当前PostingThreadState不在执行中(即isPosting=false):会将isPosting置为true,记录下当前线程是否为主线程( postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();)。并且开始从eventQueue中依次出队事件去执行postSingleEvent()方法,直到队列为空时结束(类比Handle里Looper的loop方法,设计上是一样的,熟悉Handler的同学看到这里肯定会很熟悉)并且记录isPosting=false,这里用eventQueue.remove(0)达到一个从队列出队的操作。如果已经在执行中(isPosting=false)那么什么都不用做,只要加入到当前事件队列并等待这个队列任务的依次执行即可。
总结起来:EventBus.post()就是将新的事件加入当前线程的事件队列中,然后有个任务不断的从队列中取出事件执行postSingleEvent()直到事件全部出队后结束,如果这个任务当前不再执行就主动唤醒。
postSingleEvent(Object event, PostingThreadState postingState)
从事件队列中拿到的事件最后是到了这个方法,看下实现
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
根据eventInheritance是否为真分成了两条分支:
- 为false的一条直接以当前事件Class类型为参数调用postSingleEventForEventType;
- 而true的分支则先通过lookupAllEventTypes(eventClass)获取到一个Class集合去调用了postSingleEventForEventType。
先看postSingleEventForEventType方法,这个方法是从subscriptionsByEventType中查找所有注册了eventClass事件类型的订阅者集合,如果可以查找到,最后交给postToSubscription去触发订阅方法。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
注意这里有个aborted的布尔值,在循环体中每次执行完postToSubscription之后回去校验PostingThreadState的canceled状态,如果为true,则跳出循环体,canceled赋值为true只有在cancelEventDelivery()方法中。
public void cancelEventDelivery(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
if (!postingState.isPosting) {
throw new EventBusException(
"This method may only be called from inside event handling methods on the posting thread");
} else if (event == null) {
throw new EventBusException("Event may not be null");
} else if (postingState.event != event) {
throw new EventBusException("Only the currently handled event may be aborted");
} else if (postingState.subscription.subscriberMethod.threadMode != ThreadMode.POSTING) {
throw new EventBusException(" event handlers may only abort the incoming event");
}
postingState.canceled = true;
}
此方法可以取消某个事件,调用后,后续的订阅者的订阅方法就不会被调用到了。通过代码可以得出canceled起作用的前置条件是:PostingThreadState当前的event对象必须和cancelEventDelivery传入的对象相同。所以:
cancelEventDelivery()只有在处理此事件时调用才能生效,所以cancelEventDelivery的使用场景是在某个订阅方法里,主动的拦截事件,让后续的订阅者的订阅方法不会被执行到。可以理解为事件被当前方法所消费了,不再传递下去。
postToSubscription(Subscription subscription, Object event, boolean isMainThread)
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
postToSubscription方法也比较简单,会根据当前事件的抛出是否在主线程(postingState.isMainThread)以及当前订阅方法注解声明的threadMode,一起决定如何触发订阅方法,具体如下:
- ThreadMode.POSTING时,直接在发送事件的线程执行,所以直接调用;另外,ThreadMode.MAIN并且当前就是在MainThread时、ThreadMode.BACKGROUND并且isMainThread是false。这些场景下都不需要做任何线程切换,直接调用invokeSubscriber(subscription, event)。
- ThreadMode.MAIN时,但是isMainThread是false的时候,需要切换到主线程执行。此时调用了mainThreadPoster.enqueue(subscription, event),HandlerPoster如何处理这个我们后面单独看。
- 如果ThreadMode.BACKGROUND时,但isMainThread是true时,执行了backgroundPoster.enqueue(subscription, event),BackgroundPoster我们放到和HandlerPoster一起看。
- ThreadMode.ASYNC直接通过线程池异步处理。
其实最后都是调用invokeSubscriber(subscription, event)通过反射调用的订阅方法。
通过以上处理,EventBus实现了Subscribe注解中什么的threadMode配置。
再回过来看eventInheritance分成的两条支路,lookupAllEventTypes拿到了当前eventClass的父类类型,遍历传入postSingleEventForEventType中去查找订阅了当前事件的订阅者,所以二者的区分就是:当订阅者通过Subscribe订阅一个事件时A,我们post一个A的子类时,eventInheritance为true时依然可以触发订阅方法的调用,但是eventInheritance为false时就不会触发,eventInheritance默认为true。
关于lookupAllEventTypes,这里举个例子:
//事件A
open class UpdateColorEvent(val color: Int)
//事件B
class UpdateStyleEvent(color: Int, val size: Float) : UpdateColorEvent(color)
//订阅者
class Subscriber : Activity() {
private lateinit var textView: TextView
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
textView = TextView(this)
EventBus.getDefault().register(this)
}
override fun onDestroy() {
super.onDestroy()
EventBus.getDefault().unregister(this)
}
@Subscribe(threadMode = ThreadMode.MAIN)
fun updateTextStyle(event: UpdateColorEvent) {
textView.setTextColor(event.color)
}
}
我们定义了两个父子关系的事件UpdateColorEvent和UpdateStyleEvent,订阅者Subscriber订阅了UpdateColorEvent这个父事件。然后使用下面代码抛出一个子事件:
EventBus.getDefault().post(UpdateStyleEvent(Color.RED,32f))
在eventInheritance = true的情况下,updateTextStyle方法会触发,但是false的情况下则无法触发。eventInheritance是通过EventBusBuilder配置的,默认true。
postSticky(Object event) :
看完了post方法后,postSticky方法就很简单了
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
就是post前多了向stickyEvents中添加当前事件的,前面我们分析过,订阅者注册时,如果注解声明的是订阅粘性事件,则会去检索stickyEvents中是否存在当前订阅者订阅的事件,如果有,则会直接去执行postToSubscription方法。以此,实现了注册时可以感知到之前已经抛出的事件。
五.HandlerPoste、BackgroundPoster、AsyncPoster
在最后的postToSubscription环节,EventBus的线程切换是通过这两个类去处理的,现在具体看下他们是如何处理的。
-
HandlerPoster
HandlerPoster继承自Handler,本质是个Handler
首先HandlerPoster对象的创建是在EventBus的构造方法里
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
入参一个主线程的looper和事件执行时长的背压阈值maxMillisInsideHandleMessage,因为是主线程的looper,那么我们知道通过HandlerPoster.sendMessage的消息,最后的handleMessage方法会在主线程执行。
HandlerPoster内部维护了一个PendingPostQueue,是个队列。
//HandlerPoster.java
queue = new PendingPostQueue();
直接看看enqueue方法
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
首先,PendingPost对象是从池中取出来了,跟Handle的Message一样,避免重复创建中间对象,浪费开销,将订阅方法subscription和事件对象event装载到PendingPost里面,然后入队到queue队列中。当handlerActive=false时,唤起任务:执行sendMessage方法发送一个空消息。
根据Handler我们知道:我们会在HandlerPoster的handleMessage(Message msg)中收到这个空消息,并且,由于HandlerPoster是通过主线程的Looper创建的,所以handleMessage()运行在主线程,收到这个空消息后,从队列queue中依次取出PendingPost,PendingPost对象中存放的是subscription和event,然后执行EventBus的invokeSubscriber()方法,所以前面说『最后都是在invokeSubscriber方法中通过反射调用的订阅方法』。
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
handleMessage方法中的循环体执行时长timeInMethod超过设置的maxMillisInsideHandleMessage会停止执行,并将handlerActive置为flase,直到等到下一次enqueue的到来,避免长时间卡在订阅方法的执行上,毕竟这些方法运行在主线程,长时间运行在这个循环会导致ANR。当我们声明订阅方法的threadMode=ThreadMode.MAIN时,不能写太耗时的任务,这跟安卓整体的设计也是一致的,主线程禁止执行耗时操作,虽然EventBus这里做了背压策略减少这种可能,但是我们也仍然不应该这么做。
invokeSubscriber(PendingPost pendingPost)
//EventBus.java
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
此方法最终就是调用了invokeSubscriber(Subscription subscription, Object event),在此之前,需要判断subscription的active状态,前面取消订阅的时候说过,取消订阅后会将当前对象下的所有Subscription(订阅方法对象)的active置为false,也就不会被执行到。
至此,EventBus切换到主线程的分析就结束了,主要是还是使用了安卓的Handler机制,EventBus甚至贴心的帮我们处理了背压,但是我们任然需要注意不要滥用(方法内执行耗时操作或者好几百个地方同时订阅一个事件)。
-
BackgroundPoster和AsyncPoster
BackgroundPoster和AsyncPoster都使用了ExecutorService线程池。
- AsyncPoster相对简单,enqueue一个则直接通过ExecutorService开辟一个子线程执行invokeSubscriber;
- BackgroundPoster则尽可能的执行在一个当前已经存在的线程,如果有任务在执行则只入队等待,没有则开启线程。
//AsyncPoster.java
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
//BackgroundPoster.java
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
六.总结
至此,EventBus的源码主要部分都走了一遍,我们清除了注册、取消注册以及抛的事件如何流入到订阅方法并且调用到订阅方法。值得注意的是由于subscriptionsByEventType、typesBySubscriber、stickyEvents这些都是维护在EventBus对象内部的,所以我们如果通过EventBusBuilder创建处不同的EventBus对象,它的注册以及订阅都是不能互通的,也就是通过那个EventBus对象注册的,则只能通过这个对象post事件才能触发订阅方法的执行。
如有错误,还请见谅。错误之处,还望大家帮忙指正。一起学习一起进步。