news 2026/2/14 16:46:03

SpringBoot项目实现发布订阅模式

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SpringBoot项目实现发布订阅模式

大家好,我是老三,在项目里,经常会有一些主线业务之外的其它业务,比如,下单之后,发送通知、监控埋点、记录日志……

这些非核心业务,如果全部一梭子写下去,有两个问题,一个是业务耦合,一个是串行耗时。

下单之后的逻辑

所以,一般在开发的时候,都会把这些操作å抽象成观察者模式,也就是发布/订阅模式(这里就不讨论观察者模式和发布/订阅模式的不同),而且一般会采用多线程的方式来异步执行这些观察者方法。

观察者模式

一开始,我们都是自己去写观察者模式。

自己实现观察者模式

观察者简图

观察者

  • 观察者定义接口
/** * @Author: fighter3 * @Description: 观察者接口 * @Date: 2022/11/7 11:40 下午 */ public interface OrderObserver { void afterPlaceOrder(PlaceOrderMessage placeOrderMessage); }
  • 具体观察者@Slf4j
    public class OrderMetricsObserver implements OrderObserver {
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] metrics");
    }}@Slf4j
    public class OrderLogObserver implements OrderObserver{
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] log.");
    }}@Slf4j
    public class OrderNotifyObserver implements OrderObserver{
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] notify.");
    }}
    • 业务通知观察者
    • 日志记录观察者
    • 监控埋点观察者

被观察者

  • 消息实体定义
@Data public class PlaceOrderMessage implements Serializable { /** * 订单号 */ private String orderId; /** * 订单状态 */ private Integer orderStatus; /** * 下单用户ID */ private String userId; //…… }
  • 被观察者抽象类
public abstract class OrderSubject { //定义一个观察者列表 private List<OrderObserver> orderObserverList = new ArrayList<>(); //定义一个线程池,这里参数随便写的 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30)); //增加一个观察者 public void addObserver(OrderObserver o) { this.orderObserverList.add(o); } //删除一个观察者 public void delObserver(OrderObserver o) { this.orderObserverList.remove(o); } //通知所有观察者 public void notifyObservers(PlaceOrderMessage placeOrderMessage) { for (OrderObserver orderObserver : orderObserverList) { //利用多线程异步执行 threadPoolExecutor.execute(() -> { orderObserver.afterPlaceOrder(placeOrderMessage); }); } } }

这里利用了多线程,来异步执行观察者。

  • 被观察者实现类
/** * @Author: fighter3 * @Description: 订单实现类-被观察者实现类 * @Date: 2022/11/7 11:52 下午 */ @Service @Slf4j public class OrderServiceImpl extends OrderSubject implements OrderService { /** * 下单 */ @Override public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) { PlaceOrderResVO resVO = new PlaceOrderResVO(); //添加观察者 this.addObserver(new OrderMetricsObserver()); this.addObserver(new OrderLogObserver()); this.addObserver(new OrderNotifyObserver()); //通知观察者 this.notifyObservers(new PlaceOrderMessage()); log.info("[placeOrder] end."); return resVO; } }

测试

@Test @DisplayName("下单") void placeOrder() { PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO(); orderService.placeOrder(placeOrderReqVO); }
  • 测试执行结果
2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver : [afterPlaceOrder] metrics 2022-11-08 00:11:13.618 INFO 20235 --- [ main] cn.fighter3.obverser.OrderServiceImpl : [placeOrder] end. 2022-11-08 00:11:13.618 INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver : [afterPlaceOrder] notify. 2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver : [afterPlaceOrder] log.

可以看到,观察者是异步执行的。

利用Spring精简

可以看到,观察者模式写起来还是比较简单的,但是既然都用到了Spring来管理Bean的生命周期,代码还可以更精简一些。

Spring精简观察者模式

观察者实现类:定义成Bean

  • OrderLogObserver@Slf4j
    @Service
    public class OrderLogObserver implements OrderObserver {
    @Override
    public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {
    log.info("[afterPlaceOrder] log.");
    }}
  • OrderMetricsObserver
@Slf4j @Service public class OrderMetricsObserver implements OrderObserver { @Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info("[afterPlaceOrder] metrics"); } }
  • OrderNotifyObserver
@Slf4j @Service public class OrderNotifyObserver implements OrderObserver { @Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info("[afterPlaceOrder] notify."); } }

被观察者:自动注入Bean

