EventBus 使用以及源码浅析

EventBus用法及解析

EventBus介绍:

!
1.png
  • EventBus主要是用来组件之间进行信息传递的,相对于接口回调的方式的话,EventBus是可以解耦两个组件的,而接口回调,需要传递一个接口对象到另一个组件,耦合度比较高,而且如果在界面销毁时处理不当,还有可能造成内存泄漏,EventBus的另一个优点就是使用非常方便,API调用非常简单。

EventBus使用

  • 添加依赖

      compile 'org.greenrobot:eventbus:3.1.1'
    
  • 在需要发送消息的地方,添加如下代码

       EventBus.getDefault().post(messageEvent);
    
  • 在需要接收到事件的组件中需要做如下处理

    • 注册EventBus;
        EventBus.getDefault().register(this); 
* 定义一个接收事件的方法;

         @Subscribe(threadMode = ThreadMode.ASYNC, sticky = true)
         public void onMessageEvent(MessageObject object) {
                //执行接收到事件的操作
         }
    
         这里有几个点需要注意:
            1. 必须要带有@Subscibe 并且方法的参数只能为1个 ,修饰的关键字只能为public
            2. 可以指定线程,不指定默认为发出事件的那个线程;
                1.ThreadMode.ASYNC
                2.ThreadMode.POSTING
                3.ThreadMode.MAIN
                4.ThreadMode.BackGround
            3. 可以指定当前接收的事件是否为粘性事件,正常事件发送时,组件必须已经注册,不然无法接收到事件,而粘性事件的话,如果发送时组件没有注册,也可以接收到事件(发送时必须通过PostSticky()而不是post , 会将该事件添加到Sticky事件集合)
* 在组件销毁时,取消注册;

         EventBus.getDefault().unregister(this);

EventBus源码解析

从上面的EventBus使用可以看出EventBus使用非常简单,那么EventBus底层究竟是怎么做的,我们可以一起学习下

  • EventBus注册过程
    EventBus首先需要调用getDefault()方法,那么我们 getDefault方法中究竟做了什么

      public static EventBus getDefault() {
          if (defaultInstance == null) {
              synchronized (EventBus.class) {
                  if (defaultInstance == null) {
                  defaultInstance = new EventBus(); 
                  }
              }
          }
    
          return defaultInstance;
      }
    

    可以看出只是通过单例模式构造了一个EventBus对象,调用了EventBus()的构造方法

    public EventBus() {
        this(DEFAULT_BUILDER);
    }

    EventBus(EventBusBuilder builder) {
        logger = builder.getLogger(); //是否打印log
        subscriptionsByEventType = new HashMap<>(); // 
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        backgroundPoster = new BackgroundPoster(this);
        asyncPoster = new AsyncPoster(this);
        indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        logSubscriberExceptions = builder.logSubscriberExceptions;
        logNoSubscriberMessages = builder.logNoSubscriberMessages;
        sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
        sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
        throwSubscriberException = builder.throwSubscriberException;
        eventInheritance = builder.eventInheritance;
        executorService = builder.executorService;
    }

构造方法中需要传入一个EventBusBuilder对象,我们在getDefault()时传递了一个DEFAULT_BUILDER

    private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    boolean logSubscriberExceptions = true;
    boolean logNoSubscriberMessages = true;
    boolean sendSubscriberExceptionEvent = true;
    boolean sendNoSubscriberEvent = true;
    boolean throwSubscriberException;
    boolean eventInheritance = true;
    boolean ignoreGeneratedIndex;   // 查找方法策略
    boolean strictMethodVerification;
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
    List<Class<?>> skipMethodVerificationForClasses;
    List<SubscriberInfoIndex> subscriberInfoIndexes;
    Logger logger;
    MainThreadSupport mainThreadSupport;

    EventBusBuilder() {
    }
而DEFAULT_BUILDER只是一个默认值得Builder对象 , 有的变量在后面的分析中会用到,也就是说EventBus在getDefault()时,只是通过双重判断构造了一个EventBus单利对象,传入一个具有默认值得EventBusBuilder对象,接下来看register这个方法

    public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }   
    
在register时,我们传递了当前类,然后通过findSubscriberMethods(subscriberClass)获取当前类的所有订阅方法,再调用 subscribe(subscriber, subscriberMethod)这个方法,我们先看findSubscriberMethods(subscriberClass)做了什么事情;
     List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
       List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
