当前位置: 首页 > news >正文

SpringBoot ApplicationListener实现发布订阅模式

文章目录

  • 前言
  • 一、Spring对JDK的扩展
  • 二、快速实现发布订阅模式


前言

发布订阅模式(Publish-Subscribe Pattern)通常又称观察者模式,它被广泛应用于事件驱动架构中。即一个事件的发布,该行为会通过同步或者异步的方式告知给订阅该事件的订阅者。JDK中提供了EventListener作为所有订阅者的接口规范(即所有的订阅者都应该实现该接口),而EventObject则作为所有事件发布者的实现规范(即所有事件发布者都应该继承该类)。对于观察者的原理不是本章讨论的重点,本章只是演示如何在SpringBoot中实现发布订阅模式。


一、Spring对JDK的扩展

Spring中,提供了接口ApplicationListener作为Spring观察者(也叫监听者)的实现规范,ApplicationListener其实是对JDKEventListener中的扩展,增加了onApplicationEvent方法作为触发监听的方法。而事件发布对象ApplicationEvent也是继承了JDK中的EventObject类,仅仅增加了参数timestamp用于记录事件创建的时间。也就是说如果要使用Spring提供的发布订阅模式,您的监听器应该实现ApplicationListener接口,通过onApplicationEvent方法获取监听的内容。事件则必须继承ApplicationEvent

ApplicationListener源码:

@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {/*** Handle an application event.* @param event the event to respond to*/void onApplicationEvent(E event);}

ApplicationEvent源码:

public abstract class ApplicationEvent extends EventObject {/** use serialVersionUID from Spring 1.2 for interoperability. */private static final long serialVersionUID = 7099057708183571937L;/** System time when the event happened. */private final long timestamp;/*** Create a new {@code ApplicationEvent}.* @param source the object on which the event initially occurred or with* which the event is associated (never {@code null})*/public ApplicationEvent(Object source) {super(source);this.timestamp = System.currentTimeMillis();}/*** Return the system time in milliseconds when the event occurred.*/public final long getTimestamp() {return this.timestamp;}}

二、快速实现发布订阅模式

配置线程池是必要的,因为发布订阅模式的一个好处就是可以实现解耦,而解耦最好的方式就是采用异步线程处理。如果我们不配置线程池,则在spring中默认会采用同步的方式进行消息发布和订阅消费。这样一来就没有任何意义了。首先在yaml或者properties中配置线程池信息:

thread:executor:corePoolSize: 8 #核心线程keepAliveSeconds: 30000 # 活跃时间maxPoolSize: 16 #最大线程数queueCapacity: 100000 #最大队列长度

然后通过配置文件读取配置信息,创建线程池并注入IOC容器

package com.hl.by.common.thread;import com.hl.by.common.utils.JsonUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ErrorHandler;import java.util.concurrent.ThreadPoolExecutor;/*** @Author: DI.YIN* @Date: 2024/3/6 9:39* @Version: 1.0.0* @Description: 线程池配置**/
@Configuration
public class ThreadPoolTaskExecutorConfig {//核心线程@Value("${thread.executor.corePoolSize}")private Integer corePoolSize;//存活时间@Value("${thread.executor.keepAliveSeconds}")private Integer keepAliveSeconds;//最大线程数@Value("${thread.executor.maxPoolSize}")private Integer maxPoolSize;//最大队列长度@Value("${thread.executor.queueCapacity}")private Integer queueCapacity;@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);threadPoolTaskExecutor.setCorePoolSize(corePoolSize);threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);threadPoolTaskExecutor.setQueueCapacity(queueCapacity);//设置拒绝策略,直接运行,不采用异步threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.setThreadNamePrefix("Thread-Pool-Task-");return threadPoolTaskExecutor;}@DependsOn(value = "taskExecutor")@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)public SimpleApplicationEventMulticaster eventMulticaster() {SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());//设置错误处理器simpleApplicationEventMulticaster.setErrorHandler(new ErrorHandler() {@Overridepublic void handleError(Throwable throwable) {System.out.println("抛出异常:" + JsonUtils.writeObjectAsBeautifulJson(throwable));}});return simpleApplicationEventMulticaster;}
}

