在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……
这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。
下单之后的逻辑
所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。
观察者模式
一开始,我们都是自己去写观察者模式。
自己实现观察者模式
观察者简图
观察者
观察者定义接口
/** *@Author: fighter3 *@Description: 观察者接口 *@Date: 2022/11/7 11:40 下午 */publicinterfaceOrderObserver{voidafterPlaceOrder(PlaceOrderMessage placeOrderMessage);}
具体观察者@Slf4j
public class OrderMetricsObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] metrics");
}}@Slf4j
public class OrderLogObserver implements OrderObserver{
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] log.");
}}@Slf4j
public class OrderNotifyObserver implements OrderObserver{
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] notify.");
}}
业务通知观察者
日志记录观察者
监控埋点观察者
被观察者
消息实体定义
@DatapublicclassPlaceOrderMessageimplementsSerializable {/**
* 订单号
*/privateStringorderId;/**
* 订单状态
*/privateInteger orderStatus;/**
* 下单用户ID
*/privateStringuserId;//……}
被观察者抽象类
publicabstractclassOrderSubject{//定义一个观察者列表privateList orderObserverList =newArrayList<>();//定义一个线程池,这里参数随便写的ThreadPoolExecutor threadPoolExecutor =newThreadPoolExecutor(6,12,6, TimeUnit.SECONDS,newArrayBlockingQueue<>(30));//增加一个观察者publicvoidaddObserver(OrderObserver o){this.orderObserverList.add(o); }//删除一个观察者publicvoiddelObserver(OrderObserver o){this.orderObserverList.remove(o); }//通知所有观察者publicvoidnotifyObservers(PlaceOrderMessage placeOrderMessage){for(OrderObserver orderObserver : orderObserverList) {//利用多线程异步执行threadPoolExecutor.execute(() -> { orderObserver.afterPlaceOrder(placeOrderMessage); }); } }}
这里利用了多线程,来异步执行观察者。
被观察者实现类
/** *@Author: fighter3 *@Description: 订单实现类-被观察者实现类 *@Date: 2022/11/7 11:52 下午 */@Service@Slf4jpublicclassOrderServiceImplextendsOrderSubjectimplementsOrderService{/**
* 下单
*/@OverridepublicPlaceOrderResVOplaceOrder(PlaceOrderReqVO reqVO){ PlaceOrderResVO resVO =newPlaceOrderResVO();//添加观察者this.addObserver(newOrderMetricsObserver());this.addObserver(newOrderLogObserver());this.addObserver(newOrderNotifyObserver());//通知观察者this.notifyObservers(newPlaceOrderMessage()); log.info("[placeOrder] end.");returnresVO; }}
测试
@Test@DisplayName("下单")voidplaceOrder(){ PlaceOrderReqVO placeOrderReqVO =newPlaceOrderReqVO(); orderService.placeOrder(placeOrderReqVO); }
测试执行结果
2022-11-0800:11:13.617INFO20235---[pool-1-thread-1]c.f.obverser.OrderMetricsObserver:[afterPlaceOrder]metrics2022-11-0800:11:13.618INFO20235---[ main]cn.fighter3.obverser.OrderServiceImpl:[placeOrder]end.2022-11-0800:11:13.618INFO20235---[pool-1-thread-3]c.fighter3.obverser.OrderNotifyObserver:[afterPlaceOrder]notify.2022-11-0800:11:13.617INFO20235---[pool-1-thread-2]cn.fighter3.obverser.OrderLogObserver:[afterPlaceOrder]log.
可以看到,观察者是异步执行的。
利用Spring精简
可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。
Spring精简观察者模式
观察者实现类:定义成Bean
OrderLogObserver@Slf4j
@Service
public class OrderLogObserver implements OrderObserver {
@Override
public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
log.info("[afterPlaceOrder] log.");
}}
OrderMetricsObserver
@Slf4j@Servicepublic class OrderMetricsObserver implements OrderObserver {@Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info("[afterPlaceOrder] metrics"); }}
OrderNotifyObserver
@Slf4j@Servicepublic class OrderNotifyObserver implements OrderObserver {@Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info("[afterPlaceOrder] notify."); }}
被观察者:自动注入Bean
OrderSubjectpublic abstract class OrderSubject {
/**
* 利用Spring的特性直接注入观察者*/
@Autowired
protected List orderObserverList;
//定义一个线程池,这里参数随便写的
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//通知所有观察者
public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
for (OrderObserver orderObserver : orderObserverList) {
//利用多线程异步执行
threadPoolExecutor.execute(() -> {orderObserver.afterPlaceOrder(placeOrderMessage);});}}}
OrderServiceImpl
@Service@Slf4jpublicclassOrderServiceImplextendsOrderSubjectimplementsOrderService{/**
* 实现类里也要注入一下
*/@AutowiredprivateList orderObserverList;/**
* 下单
*/@OverridepublicPlaceOrderResVOplaceOrder(PlaceOrderReqVO reqVO){ PlaceOrderResVO resVO =newPlaceOrderResVO();//通知观察者this.notifyObservers(newPlaceOrderMessage()); log.info("[placeOrder] end.");returnresVO; }}
这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。
Spring Event实现发布/订阅模式
Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。
自定义事件
PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。
publicclassPlaceOrderEventextendsApplicationEvent{publicPlaceOrderEvent(PlaceOrderEventMessage source){super(source); }}
PlaceOrderEventMessage:事件消息,定义了事件的消息体。
@DatapublicclassPlaceOrderEventMessageimplementsSerializable {/**
* 订单号
*/privateStringorderId;/**
* 订单状态
*/privateInteger orderStatus;/**
* 下单用户ID
*/privateStringuserId;//……}
事件监听者
事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。
事件监听者实现
实现ApplicationListener接口
实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。
OrderLogListener
@Slf4j@Servicepublic class OrderLogListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log."); }}
OrderMetricsListener
@Slf4j@Servicepublic class OrderMetricsListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] metrics"); }}
OrderNotifyListener
@Slf4j@Servicepublic class OrderNotifyListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] notify."); }}
使用@EventListener注解
使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。
OrderLogListener@Slf4j
@Service
public class OrderLogListener {
@EventListener
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}}
OrderMetricsListener@Slf4j
@Service
public class OrderMetricsListener {
@EventListener
public void metrics(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}}
OrderNotifyListener@Slf4j
@Service
public class OrderNotifyListener{
@EventListener
public void notify(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}}
异步和自定义线程池
异步执行
异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:
OrderLogListener
@Slf4j@Servicepublic class OrderLogListener {@EventListener@Asyncpublic void orderLog(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log."); }}
当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。
@SpringBootApplication@EnableAsyncpublic class DailyApplication {publicstaticvoidmain(String[] args) {SpringApplication.run(DairlyLearnApplication.class, args); }}
自定义线程池
使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
自定义线程池有三种方式:
事件监听者实现
实现ApplicationListener接口
实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。
OrderLogListener
@Slf4j@Servicepublic class OrderLogListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log."); }}
OrderMetricsListener
@Slf4j@Servicepublic class OrderMetricsListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] metrics"); }}
OrderNotifyListener
@Slf4j@Servicepublic class OrderNotifyListener implements ApplicationListener {@Overridepublic void onApplicationEvent(PlaceOrderEvent event) {log.info("[afterPlaceOrder] notify."); }}
使用@EventListener注解
使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。
OrderLogListener@Slf4j
@Service
public class OrderLogListener {
@EventListener
public void orderLog(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] log.");
}}
OrderMetricsListener@Slf4j
@Service
public class OrderMetricsListener {
@EventListener
public void metrics(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] metrics");
}}
OrderNotifyListener@Slf4j
@Service
public class OrderNotifyListener{
@EventListener
public void notify(PlaceOrderEvent event) {
log.info("[afterPlaceOrder] notify.");
}}
异步和自定义线程池
异步执行
异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:
OrderLogListener
@Slf4j@Servicepublic class OrderLogListener {@EventListener@Asyncpublic void orderLog(PlaceOrderEvent event) {log.info("[afterPlaceOrder] log."); }}
当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。
@SpringBootApplication@EnableAsyncpublic class DailyApplication {publicstaticvoidmain(String[] args) {SpringApplication.run(DairlyLearnApplication.class, args); }}
自定义线程池
使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
自定义线程池有三种方式:
测试
最后,我们还是测试一下。
@TestvoidplaceOrder(){ PlaceOrderReqVO placeOrderReqVO =newPlaceOrderReqVO(); orderService.placeOrder(placeOrderReqVO); }
执行结果
2022-11-0810:05:14.415INFO22674---[ main]c.f.o.event.event.OrderServiceImpl:[placeOrder]start.2022-11-0810:05:14.424INFO22674---[ main]c.f.o.event.event.OrderServiceImpl:[placeOrder]end.2022-11-0810:05:14.434INFO22674---[sync-executor-3]c.f.o.event.event.OrderNotifyListener:[afterPlaceOrder]notify.2022-11-0810:05:14.435INFO22674---[sync-executor-2]c.f.o.event.event.OrderMetricsListener:[afterPlaceOrder]metrics2022-11-0810:05:14.436INFO22674---[sync-executor-1]c.f.o.event.event.OrderLogListener:[afterPlaceOrder]log.
可以看到,异步执行,而且用到了我们自定义的线程池。
小结
这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?