1. 从缓存中获取订阅类中的订阅方法,如果缓存中有就返回,没有那么就根据ignoreGeneratedIndex这个标志位来确定查找方法的逻辑,而这个变量的值就是在EventBusBuilder()中定义的,默认值为false 也就是说默认是走findUsingInfo(subscriberClass)这个方法的,这个方法是优先使用索引方式查找订阅方法,如果为true是通过反射的方式获取订阅方法,找到之后会将方法加入到缓存,下次取出的时候就不需要通过反射获取。


        private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
            
            //...
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method,     subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
               findUsingReflectionInSingleClass(findState);
            }  
        }


         private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
            FindState findState = prepareFindState();
            findState.initForSubscriber(subscriberClass);
            while (findState.clazz != null) {
                findUsingReflectionInSingleClass(findState);
                findState.moveToSuperclass();
            }
            return getMethodsAndRelease(findState);
        }

通过findstate里面的subscriberInfo获取里面的方法,如果这个属性为空,那么就通过findUsingReflectionInSingleClass(findState)这个方法来获取订阅方法,这个属性的值是通过构建EventBus对象传递的属性值,我们通过getDefault()方式获取的EventBus这个属性就是null,传入Builder对象来使用EventBus也不常用,所以我也没有进行深入研究。

         private void findUsingReflectionInSingleClass(FindState findState) {
            Method[] methods;
            try {
                // This is faster than getMethods, especially when subscribers are fat classes like Activities
                methods = findState.clazz.getDeclaredMethods();
            } catch (Throwable th) {
                // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
                methods = findState.clazz.getMethods();
                findState.skipSuperClasses = true;
            }
            for (Method method : methods) {
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                        if (subscribeAnnotation != null) {
                            Class<?> eventType = parameterTypes[0];
                            if (findState.(method, eventType)) {
                                ThreadMode threadMode = subscribeAnnotation.threadMode();
                                findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMod      e,
                                        subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                            }
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                        throw new EventBusException("@Subscribe method " + methodName +
                                "must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException(methodName +
                            " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
                }
            }
        }   

获取订阅类声明的所有方法; 然后对获取到的方法全部遍历一遍
获取方法的修饰符:即方法前面的public、private等关键字。
如果该类方法使用了@subscribe标注、方法中只有一个参数、且方法修饰符为public。 findState.checkAdd(method, eventType) 如果之前没有存在过则返回true
判断@Subscribe标注中的threadMode对应的值,默认模式ThreadMode.POSTING
创建一个SubscriberMethod对象,该对象很简单就是保存有方法、方法参数类型、线程模式、订阅的优先级、sticky标志位。与Retrofit类似只是这里创建了一个SubscriberMethod对象。并将该对象添加到FindSate的List集合中。
这样就完成了通过订阅类获取订阅的所有方法的过程,接下来就是调用 subscribe(subscriber, subscriberMethod);

        private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);

        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

获取方法参数类型;注意:使用@Subscribe标注的方法有且仅有一个参数
利用订阅者对象及其事件处理方法构建一个Subscription对象,该对象存储有Object、SubscriberMethod对象
从subscriptionsByEventType集合中获取当前事件对应的Subscription对象集合; 如果得到的集合为空则创建一个 这 样的集合,并将刚创建的Subscription对象添加进subscriptionsByEventType集合中;如果得到的集合不为空且 刚 创建的Subscription对象已经存在该集合中则抛出异常,即同一个对象不能注册两次!
将第二步创建的Subscription对象按照优先级存入Subscription对象集合中,该集合中的元素都是按照优先级从高到低存放.
以subscriber对象为键,从typesBySubscriber获取该对象对应的接收事件类型集合,没有则创建一个这样的集合,然后当前事件类型添加到该集合中,最后将整个集合添加进typesBySubscriber集合中;有则直接添加到接收事件类型集合中;
该值默认为false,除非在注册事件方法时使用了如下的标注@Subscribe(sticky = true);那么就会执行到这里。通过便利粘性事件Map集合找到对应的方法,如果找到了粘性事件方法就会调用checkPostStickyEventToSubscription(newSubscription, stickyEvent),而这个方法内部就是调用了EventBus.Post()这个方法。
接下来我们看他的Post过程

      public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get(); // 获取到当前线程的PostingThreadState 
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);  // 将发送的事件加到当前线程的事件队列中

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();  // 判断是不是在UI线程
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

       final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

               public void postSticky(Object event) {
                  synchronized (stickyEvents) {
                  stickyEvents.put(event.getClass(), event);
                    }
                // Should be posted after it is putted, in case the subscriber wants to remove immediately
                post(event);
                  }

在Post方法中,只是获取到一个当前线程的PostingThreadState,然后将事件添加到了事件队列中,真正执行post事件的还是通过 postSingleEvent(eventQueue.remove(0), postingState)

     private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); // 传递参数类型类的继承的类,以及所有接口
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); // 根据传入参数的类型进行查找所有的订阅方法
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