  • OrderSubjectpublic abstract class OrderSubject {
    /**
    * 利用Spring的特性直接注入观察者*/

    @Autowired
    protected List<OrderObserver> orderObserverList;
    //定义一个线程池,这里参数随便写的
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
    //通知所有观察者
    public void notifyObservers(PlaceOrderMessage placeOrderMessage) {
    for (OrderObserver orderObserver : orderObserverList) {
    //利用多线程异步执行
    threadPoolExecutor.execute(() -> {orderObserver.afterPlaceOrder(placeOrderMessage);});}}}
  • OrderServiceImpl
@Service @Slf4j public class OrderServiceImpl extends OrderSubject implements OrderService { /** * 实现类里也要注入一下 */ @Autowired private List<OrderObserver> orderObserverList; /** * 下单 */ @Override public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) { PlaceOrderResVO resVO = new PlaceOrderResVO(); //通知观察者 this.notifyObservers(new PlaceOrderMessage()); log.info("[placeOrder] end."); return resVO; } }

这样一来,发现被观察者又简洁了很多,但是后来我发现,在SpringBoot项目里,利用Spring事件驱动驱动模型(event)模型来实现,更加地简练。

Spring Event实现发布/订阅模式

Spring Event对发布/订阅模式进行了封装,使用起来更加简单,还是以我们这个场景为例,看看怎么来实现吧。

自定义事件

  • PlaceOrderEvent:继承ApplicationEvent,并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。
public class PlaceOrderEvent extends ApplicationEvent { public PlaceOrderEvent(PlaceOrderEventMessage source) { super(source); } }
  • PlaceOrderEventMessage:事件消息,定义了事件的消息体。
@Data public class PlaceOrderEventMessage implements Serializable { /** * 订单号 */ private String orderId; /** * 订单状态 */ private Integer orderStatus; /** * 下单用户ID */ private String userId; //…… }

事件监听者

事件监听者,有两种实现方式,一种是实现ApplicationListener接口,另一种是使用@EventListener注解。

事件监听者实现

实现ApplicationListener接口

实现ApplicationListener接口,重写onApplicationEvent方法,将类定义为Bean,这样,一个监听者就完成了。

  • OrderLogListener
@Slf4j @Service public class OrderLogListener implements ApplicationListener<PlaceOrderEvent> { @Override public void onApplicationEvent(PlaceOrderEvent event) { log.info("[afterPlaceOrder] log."); } }
  • OrderMetricsListener
@Slf4j @Service public class OrderMetricsListener implements ApplicationListener<PlaceOrderEvent> { @Override public void onApplicationEvent(PlaceOrderEvent event) { log.info("[afterPlaceOrder] metrics"); } }
  • OrderNotifyListener
@Slf4j @Service public class OrderNotifyListener implements ApplicationListener<PlaceOrderEvent> { @Override public void onApplicationEvent(PlaceOrderEvent event) { log.info("[afterPlaceOrder] notify."); } }

使用@EventListener注解

使用@EventListener注解就更简单了,直接在方法上,加上@EventListener注解就行了。

  • OrderLogListener@Slf4j
    @Service
    public class OrderLogListener {
    @EventListener
    public void orderLog(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] log.");
    }}
  • OrderMetricsListener@Slf4j
    @Service
    public class OrderMetricsListener {
    @EventListener
    public void metrics(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] metrics");
    }}
  • OrderNotifyListener@Slf4j
    @Service
    public class OrderNotifyListener{
    @EventListener
    public void notify(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] notify.");
    }}

异步和自定义线程池

异步执行

异步执行也非常简单,使用Spring的异步注解@Async就可以了。例如:

  • OrderLogListener
@Slf4j @Service public class OrderLogListener { @EventListener @Async public void orderLog(PlaceOrderEvent event) { log.info("[afterPlaceOrder] log."); } }

当然,还需要开启异步,SpringBoot项目默认是没有开启异步的,我们需要手动配置开启异步功能,很简单,只需要在配置类上加上@EnableAsync注解就行了,这个注解用于声明启用Spring的异步方法执行功能,需要和@Configuration注解一起使用,也可以直接加在启动类上。

@SpringBootApplication @EnableAsync public class DailyApplication { public static void main(String[] args) { SpringApplication.run(DairlyLearnApplication.class, args); } }

自定义线程池

使用@Async的时候,一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

自定义线程池有三种方式:

@Async自定义线程池

  • 实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

我们来看看三种写法:

  • 实现接口AsyncConfigurer
