【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息
【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息Pub/Sub Messaging
- 一、发布消息Publishing (Sending Messages)
- 二、订阅消息Subscribing (Receiving Messages)
- 2.1 消息监听容器Message Listener Containers
- 2.2 消息监听适配器The MessageListenerAdapter
- 三、反应式消息监听器容器Reactive Message Listener Container
- 3.1 通过template API订阅Subscribing via template API
Spring Data为Redis提供了专用的消息集成,在功能和命名方面与Spring Framework中的JMS集成类似。
Redis消息传递大致可以分为两个功能领域:
- 消息的发布(publish)或生产
- 消息的订阅(subscribe)或消费
这是一个通常称为发布/订阅(简称Pub/Sub)的模式示例。RedisTemplate类用于消息生成。对于类似于Java EE的消息驱动bean风格的异步接收,Spring Data提供了一个专用的消息监听器容器,用于创建消息驱动的POJO(MDP),对于同步接收,还提供RedisConnection。
org.springframework.data.redis.connection 和 org.springframework.data.redis.listener包提供Redis消息传递的核心功能。
一、发布消息Publishing (Sending Messages)
要发布消息,你可以像使用其他操作一样,使用低级的RedisConnection或高级的RedisOperations。这两个实体都提供了publish方法,该方法接受消息和目标通道(channel)作为参数。RedisConnection需要原始数据(字节数组),而RedisOperations允许任意对象作为消息传递,如下面的例子所示:
// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
一个相对完整的发布例子:
package com.example.demo;import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;public class RedisPublishMessage {public JedisConnectionFactory jedisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration =new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("localhost");redisStandaloneConfiguration.setDatabase(0);redisStandaloneConfiguration.setPassword(RedisPassword.of("123456"));redisStandaloneConfiguration.setPort(6379);return new JedisConnectionFactory(redisStandaloneConfiguration);}public static void main(String[] args) {JedisConnectionFactory connectionFactory = new RedisApplication().jedisConnectionFactory();connectionFactory.afterPropertiesSet();RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(StringRedisSerializer.UTF_8);template.afterPropertiesSet();
// send message through RedisOperationsRedisOperations operations = template;Long numberOfClients = operations.convertAndSend("mychannel", "This is Gabriel");System.out.println(numberOfClients);}
}
二、订阅消息Subscribing (Receiving Messages)
在接收端,可以通过直接命名或使用模式(pattern)匹配来订阅一个或多个通道(channels)。后一种方法非常有用,因为它不仅允许使用一个命令创建多个订阅(subscription),而且还可以监听订阅时尚未创建的通道(只要它们与模式匹配)。
在底层,RedisConnection提供了subscribe和pSubscribe方法,它们分别映射Redis命令的以通道或模式进行订阅。前述2个方法可以使用多个通道或多个模式做为参数。为了更改连接的订阅或检查连接是否在监听,RedisConnection提供了getSubscription和isSubscribed方法。
Spring Data Redis中的Subscription命令是阻塞的。也就是说,在连接上调用subscribe会导致当前线程在开始等待消息时阻塞。只有当订阅被取消时,线程才会被释放,当另一个线程在同一连接上调用unsubscribe或pUnsubscribe时,就会发生这种情况。有关此问题的解决方案,请参阅“2.1 消息监听容器”(本文档稍后部分)。
如前所述,一旦订阅,连接就会开始等待消息。只允许添加新订阅、修改现有订阅和取消现有订阅的命令。调用除subscribe, pSubscribe, unsubscribe, 和 pUnsubscribe之外的任何操作都会引发异常。
为了订阅消息,需要实现MessageListener回调。每次新消息到达时,都会调用回调,并通过onMessage方法运行用户代码。该接口不仅可以访问实际消息,还可以访问接收该消息的通道以及用于匹配订阅的通道的模式(如果有的话)。这些信息使被调用者来区分各种消息,不仅是通过内容,还可以通过检查其他细节。
一个相对完整的订阅例子:
package com.example.demo;import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.nio.charset.StandardCharsets;public class RedisSubscribeMessage {public JedisConnectionFactory jedisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration =new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("localhost");redisStandaloneConfiguration.setDatabase(0);redisStandaloneConfiguration.setPassword(RedisPassword.of("123456"));redisStandaloneConfiguration.setPort(6379);return new JedisConnectionFactory(redisStandaloneConfiguration);}public static void main(String[] args) {JedisConnectionFactory connectionFactory = new RedisApplication().jedisConnectionFactory();connectionFactory.afterPropertiesSet();RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(StringRedisSerializer.UTF_8);template.afterPropertiesSet();RedisConnection redisConnection = template.getConnectionFactory().getConnection();redisConnection.subscribe(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] bytes) {// 收到消息的处理逻辑System.out.println("Receive message : " + message);}}, "mychannel".getBytes(StandardCharsets.UTF_8));}
}
2.1 消息监听容器Message Listener Containers
由于其阻塞特性,低级别订阅(RedisConnection的订阅)没有吸引力,因为它需要对每个监听器进行连接和线程管理。为了缓解这个问题,Spring Data提供了RedisMessageListenerContainer,它完成了所有繁重的工作。如果你熟悉EJB和JMS,你应该会发现这些概念很熟悉,因为它的设计尽可能地接近Spring Framework的支持及其消息驱动的POJO(MDP)。
RedisMessageListenerContainer充当消息监听器容器。它用于从Redis通道(channel)接收消息,并驱动注入其中的MessageListener实例。监听器容器负责消息接收的所有线程,并将消息分派到监听器中进行处理。消息监听器容器是MDP和消息传递提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等。这使你作为应用程序开发人员能够编写与接收消息相关联的(可能复杂的)业务逻辑,并将Redis基础设施的公式化问题委托给框架。
MessageListener还可以实现SubscriptionListener,以便在确认订阅/取消订阅时接收通知。在同步调用时,监听订阅通知可能很有用。
为了最大限度地减少应用程序占用,RedisMessageListenerContainer允许多个监听器共享一个连接和一个线程,即使它们不共享订阅。因此,无论应用程序跟踪多少监听器或通道,运行时成本在其整个生命周期中都保持不变。另外,容器允许更改运行时配置,以便在应用程序运行时添加或删除监听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有监听器都被取消订阅,则会自动执行清理,并释放线程。
为了保证消息的异步特性,容器需要一个java.util.concurrent.Executor(或Spring的TaskExecutor)来分发消息。根据负载、监听器的数量和运行时环境,你应该调整executor以更好地满足你的需求。
2.2 消息监听适配器The MessageListenerAdapter
MessageListenerAdapter类是Spring异步消息传递支持的最后一个组件。简而言之,它允许你将几乎任何类公开为MDP(尽管有一些约束)。以下面的接口定义举例:
public interface MessageDelegate {void handleMessage(String message);void handleMessage(Map message);void handleMessage(byte[] message);void handleMessage(Serializable message);// pass the channel/pattern as wellvoid handleMessage(Serializable message, String channel);}
请注意,尽管上面的接口没有扩展MessageListener接口,但仍然可以通过使用MessageListenerAdapter类将其用作MDP。还请注意,各种消息处理方法是如何根据它们可以接收和处理的各种消息类型的内容进行强类型化的。此外,消息发送到的通道(channel)或模式(pattern)可以作为第二个String类型的参数传递给方法:
public class DefaultMessageDelegate implements MessageDelegate {// implementation elided for clarity...
}
注意上面的MessageDelegate接口的实现(上面的DefaultMessageDelegate类)是如何完全没有Redis依赖的。它确实是一个POJO,我们使用以下配置将其创建为MDP:
@Configuration
class MyConfig {// …@BeanDefaultMessageDelegate listener() {return new DefaultMessageDelegate();}@BeanMessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {return new MessageListenerAdapter(listener, "handleMessage");}@BeanRedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listener, ChannelTopic.of("chatroom"));return container;}
}
监听器主题可以是一个通道(例如,topic=“chatroom”),也可以是一种模式(例如,topic=“*room”)
前面的示例使用Redis命名空间来声明消息监听器容器,并自动将POJO注册为监听器。成熟beans的定义如下:
<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"><constructor-arg><bean class="redisexample.DefaultMessageDelegate"/></constructor-arg>
</bean><bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="messageListeners"><map><entry key-ref="messageListener"><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="chatroom"/></bean></entry></map></property>
</bean>
每次接收到消息时,适配器都会自动且透明地在低级(low-level)格式和所需的对象类型之间执行转换(使用配置的RedisSerializer)。由方法调用引起的任何异常都将被捕获并由容器处理(默认情况下,异常将被记录)。
三、反应式消息监听器容器Reactive Message Listener Container
Spring Data提供了ReactiveRedisMessageListenerContainer,它帮助用户完成所有繁重的转换和订阅状态管理工作。
消息监听器容器本身不需要外部线程资源。它使用driver线程来发布消息。
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));
要等待并确保正确的订阅,可以使用receiveLater方法,该方法返回Mono<Flux>。返回的Mono与内部发布者一起完成,作为完成对给定主题的订阅的结果。通过拦截onNext信号,你可以同步服务器端订阅。
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server).flatMapMany(Function.identity()).…;
3.1 通过template API订阅Subscribing via template API
如上所述,你可以直接使用ReactiveRedisTemplate订阅通道/模式。这种方法提供了一种直接但有限的解决方案,因为你失去了在初始订阅之后添加订阅的选项。尽管如此,你仍然可以通过返回的Flux来控制消息流,例如使用take(Duration)。当读取完成、出错或取消时,所有绑定的资源将再次释放。
redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {// message processing ...
}).subscribe();
相关文章:
【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息
【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息Pub/Sub Messaging 一、发布消息Publishing (Sending Messages)二、订阅消息Subscribing (Receiving Messages)2.1 消息监听容器Message Listener Containers2.2 消息监听适配器The Message…...

