EventBus(3.1.1)源码浅析

前言

EventBus是Android的发布/订阅事件总线,简化了Android的事件传递。提高了代码的简洁性。

官方图.png
1、EventBus的简单使用
public class MainActivity extends BaseActivity {

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        //注册
        EventBus.getDefault().register(this);
    }
    
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void receiveEventBusMessage(EventBusMessage message){
        Log.e("receive", "receiveEventBusMessage: "+message.getText());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        //解注册
        EventBus.getDefault().unregister(this);     
    }   
}

//可以在其它组件、fragment或工作线程中发送事件
private void sendMessage(){
        EventBus.getDefault().post(new EventBusMessage("消息:"+ new Random().nextInt(50)));
    }

从上面代码可以发现,EventBus实现事件传递有多简洁,无任何花哨。首先,通过register()进行注册,这是使用EventBus的前提;然后声明一个订阅方法用于事件的接收(利用@Subscribe注解),threadMode指定线程模式(在这里指定于主线程),其中EventBusMessage是自己定义一个事件类;再然后通过在其它组件、Fragment或工作线程中使用post()发送事件,这样就完成了事件的传递。当组件销毁或者EventBus不再使用时千万别忘记了unregister()解注册。这很重要!!!

2、经过上面的一个简单分析,初步的了解了EventBus的使用。那么,看似简单的几个步骤,里面究竟蕴含了什么内功心法或者招式呢?(故事说起来就长了)前排准备瓜子顺带点小酒。接下来从EventBus.getDefault()开始:
 public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

//EventBusBuilder以建造者模式初始化一以下属性
EventBus(EventBusBuilder builder) {
        logger = builder.getLogger();
        subscriptionsByEventType = new HashMap<>();    //订阅的事件类型
        typesBySubscriber = new HashMap<>();           //订阅类型
        stickyEvents = new ConcurrentHashMap<>();      //粘性事件
        mainThreadSupport = builder.getMainThreadSupport(); //主线程接口类
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        backgroundPoster = new BackgroundPoster(this); //在后台提交的事件
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        //查找订阅方法的类
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        //获取一个线程池
        executorService = builder.executorService;
    }

很清晰,EventBus.getDefault()利用了单例模式中的双重校验方式获取一个EventBus实例。在构造函数最后一步通过Executors.newCachedThreadPool()创建一个线程池(主要在后台任务或异步任务时使用),对于线程池的创建可以了解这篇文章【探索Java 线程池

3、EventBus的注册过程(register)。
public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        //获取一个订阅方法List,包含了所订阅的类声明的订阅方法
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

在上面的代码中可以知道,做了两件事,
1、查找订阅的方法,SubscriberMethodFinder # findSubscriberMethods()
2、进行订阅。subscribe()

3.1、是如何查找已声明的订阅方法?进入SubscriberMethodFinder类的indSubscriberMethods()方法

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
       //1
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) { //2
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else { //3
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

在上面代码1处,首先会从缓存中查找,subscriberClass正是调用register()注册的订阅者,METHOD_CACHE是一个ConcurrentHashMap的实例,从这里可以知道,subscriberClass作为ConcurrentHashMap的key,而ConcurrentHashMap的value(List<SubscriberMethod>)存放的是当前订阅者中声明的订阅方法。注释2处表示是否忽略索引的位置,默认为false,因此会在通过findUsingInfo()查找,注释3处把找到的方法保存到METHOD_CACHE中。一起研究findUsingInfo()方法:

 private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        //从对象池中寻找FindState实例(数组的形式实现对象池)
        FindState findState = prepareFindState();
        //初始化需要寻找的方法的所属类
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
          //找到后,获取订阅者的信息 
            findState.subscriberInfo = getSubscriberInfo(findState);
           //开始保存获取的订阅方法
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                //如果上面没有找到,则通过反射、注解的方式寻找
                findUsingReflectionInSingleClass(findState);
            }
            //移动到父类
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState); //找到方法后释放findState
    }