在上面的方法中,只是找到所有的事件类的父类或者接口,并且通过postSingleEventForEventType(event, postingState, clazz)进行处理,根据处理结果判断是否找到订阅方法,没有找到则根据是否需要打印log信息,打印log以及发送了一个没有订阅方法方法的事件

       private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {

        //...
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);  // subscriptionsByEventTypemap集合,保存了发送的参数对应的Class文件以及所有的事件处理者 ,这个方法是用来获取所有订阅了该改事件的处理者
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                 if (aborted) {
                        break;
                      }
            }
            return true;
        }
        return false;
    }

获取到了所有的事件处理者后就直接遍历,调用 postToSubscription(subscription, event, postingState.isMainThread) 这个方法进行处理
释放对象中的资源

         private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:   // 在当前发送的线程中执行 , 不作处理
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:      // 在UI线程执行,如果当前是主线程,则直接不做处理 ,不是就通过mainThreadPoster ,post到主线程进行处理
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED: // 相当于主线程优先,如果主线程的poster存在就在主线程执行,不然在当前线程执行
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:    // 在子线程中执行,如果为主线程就通过backgroundPoster去执行,否则在当前线程执行
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC: // 在子线程中执行,通过asyncPoster执行
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }

根据当前线程判断以及需要执行的线程判断是在哪个线程执行,如果不需要更改线程,则调用 invokeSubscriber(subscription, event) , 不然则通过每个线程的poster来执行

        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        new HandlerPoster(eventBus, looper, 10);

        Object looperOrNull = getAndroidMainLooperOrNull();  // 获取主线程的looper
        return looperOrNull == null ? null :
                new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);

        public class HandlerPoster extends Handler implements Poster {

            private final PendingPostQueue queue;
            private final int maxMillisInsideHandleMessage;
            private final EventBus eventBus;
            private boolean handlerActive;
        
            protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
                super(looper);
                this.eventBus = eventBus;
                this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
                queue = new PendingPostQueue();
            }
        
            public void enqueue(Subscription subscription, Object event) {
                PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
                synchronized (this) {
                    queue.enqueue(pendingPost);
                    if (!handlerActive) {
                        handlerActive = true;
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                    }
                }
            }
        
            @Override
            public void handleMessage(Message msg) {
                boolean rescheduled = false;
                try {
                    long started = SystemClock.uptimeMillis();
                    while (true) {
                        PendingPost pendingPost = queue.poll();
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    handlerActive = false;
                                    return;
                                }
                            }
                        }
                        eventBus.invokeSubscriber(pendingPost);
                        long timeInMethod = SystemClock.uptimeMillis() - started;
                        if (timeInMethod >= maxMillisInsideHandleMessage) {
                            if (!sendMessage(obtainMessage())) {
                                throw new EventBusException("Could not send handler message");
                            }
                            rescheduled = true;
                            return;
                        }
                    }
                } finally {
                    handlerActive = rescheduled;
                }
            }
        }

我们可以看到mainThreadPoster其实就是一个绑定了主线程的looper对象的Handler 通过handleMessage将消息传回主线程处理

final class BackgroundPoster implements Runnable, Poster {

        private final PendingPostQueue queue;
        private final EventBus eventBus;
    
        private volatile boolean executorRunning;
    
        BackgroundPoster(EventBus eventBus) {
            this.eventBus = eventBus;
            queue = new PendingPostQueue();
        }
    
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!executorRunning) {
                    executorRunning = true;
                    eventBus.getExecutorService().execute(this);
                }
            }
        }
    
        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // Check again, this time in synchronized
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    executorRunning = false;
                                    return;
                                }
                            }
                        }
                        eventBus.invokeSubscriber(pendingPost);
                    }
                } catch (InterruptedException e) {
                    eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }
    
    }

BackgroundPoster内部维护了一个消息队列,执行的时候从消息队列中取出消息通过线程池进行处理,而线程池是在EventBuilder中的参数,在我们通过getDefault()传入默认的Builder时,其实已经创建了一个默认的线城池对象,某一时段内BackgroundThread模式的事件都会在BackgroundPoster的run方法中排队处理,也就是说该时段内的所有事件是在一个线程中排队后串行执行的(队列中的事件处理完之后又有新的事件发布才会新开线程)

        ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
        private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(); 