list基本使用
list基本使用 构造迭代器容量访问修改 list容器底层是带头双向链表结构,可以在常数范围内在任意位置进行输入和删除,但不支持任意位置的随机访问(如不支持[ ]下标访问),下面介绍list容器的基本使用接口。 template <…...

网络原理TCP/IP(5)
文章目录 IP协议IP协议报头地址管理网段划分特殊的IP地址路由选择以太网认识MAC地址对比理解MAC地址和IP地址DNS(域名服务器) IP协议 IP协议主要完成的工作是两方面: 地址管理,使用一套地址体系,来描述互联网上每个设…...

前端JavaScript篇之JavaScript为什么要进行变量提升,它导致了什么问题?什么是尾调用,使用尾调用有什么好处?
目录 JavaScript为什么要进行变量提升,它导致了什么问题?总结 什么是尾调用,使用尾调用有什么好处?总结 JavaScript为什么要进行变量提升,它导致了什么问题? 变量提升是JavaScript在代码执行之前对变量和函…...
React和Vue实现路由懒加载
React实现路由懒加载: React官方提供了React.lazy()函数来实现路由的懒加载。使用React.lazy()函数需要配合React的Suspense组件来使用。 首先,使用React.lazy()函数动态导入组件,例如: const Home React.lazy(() > import(…...

ReactNative实现的横向滑动条
OK,我们先看下效果图 注意使用到了两个库 1.react-native-linear-gradient 2.react-native-gesture-handler ok,我们看下面的代码 import {Image, TouchableWithoutFeedback, StyleSheet, View} from react-native; import LinearGradient from reac…...

华为自动驾驶干不过特斯拉?
文 | AUTO芯球 作者 | 李诞 什么? 华为的智能驾驶方案干不过蔚小理? 特斯拉的智能驾驶[FSD]要甩中国车企几条街? 这华为问界阿维塔刚刚推送“全国都能开”的城区“无图 NCA” 就有黑子来喷了 这是跪久了站不起来了吧 作为玩车14年&…...
docker容器stop流程
从API route开始看StopContainer接口的调用过程。 // NewRouter initializes a new container router func NewRouter(b Backend, decoder httputils.ContainerDecoder) router.Router {r : &containerRouter{backend: b,decoder: decoder,}r.initRoutes()return r } ... …...
生产环境_Spark接收传入的sql并替换sql中的表名与解析_非常NB
背景 开发时遇到一个较为复杂的周期需求,为了适配读取各种数据库中的数据并将数据库数据转换为DataFrame并进行后续的开发分析工作,做了如下代码。 在爷们开发这段生产中的代码,可适配mysql,hive,hbase,gbase等等…...

【issue-YOLO】自定义数据集训练YOLO-v7 Segmentation
1. 拉取代码创建环境 执行nvidia-smi验证cuda环境是否可用;拉取官方代码; clone官方代码仓库 git clone https://github.com/WongKinYiu/yolov7;从main分支切换到u7分支 cd yolov7 && git checkout 44f30af0daccb1a3baecc5d80eae229…...

【八大排序】选择排序 | 堆排序 + 图文详解!!
📷 江池俊: 个人主页 🔥个人专栏: ✅数据结构冒险记 ✅C语言进阶之路 🌅 有航道的人,再渺小也不会迷途。 文章目录 一、选择排序1.1 基本思想1.2 算法步骤 动图演示1.3 代码实现1.4 选择排序特性总结 二…...

C语言贪吃蛇详解
个人简介:双非大二学生 个人博客:Monodye 今日鸡汤:人生就像一盒巧克力,你永远不知道下一块是什么味的 C语言基础刷题:牛客网在线编程_语法篇_基础语法 (nowcoder.com) 一.贪吃蛇游戏背景 贪吃蛇是久负盛名的游戏&…...
go使用gopprof分析内存泄露
假设我们使用的是比如beego这样的网络框架,我们可以这样加代码来使用gopprof来进行内存泄露分析: beego框架加gopprof分析代码: 1.先在router.go里添加路由信息: beego.Router("/debug/pprof", &controllers.ProfController{}) beego.Router("/debug…...

uniapp中组件库Mask 遮罩层 的使用方法
目录 #平台差异说明 #基本使用 #嵌入内容 #遮罩样式 #API #Props #Events #Slot 创建一个遮罩层,用于强调特定的页面元素,并阻止用户对遮罩下层的内容进行操作,一般用于弹窗场景 #平台差异说明 AppH5微信小程序支付宝小程序百度小程…...

【数据结构与算法】(7)基础数据结构之双端队列的链表实现、环形数组实现示例讲解
目录 2.6 双端队列1) 概述2) 链表实现3) 数组实现习题E01. 二叉树 Z 字层序遍历-Leetcode 103 2.6 双端队列 1) 概述 双端队列、队列、栈对比 定义特点队列一端删除(头)另一端添加(尾)First In First Out栈一端删除和添加&…...
2024 高级前端面试题之 前端工程相关 「精选篇」
该内容主要整理关于 前端工程相关模块的相关面试题,其他内容面试题请移步至 「最新最全的前端面试题集锦」 查看。 前端工程相关模块精选篇 1. webpack的基本配置2. webpack高级配置3. webpack性能优化-构建速度4. webpack性能优化-产出代码(线上运行&am…...

CSS常用属性
CSS常用属性 1. 像素的概念 概念:我们的电脑屏幕是,是由一个一个“小点”组成的,每个“小点”,就是一个像素(px)。规律:像素点越小,呈现的内容就越清晰、越细腻。 注意点ÿ…...

AI新宠Arc浏览器真可以取代Chrome吗?
每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…...

基于Java (spring-boot)的实验室管理系统
一、项目介绍 普通用户: 1.登录,注册 2.查看实验室列表信息 3.实验室预约 4.查看预约进度并取消 5.查看公告 6.订阅课程 7.实验室报修 8.修改个人信息 教师登录: 1.查看并审核预约申请 2.查看已审核预约并导出到excel 3.实验室设备管理,报修 …...

Android用setRectToRect实现Bitmap基于Matrix矩阵scale缩放RectF动画,Kotlin(一)
Android用setRectToRect实现Bitmap基于Matrix矩阵scale缩放RectF动画,Kotlin(一) 基于Matrix,控制Bitmap的setRectToRect的目标RectF的宽高。从很小的宽高开始,不断迭代增加setRectToRect的目标RectF的宽高,…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例
文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...

springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机
这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...