经过上面代码注释,对寻找订阅方法的步骤已经很清晰了,下面贴出getSubscriberInfo()和findUsingReflectionInSingleClass()两个方法:

 private SubscriberInfo getSubscriberInfo(FindState findState) {
        if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
            SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
            //判断是否是需要寻找的具体类
            if (findState.clazz == superclassInfo.getSubscriberClass()) {
                return superclassInfo;
            }
        }
        //通过订阅信息索引进行查找
        if (subscriberInfoIndexes != null) {
            for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                if (info != null) {
                    return info;
                }
            }
        }
        return null;
    }
private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
      //遍历订阅者中的所有方法
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            //对订阅方法的作用域进行判断
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

3.2、是如何进行订阅的?subscribe()

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;

        //通过订阅者和订阅方法创建一个订阅事件
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

        //根据eventType从缓存中获取subscriptions
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

        //若缓存中没有,则添加到缓存
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

         //遍历订阅事件,根据优先级重新排序
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }
      
        //这里根据订阅者查找EventType缓存
        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
        
        //对粘性进行处理
        if (subscriberMethod.sticky) {
            if (eventInheritance) {      //同时考虑子类
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

上面的代码,就是整个的订阅过程了,比较关键的两个东西subscriptionsByEventType和typesBySubscriber,首先把订阅者和订阅方法封装进subscriptionsByEventType,当post事件时根据eventType查找具体的订阅者,然后进行处理。而typesBySubscriber存放的订阅者,在解注册时根据订阅者查找对应的eventType。在最后,会判断是否是粘性事件,如果是,则立刻进行处理。

  private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
        if (stickyEvent != null) {    
            //post事件是最终也调用这个方法,所以在下一步进行分析
            postToSubscription(newSubscription, stickyEvent, isMainThread());
        }
    }
4、发送事件 post()
public void post(Object event) {
        //每个线程都有一个提交事件的状态
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);    //把提交的事件对象添加到eventQueue

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            // 2
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

从上面代码可以发现,首先根据PostingThreadState确定当前线程的提交状态,从PostingThreadState获取eventQueue,用于存放提交的事件;然后会在注释2处,对事件进行遍历,直到eventQueue内部元素为空。

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //考虑子类
        if (eventInheritance) {
            //查找所有的EventType
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();

            //遍历,对eventTypes内部逐一处理
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

在上面的代码中,先查找出所有的EventType,然后通过postSingleEventForEventType()逐个逐个的处理,postSingleEventForEventType()内部又调用了postToSubscription()。

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

分析到这里,可以知道,会根据在最初声明的订阅方法中标识的threadMode进行对应处理,这里就选择文章第一步中的MAIN标识进行分析。
1、isMainThread为true,表示在主线程中执行,直接调用invokeSubscriber()。
2、若不在主线程执行,调用mainThreadPoster.enqueue(),mainThreadPoster实际是HandlerPoster(继承了Handler)类对象的引用。

//HandlerPoster # enqueue()
 public void enqueue(Subscription subscription, Object event) {
        //把订阅者和提交的事件封装在pendingPost
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //内部利用了单向链表的方式进行摆放
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    //HandlerPoster # handleMessage()
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

根据上面HandlerPoster中的两段代码发现,最终是以Handler消息机制的方式进行处理事件,其中HandlerPoster的Looper是在初始化EventBus的时候获取了主线程中的Looper对象,在MainThreadSupport的实现类AndroidHandlerMainThreadSupport中。最后在handleMessage()中轮询queue,queue是PendingPostQueue的一个实例。调用poll()获取pendingPost,再调用eventBus.invokeSubscriber()进行处理。

 synchronized PendingPost poll() {
        PendingPost pendingPost = head;
        //从链表头部head开始,取完后就移除,下一个链节点作为head,如此循环
        if (head != null) {
            head = head.next;
            if (head == null) {
                tail = null;
            }
        }
        return pendingPost;
    }

//从链表中获取消息后的处理
void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);  //释放PendingPost
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

在(threadMode == MAIN)时,经过上面的分析可以知道,不管是在主线程调用,还是工作线程中(通过Handler机制处理消息),都是调用了EventBus # invokeSubscriber(Subscription subscription, Object event) 这个方法。

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //通过反射进行处理
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }
5、解注册unregister()
public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

解注册的过程就比较简单了,还记得提交事件时讲到的typesBySubscriber吗?里面以键值对的形式存储subscriber和eventType的List。因此,在这根据订阅者subscriber找到eventType的List,然后遍历,进行释放。
完篇

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容