在《Android EventBus使用》博文中,我们已经对EventBus的使用进行了分析,在这篇博文中,我们就抽丝剥茧,对EventBus源码进行分析。
基于Android EventBus 3.0.0版本进行分析
1. 观察者注册
我们知道事件的注册是这样的:
EventBus.getDefault().register(this);
这一行代码其实包含了两个操作:
- 获取EventBus实例
- 注册订阅者
1.1 获取EventBus实例
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
可见EventBus的设计使用了设计模式中的[单例模式]以及[构建者模式],并采用DCL检查来解决多线程的问题。按照单例模式的设计思路,构造函数应该是私有的,我们来看看EventBus的构造函数是否如此呢?
public EventBus() {
this(DEFAULT_BUILDER);
}
// 构造者模式
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
这是什么鬼?构造函数竟然是public的,那开发者为何这样设计呢?原因有如下两点:
一方面,我们通过getDefault方法获取到的是永远是同一个对象,而在项目或模块中使用同一个对象,有利于管理订阅者和发布者;
另一方面,把构造的主动权交给使用者,方便使用者根据自己的需求和设计定制EventBus对象(通过Builder模式构造)。
1.2 注册
明白了EventBus的构造,我们从注册(register)开始来分析其源码。
EventBus.register
//注册订阅者来接收事件。
//订阅者必须调用 unregister方法,一旦他们决定不再接收事件。
//订阅者接收事件的方法必须使用Subscribe注解来表示。
public void register(Object subscriber) {
//获取订阅者的class
Class<?> subscriberClass = subscriber.getClass();
//获取订阅者所有的事件处理方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//订阅事件
subscribe(subscriber, subscriberMethod);
}
}
}
SubscriberMethod
在进行解析前,我们先来了解下SubscriberMethod是什么?
public class SubscriberMethod {
// 方法
final Method method;
// 方法运行模式
final ThreadMode threadMode;
// 方法事件类型
final Class<?> eventType;
// 方法优先级
final int priority;
// 是否接收黏性事件
final boolean sticky;
// Used for efficient comparison
String methodString;
public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
this.method = method;
this.threadMode = threadMode;
this.eventType = eventType;
this.priority = priority;
this.sticky = sticky;
}
···
}
可见SubscriberMethod就是一个订阅方法的实体类,记录了订阅方法的一些信息。
回到register方法,我们接着来分析如何遍历查找订阅者中所有的订阅方法。
SubscriberMethodFinder.findSubscriberMethods
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//首先从缓存中读取,如果缓存中已经存在则直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//是否忽略注解生成的编译类
//该值默认为false,所以默认会采取编译时注解实现
if (ignoreGeneratedIndex) {
//使用反射获取订阅方法
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//索引方式
//如果未配置,那最终还是将调用SubscriberMethodFinder.findUsingReflectionInSingleClass方法
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//将获取到的方法放入缓存中,方便后续获取
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
通过抛出的异常我们知道,订阅方法应该定义为public.
findSubscriberMethods方法就是用来获取订阅者中所有的订阅方法,并根据外部配置决定获取订阅方法的方式(通过生成的编译类、通过反射),最后将获取到的订阅方法放入到缓存中备用。
总的来讲,该方法还是很简单明了的。
SubscriberMethodFinder.findUsingReflection
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
FindState其实就是一个保存了订阅者和订阅方法信息的一个实体类,包括订阅类中所有订阅的事件类型和所有的订阅方法等。
SubscriberMethodFinder.findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//获取类中声明的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
//获取类中声明的所有public方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//过滤方法:订阅方法必须为public、非抽象、非static方法,必须只能有一个参数
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取订阅方法的参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
//必须只能有一个参数,否则抛出异常
if (parameterTypes.length == 1) {
//获取方法上的注解,只有使用了Subscribe注解的方法,Subscribe Annotation才不为空
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//获取注解方法中的参数类型
Class<?> eventType = parameterTypes[0];
//检查是否添加方法
if (findState.checkAdd(method, eventType)) {
//获取注解上的threadMode的值
ThreadMode threadMode = subscribeAnnotation.threadMode();
//添加方法
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 使用了注解,但是修饰符不符合要求
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
该方法主要作用就是找出订阅者类及其父类中所有的订阅方法(添加了@Subscribe注解、非抽象、非static、只有一个参数)。
值得注意的是:如果子类与父类中同时存在了相同订阅方法,则父类中的订阅方法不会被添加到subscriberMethods。
EventBus.subscribe
在找到订阅方法后,会遍历找到的所有订阅方法并调用subscribe方法将所有订阅方法注册到EventBus中。
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 获取事件类型,其实就是参数的class
Class<?> eventType = subscriberMethod.eventType;
// 获取订阅了某种事件类型数据的Subscription。
// 使用CopyOnWriteArrayList来保证线程安全
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 根据订阅的事件类型,获取所有的订阅者信息
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
// 将订阅者添加到subscriptionsByEventType集合中
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//根据优先级,将订阅者插入到指定的位置
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
// CopyOnWriteArrayList的add方法相当于在index位置插入
subscriptions.add(i, newSubscription);
break;
}
}
// 获取订阅者所有的订阅事件类型
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 添加当前订阅的事件类型
subscribedEvents.add(eventType);
// 如果订阅事件方法是黏性的,那么立即分发事件
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
注册的流程到此分析完毕,下面我们来总结一下,简单来看主要有一下流程:
- 查找订阅者中的订阅方法
- 将事件注册到EventBus
- 如果订阅者方法为黏性的,那么立即分发黏性事件
2. post事件
register方法我们已经分析完毕,通过上个章节的分析,我们了解了EventBus如何查找订阅方法、如何将方法注册给EventBus,那么接下来我们就来分析EventBus的post过程。
我们从post方法入手,来一窥究竟。
EventBus.post
// Posts the given event to the event bus.
public void post(Object event) {
// 获取当前线程的PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
// 获取当前线程的事件队列
List<Object> eventQueue = postingState.eventQueue;
// 添加事件到当前事件队列中去,事件处于待分发状态
eventQueue.add(event);
//
if (!postingState.isPosting) {
// 判断当前线程是否为主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
// 将状态置为分发状态
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 如果事件队列不为空,则进行事件分发
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 事件分发完毕后,充值PostingState状态
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post方法主要作用就是添加事件到事件队列,并进行分发,其中一个很关键的类为PostingThreadState,我们来看下该类的定义。
PostingThreadState
final static class PostingThreadState {
// 当前线程的事件队列
final List<Object> eventQueue = new ArrayList<Object>();
// 是否有事件正在分发
boolean isPosting;
// 是否为主线程
boolean isMainThread;
// 订阅者信息
Subscription subscription;
// 待分发的事件
Object event;
// 是否被取消
boolean canceled;
}
PostingThreadState中包含了当前线程的事件队列,以及订阅者订阅事件等信息,如此,我们便可以从事件队列中取出事件分发给对应的订阅者。
我们接着来分析事件的分发,在post方法调用了postSingleEvent分发来进行的事件分发,让我们看下postSingleEvent方法究竟做了什么操作。
EventBus.postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 获取事件类型(eventType)
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 是否触发订阅了该事件的父类,以及接口的类的响应方法.
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 真正的事件分发
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 真正的事件分发
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
通过分析,我们可以发现真正进行事件分发的方法是:postSingleEventForEventType。
EventBus.postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 根据事件类型获取所有的订阅者信息
subscriptions = subscriptionsByEventType.get(eventClass);
}
// 向每个订阅者分发事件
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postSingleEventForEventType方法中会向事件的所有订阅者分发事件,其中调用了postToSubscription方法进行处理。
EventBus. postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 默认的threadmode,直接在post事件的线程中调用订阅方法
invokeSubscriber(subscription, event);
break;
case MAIN:
// 在主线程中调用方法
// 如果post线程为主线程,直接回调方法
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
// 在后台线程中调用方法
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 在异步线程中调用方法
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
通过分析我们看到postToSubscription方法会根据订阅方法threadMode处理订阅方法的线程问题,最终回调方法的调用是invokeSubscriber接口。
EventBus.invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
invokeSubscriber方法就是反射调用,从而完成最终的订阅方法调用。
貌似到此,我们的分析流程已经结束了,但是我们是否注意到在postToSubscription方法中,由于post线程threadMode指定的线程可能并不一致,使用了poster进行订阅方法线程切换处理,我们怎么能放过这几个poster呢,让我们打起精神来分析吧。
HandlerPoster
此类用于切换到主线程执行方法,postToSubscription方法中当threadMode为MAIN时,可能会使用到该类。
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
// 切换线程时使用的HandlerPoster为:mainThreadPoster = new HandlerPoster(this,
// Looper.getMainLooper(), 10);
// 因此mainThreadPoste中处理消息时运行在主线程
// 因此请注意不要在订阅方法中处理耗时操作,防止出现ANR
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
// 这个方法是EventBus分发处理时调用的方法
void enqueue(Subscription subscription, Object event) {
// PendingPost封装了event\subscription,代表待执行的事件分发请求
// PendingPost中还包含了一个静态的ArrayList对象pendingPostPool,用于复用PendingPost对象
// obtainPendingPost方法就是从pendingPostPool获取可复用对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将待执行请求添加到队列中
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 发送信息处理请求
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
// 处理事件请求的方法
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
// 不断的从队列中获取、执行待执行请求
while (true) {
PendingPost pendingPost = queue.poll();
// 如果队列中没有待执行的请求,那么退出循环
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 真正的调用订阅者方法
// 处理完后会回收释放PendingPost
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
// 如果任务执行超时,那么需要重新触发
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
BackgroundPoster
final class BackgroundPoster implements Runnable {
// 待执行请求队列
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
//将待执行请求加入队列
public void enqueue(Subscription subscription, Object event) {
// 根据事件和订阅者信息构造待执行请求
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将请求加入队列
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 执行请求
// 使用了缓存线程池来控制线程并发
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
// 不断遍历请求队列,处理其中的待执行请求
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 真正调用
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
AsyncPoster
class AsyncPoster implements Runnable {
// 待执行请求队列
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
// 将待执行事件加入事件请求队列
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
// 直接执行
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
3. 注销注册
上面我们已经分析了EventBus的register和post过程,这两个过程也是EventBus的核心,下面我们来分析下取消注册的流程。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
取消注册的过程相对简单,与register方法相对。
至此,我们已经分析了EventBut流程,相信大家对EventBus也有了进一步的了解。
如文章中有描述不正的地方,还请各位看官毫不留情的指出。
谢谢各位花费时间来阅读此文。