getDefault 方法
我们先从 EventBus 的入口,getDefalut 方法入手:
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
从 getDefault 方法可以看出,EventBus 类是一个采用了 Double Check 的单例类。
我们接下来看到它的构造函数,它的无参构造函数 EventBus() 实际上是调用了 EventBus(EventBusBuilder) 这个有参构造函数的,传入的参数是 DEFAULT_BUILDER,而 DEFAULT_BUILDER 则是一个调用了 EventBusBuilder 默认构造器的对象。可以看出,这里用到了 Builder 模式来支持用 EventBusBuilder 进行一些配置。
下面我们看到 EventBus(EventBusBuilder):
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
这个构造方法中主要进行的是一些容器的初始化以及将一些配置参数从 Builder 中取出。
register 方法
下面我们再从 register 方法的角度进行分析,看看 EventBus 在我们进行 register 时做了一些什么事:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
register 传入的 register 是一个 Object 类型,也就是任意类都可以向 EventBus 进行 register。它首先获取到了 subscriber 的类型信息,然后将其传递给了 subscriberMethodFinder 的 findSubscriberMethods() 方法。
通过名称可以很容易看出,SubscriberMethodFinder 类是一个专门用来搜寻 subscriber 中含有 @Subscribe 注解的方法的类。这里进行的操作就是将所有被 @Subscribe 标记的方法都加入到 List 中。
而在找到了这些方法后,则一个个进行遍历,并将其执行 subscribe操作。
也就是说 register 可以分为两部分来看——搜寻及订阅
搜寻过程
我们先进入 findSubscriberMethods() 看看它的搜寻过程
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) { // 1
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
可以分别看到 1 和 2 处的 if 语句,在 ignoreGeneratedIndex 为 true 时,调用了 findUsingReflection 来使用反射搜寻方法。反之则调用了 findUsingInfo 方法进行搜寻。ignoreGenereatedIndex 是用于标记是否忽略由 Builder 传入的 SubscriberInfoIndex。
直接搜寻
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
可以看出,搜寻过程的结果是用了一个名为 FindState 的类进行存储的,它是 SubscriberMethodFinder 的一个内部类。我们重点关注的是这里的 while 循环,它先执行了 findUsingReflectionInSingleClass 方法通过反射找到所有被 @Subscribe 标注的方法,再通过 moveToSuperclass 向这个类的父类进行搜寻。
查找
我们看到 findUsingReflectionInSingleClass 的实现:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
这里的代码比较长,我们慢慢分析。
首先进行的是 Method 列表的获取。这里通过注释可以看出,使用 getDeclaredMethods() 方法其实是比 getMethods() 方法的效率更高的,但有时会导致 NoClassDefFoundError,此时采取备用方案,使用 getMethods() 进行获取。
之后遍历 Method 数组,其中进行了两次校验:第一次校验用于检查被 @Subscribe 修饰的方法是否是 public、non-static、non-abstract 的。第二次检查则是检查其参数个数是否符合 1 的要求。当不满足时会抛出对应异常,而满足要求则会进行 @Subscribe 注解的搜索。
当找到了对应注解后会进行一次 checkAdd,在这个方法中会将 方法及其对应的 Event 放入一个 HashMap anyMethodByEventType
,同时还会将方法的签名(形式为 方法名>Event 类型名)及对应方法放入另一个 HashMap subscriberClassByMethodKey
。
当 checkAdd 检查没有放入过这个方法及 Event 后,就会将方法的信息包装为一个 SubscriberMethod 类,然后放入我们需要的结果列表。
向上查找
当查找完成后,会向调用 moveToSuperclass 方法向其父类进行查询,直到遇到了系统提供的库。通过这种向上查找的方式可以使得 EventBus 支持继承这一 OOP 特性。
void moveToSuperclass() {
if (skipSuperClasses) {
clazz = null;
} else {
clazz = clazz.getSuperclass();
String clazzName = clazz.getName();
/** Skip system classes, this just degrades performance. */
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {
clazz = null;
}
}
}
非直接搜寻
下面我们看到搜寻过程的另一种实现:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
这里可以看到,先通过 getSubscriberInfo 获取到了由 Builder 过程传入的 SubscriberInfoIndex,从中获取需要的信息。当其值为 null 时,再使用反射搜寻的方式进行搜寻。
而具体添加过程,则与直接搜寻中的方式类似,这里不再赘述。
FindState 复用
这里要注意,无论是直接搜索还是非直接搜索,它们所使用的 FindState 都是通过 prepareFindState() 来进行获取,同时,它们的结果最后都会通过 getMethodsAndRelease(findState) 来进行返回。其实 SubscriberMethodFinder 类中维护了一个 FindState 池,是一个默认大小为 4 的数组,在 prepareFindState 中会遍历数组找到非 null 的 FindState 进行返回。而在 getMethodsAndRelease(findState) 中则是将搜寻的结果取出后,对 FindState 进行 recycle,之后再将其放回 FindState 池中。这种池的复用思想非常值得我们在设计库的时候学习。
订阅过程
我们回到 register 方法中来:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
可以看到 EventBus 为方法的订阅过程进行了加锁,保证了线程安全。然后遍历了每一个被标记的方法,一一将其订阅。
下面我们看看具体的订阅流程,进入 subscribe 方法:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
这里的代码也是很长...让我们慢慢进行分析。
先看看 Subcription,它就是一个将 Subcriber 及 SubcriberMethod 进行包装的类。
首先,这个方法进行了一个安全检查,检查了同样的方法是否已经被注册过,没有则将其放入 Map 中,否则抛出异常。
之后再将其按照优先级放入以该 Event 为参数的方法列表中。
然后,又将这个方法加入了以该 Subcriber 为 key,以它内部所有被 @Subscribe 标记的方法的列表为 value 的 Map typesBySubscriber
中。
最后,进行了一个对 sticky 事件的特殊处理,如果为 sticky 事件则会在 register 时进行一次对 Method 的调用。主要逻辑是:首先判断了 Event 是否子 Event,若是一个子 Event 则找到其父 Event 作为参数 Event,否则将其作为参数 Event,然后在判 null 的情况下调用 postToSubscription 方法来执行这个方法。关于具体的执行过程,会在下文中进行讲解。
post 方法
post 方法开始,就会涉及我们的 Event 的执行过程了。这里根据 register 的分析过程可以大概猜出方法应该是由反射的方式被执行的,下面让我们进入 post 方法的源码看看是否是这样:
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
存放线程信息
首先,它获取到了 currentPostingThreadState 这一个对象,它的类型是 PostingThreadState,这个类的主要用途是记录事件的发布者的线程信息。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
我们看到 currentPostingThreadState 的创建过程:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
这里可以看到 currentPostingThreadState 是通过 ThreadLocal 进行保存。ThreadLocal 是一个用于创建线程局部变量的类。它创建的变量只能被当前线程访问,其他线程则无法访问和修改。
之后在 post 中进行的操作就是为 currentPostingThreadState 填充各种线程相关信息了。
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
我们回到 post 中,可以发现,在 currentPostingThreadState 中维护了一个 eventQueue。
首先,将 Event 插入了 eventQueue 中,之后将 isMainThread 等信息进行填充。同时将 postingState 的 isPosting 置为了 true,使得事件 post 的过程中当前线程的其他 post 事件无法被相应,当 post 过程结束后,再将其置为 true。
之后,便开始遍历 eventQueue, 将事件一个个出队并执行 postSingleEvent 方法,接下来我们看到 postSingleEvent 方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
可以看到,这里通过 postSingleEventForEventType 来进行搜寻对应 Subscription,如果 Event 是子 Event,则获取它的所有父 Event 列表,再遍历列表进行搜寻。否则直接调用 postSingleEventForEventType 进行搜寻。
搜寻 Subscription
下面我们看到 postSingleEventForEventType:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
首先,这里根据 event 找到了所有对应的 Subscription,然后遍历 subscription 列表调用 postToSubscription() 方法,这个方法在之前 register 的分析中针对 sticky 事件的代码中也有调用,它的作用是调用 event 对应的方法。
未找到对应 Subscription 的处理
我们在这里先不关心 postToSubscription 的具体实现,先看看在 Subscription 调用结束后做了什么事,我们回到 postSingleEvent 方法中:
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
可以看到,如果在没有找到对应的 subscription,则会创建一个 NoSubscriberEvent 再调用 post 请求。这样,无论 Event 是否有 Subscriber,它都会进行一次检测。
执行对应 Subscription
下面我们看到 postToSubscription,看看是如何调用 Event 对应的方法的:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
可以看到,这里对订阅者的线程进行了判断,采用不同的 Poster 进行入队操作,采用队列的方式一一处理。对某些特殊情况则直接调用 invokeSubscriber(subscription, event) 进行处理。关于 Poster 的设计我们在之后进行分析,在 Poster 内部调用的是 invokeSubscriber(PendingPost) 方法。
invokeSubscriber(subscription, event)
我们先看到 invokeSubscriber(subscription, event) 方法:
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
果然,如我们之前猜测的一样,它是通过反射来对方法进行调用的,代码很简单,就不再赘述了。
invokeSubscriber(PendingPost)
下面我们看到 invokeSubscriber(PendingPost) 方法:
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
可以看到,它在内部判断了 Subscription 的 active 状态,如果为 active,再调用 invokeSubscriber(subscription, event) 方法对 Method 进行执行。
这个 active 状态可以使得 unregister 的类中的对应 Method 不再被执行。
postSticky 方法
这时你可能会想到,还有一个方法 postSticky 呢,我们接下来看到 postSticky 方法:
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
这里的逻辑十分简单,先将 event 加入 stickyEvents 列表,再执行 post 请求。
unregister 方法
我们接着看到 unregister 方法:
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
unregister 方法相对比较简单,首先它先判断了这个 Event 是否还未注册,若已注册则遍历在 typesBySubscriber 中 Subscriber 所对应的每一个 EventType,并一一对其执行 unsubscribeByEventType 方法取消订阅,同时 Subcriber 所对应的信息从 typesBySubscriber
这个 Map 中移除。
下面我们看看 unsubscribeByEventType 中具体做了什么:
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
可以看到,它是将 subscriptionsByEventType 中与 EventType 对应的 Subscription 列表取出,遍历并将该 Subcriber 对应的 Subscription 的 active 标记为 false 并删除。
Poster
前面提到了 post 方法中会用到 Poster 来进行方法的按队列执行,EventBus 中定义了一种 Poster 这个接口,用于处理 post 后方法执行的调度。
/**
* Posts events.
*
* @author William Ferguson
*/
interface Poster {
/**
* Enqueue an event to be posted for a particular subscription
*
* @param subscription Subscription which will receive the eve
* @param event Event that will be posted to subscriber
*/
void enqueue(Subscription subscription, Object event);
}
可以看到,它内部只有一个方法 enqueue,用于将 Event 放入待定队列。
在 EventBus 中的 Poster 用到了三种,我们分别介绍:
HandlerPoster
HandlerPoster 是默认情况下 mainThreadPoster 的类型,它的内部是使用 Handler 实现进程的调度,主要关注其 handleMessage 的实现:
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
可以看到,内部是通过一个死循环遍历 PendingPost 的队列,分别对其执行 invokeSubscriber。
AsyncPoster
AsyncPoster 对应了 EventBus 中的 asyncPoster,下面是它的代码:
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
它的代码很简单,通过 EventBus 的线程池中取出一个线程并在该线程中调用 invokeSubscriber 方法。
BackgroundPoster
BackgroundPoster 对应 EventBus 中的 backgroundPoster,下面是它的代码:
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
这段代码比较长,和 AsyncPoster 有些类似,它使用了 synchronized 以保证线程安全,在 run 方法中遍历了 PendingPost 列表,取出并执行 Event。
PendingPost 池
PendingPost 内部也是维护了一个 PendingPost 池的,上面的几个 Poster 都是调用 PendingPost.obtainPendingPost 方法从其中取出 PendingPost,它的代码如下:
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
原理比较简单,就不再分析了。
总结
通过这次源码解析,收获了很多,果然还是自己去看源码才能对这些库有更深层的了解。
EventBus 实际上就是一个基于反射实现的『公告牌』,发布者可以将各种类型的公告放置到公告牌中,而订阅者可以订阅某种类型的公告,当公告牌上出现了这种类型的公告时,便会从上面取出并通知订阅者。