@Configuration @Slf4j public class AsyncConfiguration implements AsyncConfigurer { @Bean("fighter3AsyncExecutor") public ThreadPoolTaskExecutor executor() { //Spring封装的一个线程池 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //随便写的一些配置 executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(30); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix("fighter3AsyncExecutor-"); executor.initialize(); return executor; } @Override public Executor getAsyncExecutor() { return executor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex); } }
  • 继承AsyncConfigurerSupport
@Configuration @Slf4j public class SpringAsyncConfigurer extends AsyncConfigurerSupport { @Bean public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); //随便写的一些配置 threadPool.setCorePoolSize(10); threadPool.setMaxPoolSize(30); threadPool.setWaitForTasksToCompleteOnShutdown(true); threadPool.setAwaitTerminationSeconds(60 * 15); return threadPool; } @Override public Executor getAsyncExecutor() { return asyncExecutor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error(String.format("[async] task{} error:", method), ex); } }
  • 配置自定义的TaskExecutor@Slf4j
    @Service
    public class OrderLogListener {
    @EventListener
    @Async("asyncExecutor")
    public void orderLog(PlaceOrderEvent event) {
    log.info("[afterPlaceOrder] log.");
    }}
    • 配置线程池@Configuration
      public class TaskPoolConfig {
      @Bean(name = "asyncExecutor")
      public Executor taskExecutor() {
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
      //随便写的一些配置
      executor.setCorePoolSize(10);
      executor.setMaxPoolSize(20);
      executor.setQueueCapacity(200);
      executor.setKeepAliveSeconds(60);
      executor.setThreadNamePrefix("asyncExecutor-");
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
      return executor;
      }}
    • 使用@Async注解的时候,指定线程池,推荐使用这种方式,因为在项目里,尽量做到线程池隔离,不同的任务使用不同的线程池

异步和自定义线程池这一部分只是一些扩展,稍微占了一些篇幅,大家可不要觉得Spring Event用起来很繁琐。

发布事件

发布事件也非常简单,只需要使用Spring 提供的ApplicationEventPublisher来发布自定义事件。

  • OrderServiceImpl@Service
    @Slf4j
    public class OrderServiceImpl implements OrderService {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    /**
    * 下单*/

    @Override
    public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {
    log.info("[placeOrder] start.");
    PlaceOrderResVO resVO = new PlaceOrderResVO();
    //消息
    PlaceOrderEventMessage eventMessage = new PlaceOrderEventMessage();
    //发布事件
    applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));
    log.info("[placeOrder] end.");
    return resVO;
    }}

在Idea里查看事件的监听者也比较方便,点击下面图中的图标,就可以查看监听者。

查看监听者

监听者

测试

最后,我们还是测试一下。

@Test void placeOrder() { PlaceOrderReqVO placeOrderReqVO = new PlaceOrderReqVO(); orderService.placeOrder(placeOrderReqVO); }
  • 执行结果
2022-11-08 10:05:14.415 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] start. 2022-11-08 10:05:14.424 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] end. 2022-11-08 10:05:14.434 INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener : [afterPlaceOrder] notify. 2022-11-08 10:05:14.435 INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener : [afterPlaceOrder] metrics 2022-11-08 10:05:14.436 INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener : [afterPlaceOrder] log.

可以看到,异步执行,而且用到了我们自定义的线程池。

小结

这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?

小结

这篇文章里,从最开始自己实现的观察者模式,再到利用Spring简化的观察者模式,再到使用Spring Event实现发布/订阅模式,可以看到,Spring Event用起来还是比较简单的。除此之外,还有Guava EventBus这样的事件驱动实现,大家更习惯使用哪种呢?

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/2/13 21:49:58

向量的正交分解和标准正交基

本篇算是前面《Gram-Schmidt 正交化过程简介》&#xff0c;《正交投影和正交拒绝》&#xff0c;《标量投影和向量投影》的一个补充或强化学习。 一&#xff64;概述 向量的正交分解 向量的正交分解&#xff08;orthogonal decomposition&#xff09;是指&#xff1a;在内积空…

作者头像 李华
网站建设 2026/2/11 11:26:15

这次终于选对!9个AI论文平台测评:本科生毕业论文写作全攻略

随着AI技术在学术领域的深度应用&#xff0c;越来越多的本科生开始借助AI工具辅助毕业论文写作。然而&#xff0c;面对市场上琳琅满目的AI论文平台&#xff0c;如何选择一款真正适合自己需求的产品成为一大难题。为此&#xff0c;我们基于2026年的实测数据与用户真实反馈&#…

作者头像 李华
网站建设 2026/2/13 19:52:18

计算机毕业设计springboot基于java的农家乐管理系统 基于SpringBoot的乡村民宿与餐饮一体化运营平台 Java Web驱动的生态农庄数字化服务系统

计算机毕业设计springboot基于java的农家乐管理系统ace53ou5&#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。随着乡村振兴战略深入推进&#xff0c;乡村旅游产业迎来爆发式增长&a…

作者头像 李华
网站建设 2026/2/13 11:11:30

参考文献崩了?千笔ai写作,MBA论文一键生成神器

你是否曾为论文的参考文献发愁&#xff1f;面对海量资料无从下手&#xff0c;查重率高得让人心慌&#xff0c;格式调整反复出错……MBA论文写作的每一步都像在闯关。你不是不够努力&#xff0c;而是缺少一个真正懂你的写作助手。千笔AI&#xff0c;专为MBA学生打造的智能写作工…

作者头像 李华