创建一个线程池,根据需要创建新线程,但当它们可用时,将重用先前构建的线程。这些池通常会提高执行许多短命异步任务的程序的性能。调用{@代码Exc}将重用以前构建的线程,如果可用的话。如果没有现有线程可用,则将创建新线程并将其添加到池中。未使用六十秒的线程会终止。因此,一个空闲时间足够长的池不要消耗任何资源

        class AsyncPoster implements Runnable, Poster {
        
            private final PendingPostQueue queue;
            private final EventBus eventBus;
        
            AsyncPoster(EventBus eventBus) {
                this.eventBus = eventBus;
                queue = new PendingPostQueue();
            }
        
            public void enqueue(Subscription subscription, Object event) {
                PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
                queue.enqueue(pendingPost);
                eventBus.getExecutorService().execute(this);
            }
        
            @Override
            public void run() {
                PendingPost pendingPost = queue.poll();
                if(pendingPost == null) {
                    throw new IllegalStateException("No pending post available");
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        
        }

          void invokeSubscriber(PendingPost pendingPost) {
                Object event = pendingPost.event;
                Subscription subscription = pendingPost.subscription;
                PendingPost.releasePendingPost(pendingPost);
                if (subscription.active) {
                    invokeSubscriber(subscription, event);      // 最终还是执行invokeSubscriber
                }
            }

AsyncPoster与BackgroundPoster类似 ,内部维护了一个消息队列,执行的时候从消息队列中取出消息通过线程池进行处理,AsyncPoster同样也是一个Runnable,与Backgroundposter不同的是AsyncPoster并发度更高,不是在一个线程中对队列中的事件进行串行处理,而是每一个新添加的任务都会在线程池中开辟一个新线程执行
接下来看下invokeSubscriber(subscription, event),是如何处理事件的

     void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

可以看到是通过反射执行subscriberMethod,到此为止订阅方法才被执行

  • EventBus 取消注册流程

       public synchronized void unregister(Object subscriber) {
          List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); // 获取到参数类型集合
          if (subscribedTypes != null) {
              for (Class<?> eventType : subscribedTypes) {
                  unsubscribeByEventType(subscriber, eventType);
              }
              typesBySubscriber.remove(subscriber);
          } else {
              logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
          }
      }
      
      可以看出通过订阅类获取到该类所有订阅方法的参数类型集合,遍历集合调用 unsubscribeByEventType(subscriber, eventType),并且移除当前类的订阅信息
    
    
      private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
          List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
          if (subscriptions != null) {
              int size = subscriptions.size();
              for (int i = 0; i < size; i++) {
                  Subscription subscription = subscriptions.get(i);
                  if (subscription.subscriber == subscriber) {
                      subscription.active = false;
                      subscriptions.remove(i);
                      i--;
                      size--;
                  }
              }
          }
      }
    
    
      final class Subscription {
          final Object subscriber;
          final SubscriberMethod subscriberMethod;
          /**
           * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery
           * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions.
           */
          volatile boolean active;
      
          Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
              this.subscriber = subscriber;
              this.subscriberMethod = subscriberMethod;
              active = true;
          }
      
          @Override
          public boolean equals(Object other) {
              if (other instanceof Subscription) {
                  Subscription otherSubscription = (Subscription) other;
                  return subscriber == otherSubscription.subscriber
                          && subscriberMethod.equals(otherSubscription.subscriberMethod);
              } else {
                  return false;
              }
          }
      
          @Override
          public int hashCode() {
              return subscriber.hashCode() + subscriberMethod.methodString.hashCode();
          }
      }
    

    并且将保存的订阅方法从集合中移除,这样就完成了取消订阅过程

    到此为止EventBus源码分析也到此结束,有的地方我也研究的不是很清楚,但是其实我们可以将它的流程看清楚,其实也很简单,就是注册的时候找到注册类内的所有注册方法,形成一个一对多的映射关系,保存到集合中,再根据方法中的参数类型,+ 方法名,与订阅的类,也有一个映射关系,当发送事件时,就根据发送的事件参数类型,找到与之对应的所有的类,并且获取到每个类中订阅该事件的方法信息,通过反射调用该方法就可以了。原理并不是很复杂。

遇到的问题 :

发送Int类型 接收失败 
    : 由于反射Int参数类型反射获取到的是它的包装类类型,而存入的时候,存入的是Int类型,所以接收不到

泛型被擦除

    : 加入参数是List<> + 泛型 由于反射获取的类型只会是List类型,所以通过参数类型匹配时,会匹配所有的list类型的参数方法,而不会管集合的泛型
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 193,968评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,682评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,254评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,074评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,964评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,055评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,484评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,170评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,433评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,512评论 2 308
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,296评论 1 325
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,184评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,545评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,150评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,437评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,630评论 2 335

推荐阅读更多精彩内容