可以看到除了注入线程池之外,还注入了自定义的SimpleApplicationEventMulticaster 对象并将创建的线程池设置到SimpleApplicationEventMulticaster中。因为SimpleApplicationEventMulticaster是处理发布订阅的核心类,通过multicastEvent方法进行事件发布。可以看到multicastEvent中,循环遍历订阅该事件的所有监听器,并判断是否配置了线程池Executor,如果配置了则将发布操作扔入线程池中异步处理,否则将同步处理发布事件操作。很多情况发现我们的事件发布与监听处理是在一个线程中执行,就是因为我们未设置线程池,导致发布订阅无法异步实现。

	@Overridepublic void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));//获取线程池Executor executor = getTaskExecutor();for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {if (executor != null) {//如果配置了线程池,则放入线程池中异步处理executor.execute(() -> invokeListener(listener, event));}else {//未配置线程池,则同步处理invokeListener(listener, event);}}}

完成以上配置后,就可以定义发布者、订阅者和发布事件了。现在我们定义一个类MessageSource作为发布者发布的事件,结构如下:

import lombok.Data;/*** @Author: DI.YIN* @Date: 2024/3/6 13:41* @Version:* @Description: 消息实体**/
@Data
public class MessageSource {private String id;private String msg;private String title;
}

定义好发布事件后,我们定义一个事件发布者MessageEvent,并指定其发布的事件类型是MessageSourceMessageSource 的子类,结构如下:

import org.springframework.context.ApplicationEvent;/*** @Author: DI.YIN* @Date: 2024/3/6 13:39* @Version: 1.0.0* @Description: 消息事件**/
public class MessageEvent<T extends MessageSource> extends ApplicationEvent {/*** Create a new {@code ApplicationEvent}.** @param source the object on which the event initially occurred or with*               which the event is associated (never {@code null})*/public MessageEvent(MessageSource source) {super(source);}
}

现在已经定义好了发布事件MessageSource,事件发布者MessageEvent,此时我们可以定义一个事件订阅者MessageListener,用于监听事件发布者MessageEvent发布的事件。代码如下:

import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;/*** @Author: DI.YIN* @Date: 2024/3/6 10:19* @Version:* @Description:**/
@Component
public class MessageListener implements ApplicationListener<MessageEvent> {@Overridepublic void onApplicationEvent(MessageEvent event) {MessageSource source = (MessageSource)event.getSource();System.out.println("消息监听器监听到消息:===>"+ JSONObject.toJSONString(source));}
}

