SpringBoot项目实现发布订阅模式,真的很简单!

在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……

这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。

下单之后的逻辑

所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。


观察者模式

一开始,我们都是自己去写观察者模式。

自己实现观察者模式



观察者简图

观察者

观察者定义接口

/** *@Author: fighter3 *@Description: 观察者接口 *@Date: 2022/11/7 11:40 下午 */publicinterfaceOrderObserver{voidafterPlaceOrder(PlaceOrderMessage placeOrderMessage);}

具体观察者@Slf4j

public class OrderMetricsObserver implements OrderObserver {

@Override

public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

log.info("[afterPlaceOrder] metrics");

}}@Slf4j

public class OrderLogObserver implements OrderObserver{

@Override

public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

log.info("[afterPlaceOrder] log.");

}}@Slf4j

public class OrderNotifyObserver implements OrderObserver{

@Override

public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

log.info("[afterPlaceOrder] notify.");

}}

业务通知观察者

日志记录观察者

监控埋点观察者

被观察者

消息实体定义

@DatapublicclassPlaceOrderMessageimplementsSerializable {/**

    * 订单号

    */privateStringorderId;/**

    * 订单状态

    */privateInteger orderStatus;/**

    * 下单用户ID

    */privateStringuserId;//……}

被观察者抽象类

publicabstractclassOrderSubject{//定义一个观察者列表privateList orderObserverList =newArrayList<>();//定义一个线程池,这里参数随便写的ThreadPoolExecutor threadPoolExecutor =newThreadPoolExecutor(6,12,6, TimeUnit.SECONDS,newArrayBlockingQueue<>(30));//增加一个观察者publicvoidaddObserver(OrderObserver o){this.orderObserverList.add(o);    }//删除一个观察者publicvoiddelObserver(OrderObserver o){this.orderObserverList.remove(o);    }//通知所有观察者publicvoidnotifyObservers(PlaceOrderMessage placeOrderMessage){for(OrderObserver orderObserver : orderObserverList) {//利用多线程异步执行threadPoolExecutor.execute(() -> {                orderObserver.afterPlaceOrder(placeOrderMessage);            });        }    }}

这里利用了多线程,来异步执行观察者。

被观察者实现类

/** *@Author: fighter3 *@Description: 订单实现类-被观察者实现类 *@Date: 2022/11/7 11:52 下午 */@Service@Slf4jpublicclassOrderServiceImplextendsOrderSubjectimplementsOrderService{/**

    * 下单

    */@OverridepublicPlaceOrderResVOplaceOrder(PlaceOrderReqVO reqVO){        PlaceOrderResVO resVO =newPlaceOrderResVO();//添加观察者this.addObserver(newOrderMetricsObserver());this.addObserver(newOrderLogObserver());this.addObserver(newOrderNotifyObserver());//通知观察者this.notifyObservers(newPlaceOrderMessage());        log.info("[placeOrder] end.");returnresVO;    }}

测试

@Test@DisplayName("下单")voidplaceOrder(){        PlaceOrderReqVO placeOrderReqVO =newPlaceOrderReqVO();        orderService.placeOrder(placeOrderReqVO);    }

测试执行结果

2022-11-0800:11:13.617INFO20235---[pool-1-thread-1]c.f.obverser.OrderMetricsObserver:[afterPlaceOrder]metrics2022-11-0800:11:13.618INFO20235---[          main]cn.fighter3.obverser.OrderServiceImpl:[placeOrder]end.2022-11-0800:11:13.618INFO20235---[pool-1-thread-3]c.fighter3.obverser.OrderNotifyObserver:[afterPlaceOrder]notify.2022-11-0800:11:13.617INFO20235---[pool-1-thread-2]cn.fighter3.obverser.OrderLogObserver:[afterPlaceOrder]log.

可以看到,观察者是异步执行的。

利用Spring精简

可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。


Spring精简观察者模式

观察者实现类:定义成Bean

OrderLogObserver@Slf4j

@Service

public class OrderLogObserver implements OrderObserver {

@Override

public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {

log.info("[afterPlaceOrder] log.");

}}

OrderMetricsObserver

@Slf4j@Servicepublic class OrderMetricsObserver implements OrderObserver {@Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info("[afterPlaceOrder] metrics");    }}

OrderNotifyObserver

@Slf4j@Servicepublic class OrderNotifyObserver implements OrderObserver {@Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info("[afterPlaceOrder] notify.");    }}

被观察者:自动注入Bean

OrderSubjectpublic abstract class OrderSubject {

/**

* 利用Spring的特性直接注入观察者*/

@Autowired

protected List orderObserverList;

//定义一个线程池,这里参数随便写的

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

//通知所有观察者

public void notifyObservers(PlaceOrderMessage placeOrderMessage) {

for (OrderObserver orderObserver : orderObserverList) {

//利用多线程异步执行

threadPoolExecutor.execute(() -> {orderObserver.afterPlaceOrder(placeOrderMessage);});}}}

OrderServiceImpl

@Service@Slf4jpublicclassOrderServiceImplextendsOrderSubjectimplementsOrderService{/**

    * 实现类里也要注入一下

    */@AutowiredprivateList orderObserverList;/**

    * 下单

    */@OverridepublicPlaceOrderResVOplaceOrder(PlaceOrderReqVO reqVO){        PlaceOrderResVO resVO =newPlaceOrderResVO();//通知观察者this.notifyObservers(newPlaceOrderMessage());        log.info("[placeOrder] end.");returnresVO;    }}

这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。

Spring Event实现发布/订阅模式

Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。

自定义事件

PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。

publicclassPlaceOrderEventextendsApplicationEvent{publicPlaceOrderEvent(PlaceOrderEventMessage source){super(source);    }}

PlaceOrderEventMessage:事件消息,定义了事件的消息体。

@DatapublicclassPlaceOrderEventMessageimplementsSerializable {/**

    * 订单号

    */privateStringorderId;/**

    * 订单状态

    */privateInteger orderStatus;/**

    * 下单用户ID

    */privateStringuserId;//……}

事件监听者

事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。


事件监听者实现

实现ApplicationListener接口

实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。

OrderLogListener

@Slf4j@Servicepublic class OrderLogListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log.");    }}

OrderMetricsListener

@Slf4j@Servicepublic class OrderMetricsListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] metrics");    }}

OrderNotifyListener

@Slf4j@Servicepublic class OrderNotifyListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] notify.");    }}

使用@EventListener注解

使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。

OrderLogListener@Slf4j

@Service

public class OrderLogListener {

@EventListener

public void orderLog(PlaceOrderEvent event) {

log.info("[afterPlaceOrder] log.");

}}

OrderMetricsListener@Slf4j

@Service

public class OrderMetricsListener {

@EventListener

public void metrics(PlaceOrderEvent event) {

log.info("[afterPlaceOrder] metrics");

}}

OrderNotifyListener@Slf4j

@Service

public class OrderNotifyListener{

@EventListener

public void notify(PlaceOrderEvent event) {

log.info("[afterPlaceOrder] notify.");

}}

异步和自定义线程池

异步执行

异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:

OrderLogListener

@Slf4j@Servicepublic class OrderLogListener  {@EventListener@Asyncpublic void orderLog(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log.");    }}

当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。

@SpringBootApplication@EnableAsyncpublic class DailyApplication {publicstaticvoidmain(String[] args) {SpringApplication.run(DairlyLearnApplication.class, args);    }}

自定义线程池

使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

自定义线程池有三种方式:

事件监听者实现

实现ApplicationListener接口

实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。

OrderLogListener

@Slf4j@Servicepublic class OrderLogListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log.");    }}

OrderMetricsListener

@Slf4j@Servicepublic class OrderMetricsListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] metrics");    }}

OrderNotifyListener

@Slf4j@Servicepublic class OrderNotifyListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] notify.");    }}

使用@EventListener注解

使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。

OrderLogListener@Slf4j

@Service

public class OrderLogListener {

@EventListener

public void orderLog(PlaceOrderEvent event) {

log.info("[afterPlaceOrder] log.");

}}

OrderMetricsListener@Slf4j

@Service

public class OrderMetricsListener {

@EventListener

public void metrics(PlaceOrderEvent event) {

log.info("[afterPlaceOrder] metrics");

}}

OrderNotifyListener@Slf4j

@Service

public class OrderNotifyListener{

@EventListener

public void notify(PlaceOrderEvent event) {

log.info("[afterPlaceOrder] notify.");

}}

异步和自定义线程池

异步执行

异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:

OrderLogListener

@Slf4j@Servicepublic class OrderLogListener  {@EventListener@Asyncpublic void orderLog(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log.");    }}

当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。

@SpringBootApplication@EnableAsyncpublic class DailyApplication {publicstaticvoidmain(String[] args) {SpringApplication.run(DairlyLearnApplication.class, args);    }}

自定义线程池

使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

自定义线程池有三种方式:

测试

最后,我们还是测试一下。

@TestvoidplaceOrder(){        PlaceOrderReqVO placeOrderReqVO =newPlaceOrderReqVO();        orderService.placeOrder(placeOrderReqVO);    }

执行结果

2022-11-0810:05:14.415INFO22674---[          main]c.f.o.event.event.OrderServiceImpl:[placeOrder]start.2022-11-0810:05:14.424INFO22674---[          main]c.f.o.event.event.OrderServiceImpl:[placeOrder]end.2022-11-0810:05:14.434INFO22674---[sync-executor-3]c.f.o.event.event.OrderNotifyListener:[afterPlaceOrder]notify.2022-11-0810:05:14.435INFO22674---[sync-executor-2]c.f.o.event.event.OrderMetricsListener:[afterPlaceOrder]metrics2022-11-0810:05:14.436INFO22674---[sync-executor-1]c.f.o.event.event.OrderLogListener:[afterPlaceOrder]log.

可以看到,异步执行,而且用到了我们自定义的线程池。

小结

这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容