15. sharding-jdbc源码之EventBus-轻量级进程内事件分发组件

阿飞Javaer,转载请注明原创出处,谢谢!

EventBus来自于google-guava包中。源码注释如下:

Dispatches events to listeners, and provides ways for listeners to register themselves.
The EventBus allows publish-subscribe-style communication between components 
without requiring the components to explicitly register with one another (and thus be 
aware of each other).  It is designed exclusively to replace traditional Java in-process 
event distribution using explicit registration. It is not a general-purpose publish-subscribe
system, nor is it intended for interprocess communication.

翻译:将事件分派给监听器,并为监听器提供注册自己的方法。EventBus允许组件之间的发布 - 订阅式通信,而不需要组件彼此明确注册(并且因此彼此意识到)。 它专门用于使用显式注册替换传统的Java进程内事件分发。 它不是一个通用的发布 - 订阅系统,也不是用于进程间通信。

使用参考

关于EventBus的用例代码提取自sharding-jdbc源码,并结合lombok最大限度的简化:

  • EventBusInstance--用于获取EventBus实例(饿汉式单例模式)
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class EventBusInstance {

    private static final EventBus INSTANCE = new EventBus();

    public static EventBus getInstance() {
        return INSTANCE;
    }
}
  • DMLExecutionEvent--发布订阅事件模型
@Getter
@Setter
public class DMLExecutionEvent {
    private String id;
    private String dataSource;
    private Date sendTime;
}
  • DMLExecutionEventListener--事件监听器
public final class DMLExecutionEventListener {

    @Subscribe
    @AllowConcurrentEvents
    public void listener(final DMLExecutionEvent event) {
        System.out.println("监听的DML执行事件: " + JSON.toJSONString(event));
        // do something
    }
}

-- Main--主方法:注册订阅者监听事件,以及发布事件。

/**
 * @author wangzhenfei9
 * @version 1.0.0
 * @since 2018年04月24日
 */
public class Main {

    static{
        System.out.println("register listener...");
        EventBusInstance.getInstance().register(new DMLExecutionEventListener());
    }

    public static void main(String[] args) throws InterruptedException {

        for (int i=0; i<10; i++) {
            pub();
            Thread.sleep(3000);
        }
    }

    private static void pub(){
        DMLExecutionEvent event = new DMLExecutionEvent();
        event.setId(String.valueOf(new Random().nextInt(1000)));
        event.setDataSource("sj_db_1");
        event.setSendTime(new Date());
        System.out.println("发布的DML执行事件: " + JSON.toJSONString(event));
        EventBusInstance.getInstance().post(event);
    }
}

核心方法

EventBus一些重要方法解释如下:

  • post(Object):Posts an event to all registered subscribers. This method will return successfully after the event has been posted to all subscribers, and regardless of any exceptions thrown by subscribers.
  • register(Object): Registers all subscriber methods on object to receive events.Subscriber methods are selected and classified using this EventBus's SubscriberFindingStrategy; the default strategy is the AnnotatedSubscriberFinder.
  • unregister(Object):Unregisters all subscriber methods on a registered object.

源码分析

主要分析发布事件以及订阅的核心源码;

发布源码分析

public void post(Object event) {
    // 得到所有该类已经它的所有父类(因为有些注册监听器是监听其父类)
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

    boolean dispatched = false;
    // 遍历类本身以及所有父类
    for (Class<?> eventType : dispatchTypes) {
        // 重入读锁先锁住
        subscribersByTypeLock.readLock().lock();
        try {
            // 得到类的所有订阅者,例如DMLExecutionEvent的订阅者就是DMLExecutionEventListener(EventSubscriber有两个属性:重要的属性target和method,target就是监听器即DMLExecutionEventListener,method就是监听器方法即listener;从而知道DMLExecutionEvent这个事件由哪个类的哪个方法监听处理)
            Set<EventSubscriber> wrappers = subscribersByType.get(eventType);

            if (!wrappers.isEmpty()) {
                // 如果有时间订阅者,那么dispatched = true,表示该事件可以分发
                dispatched = true;
                // 遍历所有的时间订阅者,每个订阅者的队列都增加该事件
                for (EventSubscriber wrapper : wrappers) {
                    enqueueEvent(event, wrapper);
                }
            }
        } finally {
            subscribersByTypeLock.readLock().unlock();
        }
    }

    if (!dispatched && !(event instanceof DeadEvent)) {
        post(new DeadEvent(this, event));
    }

    // 分发进入队列的事件
    dispatchQueuedEvents();
}

/** 
 * queues of events for the current thread to dispatch;
 * 核心数据结构为LinkedList,保存的是EventBus.EventWithSubscriber类型数据
 */
private final ThreadLocal<Queue<EventBus.EventWithSubscriber>> eventsToDispatch =
        new ThreadLocal<Queue<EventBus.EventWithSubscriber>>() {
            @Override protected Queue<EventBus.EventWithSubscriber> initialValue() {
                return new LinkedList<EventBus.EventWithSubscriber>();
            }
        };

void enqueueEvent(Object event, EventSubscriber subscriber) {
    // 数据结构为new LinkedList<EventWithSubscriber>(),EventWithSubscriber就是对event和subscriber的封装,LinkedList数据结构保证进入队列和消费队列顺序一致
    eventsToDispatch.get().offer(new EventBus.EventWithSubscriber(event, subscriber));
}

/**
 * Drain the queue of events to be dispatched. As the queue is being drained,
 * new events may be posted to the end of the queue. 
 * 排干要被分发的事件队列,正在排干的过程中,可能有新的事件被追加到队列尾部
 */
void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    // 如果正在排干队列,则不分发
    if (isDispatching.get()) {
        return;
    }

    // ThreadLocal设置正在分发即isDispatching为true
    isDispatching.set(true);
    try {
        Queue<EventBus.EventWithSubscriber> events = eventsToDispatch.get();
        EventBus.EventWithSubscriber eventWithSubscriber;
        while ((eventWithSubscriber = events.poll()) != null) {
            // 调用订阅者处理事件(method.invoke(target, new Object[] { event });,method和target来自订阅者)
            dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
        }
    } finally {
        // ThreadLocal可能内存泄漏,用完需要remove
        isDispatching.remove();
        // 队列中的事件任务处理完,清空队列,即所谓的排干(Drain)
        eventsToDispatch.remove();
    }
}

订阅源码分析

/**
 * Registers all subscriber methods on {@code object} to receive events.
 * 注册object上所有订阅方法,用来接收事件,上面的使用参考,DMLExecutionEventListener就是这里的object
 */
public void register(Object object) {
    // Multimap是guava自定义数据结构,类似Map<K, Collection<V>>,key就是事件类型,例如DMLExecutionEvent,value就是EventSubscriber即事件订阅者集合(说明,这个的订阅者集合是指object里符合订阅者的所有方法,例如DMLExecutionEventListener.listener(),DMLExecutionEventListener中可以有多个订阅者,注解@Subscribe即可),
    Multimap<Class<?>, EventSubscriber> methodsInListener =
            finder.findAllSubscribers(object);
    // 重入写锁保证线程安全
    subscribersByTypeLock.writeLock().lock();
    try {
        // 把订阅者信息放到map中缓存起来(发布事件post()时就会用到)
        subscribersByType.putAll(methodsInListener);
    } finally {
        subscribersByTypeLock.writeLock().unlock();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345