现在我们就实现了一个订阅发布模式,事件对象MessageSource,事件发布者MessageEvent专门用于发布MessageSource类型的事件,事件监听者MessageListener 则专门监听MessageEvent发布的事件。可以创建一个接口用于测试发布订阅是否成功。


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;/*** @Author: Greyfus* @Create: 2024-03-02 14:08* @Version:* @Description:*/
@RestController
@RequestMapping("/mock")
public class TestController {@Autowiredprivate ApplicationContext applicationContext;@RequestMapping(method = RequestMethod.POST, value = "/publishMessage", consumes = MediaType.APPLICATION_JSON_VALUE)public void publishMessage() throws Exception {//构建信息实体MessageSource messageSource = new MessageSource();messageSource.setId(String.valueOf(1));messageSource.setTitle("日志消息");messageSource.setMsg("调用了接口publishMessage");//构建消息事件MessageEvent<MessageSource> messageEvent = new MessageEvent(messageSource);//发布事件applicationContext.publishEvent(messageEvent);}
}

通过用postman调用接口/mock/feign可以看到MessageListener 成功接受到了MessageEvent发布的MessageSource事件。
在这里插入图片描述

相关文章:

SpringBoot ApplicationListener实现发布订阅模式

文章目录 前言一、Spring对JDK的扩展二、快速实现发布订阅模式 前言 发布订阅模式(Publish-Subscribe Pattern)通常又称观察者模式&#xff0c;它被广泛应用于事件驱动架构中。即一个事件的发布&#xff0c;该行为会通过同步或者异步的方式告知给订阅该事件的订阅者。JDK中提供…...

嵌入式学习40-数据结构

数据结构 1.定义 一组用来保存一种或者多种特定关系的 数据的集合&#xff08;组织和存储数据&#xff09; 程序的设计&#xff1a; …...

k8s集群部署elk

一、前言 本次部署elk所有的服务都部署在k8s集群中&#xff0c;服务包含filebeat、logstash、elasticsearch、kibana&#xff0c;其中elasticsearch使用集群的方式部署&#xff0c;所有服务都是用7.17.10版本 二、部署 部署elasticsearch集群 部署elasticsearch集群需要先优化…...

【Python】清理conda缓存的常用命令

最近发现磁盘空间不足&#xff0c;很大一部分都被anaconda占据了&#xff0c;下面是一些清除conda缓存的命令 清理所有环境的Anaconda包缓存 删除所有未使用的包以及缓存的索引和临时文件 conda clean --all清理某一特定环境的Anaconda包缓存 conda clean --all -n 环境名清…...

代码随想录算法训练营第46天 | 完全背包,139.单词拆分

动态规划章节理论基础&#xff1a; https://programmercarl.com/%E5%8A%A8%E6%80%81%E8%A7%84%E5%88%92%E7%90%86%E8%AE%BA%E5%9F%BA%E7%A1%80.html 完全背包理论基础&#xff1a; https://programmercarl.com/%E8%83%8C%E5%8C%85%E9%97%AE%E9%A2%98%E7%90%86%E8%AE%BA%E5%9…...

rust - 将windows剪贴板的截图保存为png

本文提供了将windows系统的截图另存为png格式图片的方法。 添加依赖 cargo add clipboard-win cargo add image cargo add windows配置修改windows依赖特性 [dependencies] image "0.25.0"[target.cfg(windows).dependencies] windows "0.51.1" clipb…...

pyflink1.18.0 报错 TypeError: cannot pickle ‘_thread.lock‘ object

完整报错 Traceback (most recent call last):File "/Users//1.py", line 851, in <module>ds1 = my_datastream.key_by(lambda x: x[0]).process(MyProcessFunction()) # 返回元组即: f0 f1 f2 三列File "/Users/thomas990p/bigdataSoft/minicondaarm/…...

算法学习系列(四十一):Flood Fill算法

目录 引言一、池塘计数二、城堡问题三、山峰和山谷 引言 关于这个 F l o o d F i l l Flood\ Fill Flood Fill 算法&#xff0c;其实我觉得就是一个 B F S BFS BFS 算法&#xff0c;模板其实都是非常相似的&#xff0c;只不过有些变形而已&#xff0c;然后又叫这个名字。关于…...

Re62:读论文 GPT-2 Language Models are Unsupervised Multitask Learners

诸神缄默不语-个人CSDN博文目录 诸神缄默不语的论文阅读笔记和分类 论文全名&#xff1a;Language Models are Unsupervised Multitask Learners 论文下载地址&#xff1a;https://cdn.openai.com/better-language-models/language_models_are_unsupervised_multitask_learner…...

stm32-编码器测速

一、编码器简介 编码电机 旋转编码器 A,B相分别接通道一和二的引脚&#xff0c;VCC&#xff0c;GND接单片机VCC&#xff0c;GND 二、正交编码器工作原理 以前的代码是通过触发外部中断&#xff0c;然后在中断函数里手动进行计次。使用编码器接口的好处就是节约软件资源。对于频…...

全国各省市县统计年鉴/中国环境统计年鉴/中国工业企业数据库/中国专利数据库/污染排放数据库

统计年鉴是指以统计图表和分析说明为主&#xff0c;通过高度密集的统计数据来全面、系统、连续地记录年度经济、社会等各方面发展情况的大型工具书来获取统计数据资料。 统计年鉴是进行各项经济、社会研究的必要前提。而借助于统计年鉴&#xff0c;则是研究者常用的途径。目前国…...

【LAMMPS学习】二、LAMMPS安装(2)MacOS和Win安装

2. LAMMPS安装 您可以将LAMMPS下载为可执行文件或源代码。 在下载LAMMPS源代码时&#xff0c;还必须构建LAMMPS。但是对于在构建中包含或排除哪些特性&#xff0c;您有更大的灵活性。当您下载并安装预编译的LAMMPS可执行文件时&#xff0c;您只能安装可用的LAMMPS版本以及这些…...

如何解决网络中IP地址发生冲突故障?

0、前言 本专栏为个人备考软考嵌入式系统设计师的复习笔记&#xff0c;未经本人许可&#xff0c;请勿转载&#xff0c;如发现本笔记内容的错误还望各位不吝赐教&#xff08;笔记内容可能有误怕产生错误引导&#xff09;。 1、个人IP地址冲突解决方案 首先winR&#xff0c;调出…...

机器学习常用框架

机器学习是人工智能的一个重要分支&#xff0c;它通过让计算机系统利用数据自我学习来改进任务执行的能力。在机器学习领域&#xff0c;有许多成熟的框架被广泛使用&#xff0c;这些框架提供了构建和训练机器学习模型的工具。以下是一些常用的机器学习框架&#xff1a; Tensor…...

计算机网络——物理层(信道复用技术)

计算机网络——物理层&#xff08;信道复用技术&#xff09; 信道复用技术频分多址与时分多址 频分复用 FDM (Frequency Division Multiplexing)时分复用 TDM (Time Division Multiplexing)统计时分复用 STDM (Statistic TDM)波分复用码分复用 我们今天接着来看信道复用技术&am…...

【Qt问题】使用QSlider创建滑块小部件无法显示

问题描述&#xff1a; 使用QSlider创建滑块小部件用于音量按钮的时候&#xff0c;无法显示&#xff0c;很奇怪&#xff0c;怎么都不显示 一直是这个效果&#xff0c;运行都没问题&#xff0c;但是就是不出现。 一直解决不了&#xff0c;最后我在无意中&#xff0c;在主程序中…...

Linux--Shell脚本安装 httpd 和 修改IP

shell脚本 关闭防火墙、安装httpd、启动httpd [rootnode11 ~]# mkdir shell[rootnode11 ~]# vim abc.sh #!/bin/bash#安装httpd服务#1、挂载 准备yum源 mount /dev/sr0 /mnt &> /dev/nulldf$(df -h | grep /dev/sr0 | awk {print $6})if [ "$df" "/mn…...

mysql 常见问题

1、count(*) 、 count(1) 和 count&#xff08;字段&#xff09;区别 在MySQL中&#xff0c;COUNT(*)、COUNT(1) 和 COUNT(字段) 是用于统计行数的函数&#xff0c;它们的主要区别在于&#xff1a; COUNT(*)&#xff1a;会统计符合条件的所有行的数量&#xff0c;不管这些行中…...

考研机试题

目录 头文件与STL动态规划最大数组子串和最长公共子序列最长连续公共子串最长递增子序列最大上升子序列和0-1背包多重背包多重背包问题 I整数拆分最小邮票最大子矩阵 数学问题朴素法筛素数线性筛素数快速幂 石子合并锯木棍并查集Dijkstra单源最短路Python进制转换(整数无限大)全…...

Java基础知识总结(6)

String类中常用的类方法&#xff1a; 方法名称描述format(String format, Object... args)使用指定的格式字符串和参数返回一个格式化字符串。 format - 格式字符串 args - 格式字符串中由格式说明符引用的参数。如果还有格式说明符以外的参数&#xff0c;则忽略这些额外的参数…...

智慧医疗泡罩药板药片缺失缺陷检测数据集VOC+YOLO格式1300张3类别

注意数据集中图片大约500张是原图剩余为增强图片数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件)图片数量(jpg文件个数)&#xff1a;1300标注数量(xml文件个数)&#xff1a;1300…...

DeepSeek-OCR-2实战教程:OCR结果JSON Schema解析与结构化数据入库指南

DeepSeek-OCR-2实战教程&#xff1a;OCR结果JSON Schema解析与结构化数据入库指南 1. 项目简介 DeepSeek-OCR-2是基于深度学习的智能文档解析工具&#xff0c;专门针对结构化文档内容提取而设计。与传统的OCR工具只能提取纯文本不同&#xff0c;这个工具能够精准识别文档的排…...

LyricsX:突破平台限制,重构macOS歌词体验的开源解决方案

LyricsX&#xff1a;突破平台限制&#xff0c;重构macOS歌词体验的开源解决方案 【免费下载链接】LyricsX &#x1f3b6; Ultimate lyrics app for macOS. 项目地址: https://gitcode.com/gh_mirrors/ly/LyricsX 在流媒体音乐蓬勃发展的今天&#xff0c;音乐爱好者们却常…...

Mac能够连接校园网,但是无法上网

Mac电脑能够正常连接校园网&#xff0c;但是无法上网解决步骤&#xff1a;打开系统设置&#xff0c;网络&#xff0c;WI-FI&#xff0c;DNS把现有的删掉重置它。原因分析&#xff1a;应该是在使用代理时、访问什么网站被自动篡改了 DNS 设置&#xff0c;导致连接的 DNS 无法解析…...

Wan2.2-T2V-A5B提示词怎么写?新手快速出效果的实用指南

Wan2.2-T2V-A5B提示词怎么写&#xff1f;新手快速出效果的实用指南 1. 认识Wan2.2-T2V-A5B视频生成模型 Wan2.2-T2V-A5B是一款由通义万相开源的轻量级文本到视频生成模型&#xff0c;拥有50亿参数规模。虽然它生成的视频分辨率是480P&#xff0c;但在时序连贯性和运动推理能力…...

ISIS实验1

ISIS实验1网络拓扑配置一、AR1二、AR2三、测试1. 查看 IS-IS 邻居状态2. 查看 IS-IS 接口信息3. 查看 IS-IS 路由表4. 查看 IP 路由表中的 IS-IS 路由5. 查看链路状态数据库&#xff08;LSDB&#xff09;6. 检查&#xff1a;Level-1 区域一致性四、AR3五、AR4六、检测1. 通过链…...

MAX17332 Arduino库详解:单节锂电池燃料计量与独立充电控制

1. 项目概述 MAX17332 是 Maxim Integrated&#xff08;现为 Analog Devices&#xff09;推出的一款高度集成的单节锂离子/锂聚合物电池管理芯片&#xff0c;专为紧凑型便携设备设计。它并非传统意义上的“纯BMS”&#xff08;Battery Management System&#xff09;&#xff0…...

使用现代 Java 技术栈构建企业级 AI 应用

使用现代 Java 技术栈构建企业级 AI 应用 引言 随着人工智能技术的快速发展&#xff0c;企业级 AI 应用的需求也迅速增长。Java 作为一门成熟的企业级编程语言&#xff0c;其生态系统在 AI 应用开发中扮演着重要角色。本文将探讨如何利用 Java 技术栈构建生产级 AI 应用&#x…...

玩转AI!用FastAPI+RAG轻松构建智能文档问答系统,代码、文档全公开!

在企业数字化转型的浪潮中&#xff0c;我们常遇到这样一个痛点&#xff1a;海量的业务文档、研究报告、技术手册堆积如山&#xff0c;当需要从中寻找某个特定答案时&#xff0c;员工往往要花费数小时甚至数天进行翻阅。这不仅是效率的浪费&#xff0c;更是知识资产沉睡的体现**…...

AceMenu:嵌入式轻量级菜单框架设计与实践

1. AceMenu 库概述&#xff1a;面向嵌入式人机交互的轻量级菜单框架AceMenu 是一个专为资源受限嵌入式系统设计的轻量级、可移植菜单管理库。其核心设计哲学是“以最少的硬件资源开销&#xff0c;实现最直观的用户导航体验”。不同于通用 GUI 框架&#xff08;如 LVGL 或 Touch…...