【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息
【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息Pub/Sub Messaging
- 一、发布消息Publishing (Sending Messages)
- 二、订阅消息Subscribing (Receiving Messages)
- 2.1 消息监听容器Message Listener Containers
- 2.2 消息监听适配器The MessageListenerAdapter
- 三、反应式消息监听器容器Reactive Message Listener Container
- 3.1 通过template API订阅Subscribing via template API
Spring Data为Redis提供了专用的消息集成,在功能和命名方面与Spring Framework中的JMS集成类似。
Redis消息传递大致可以分为两个功能领域:
- 消息的发布(publish)或生产
- 消息的订阅(subscribe)或消费
这是一个通常称为发布/订阅(简称Pub/Sub)的模式示例。RedisTemplate类用于消息生成。对于类似于Java EE的消息驱动bean风格的异步接收,Spring Data提供了一个专用的消息监听器容器,用于创建消息驱动的POJO(MDP),对于同步接收,还提供RedisConnection。
org.springframework.data.redis.connection 和 org.springframework.data.redis.listener包提供Redis消息传递的核心功能。
一、发布消息Publishing (Sending Messages)
要发布消息,你可以像使用其他操作一样,使用低级的RedisConnection或高级的RedisOperations。这两个实体都提供了publish方法,该方法接受消息和目标通道(channel)作为参数。RedisConnection需要原始数据(字节数组),而RedisOperations允许任意对象作为消息传递,如下面的例子所示:
// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
一个相对完整的发布例子:
package com.example.demo;import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;public class RedisPublishMessage {public JedisConnectionFactory jedisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration =new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("localhost");redisStandaloneConfiguration.setDatabase(0);redisStandaloneConfiguration.setPassword(RedisPassword.of("123456"));redisStandaloneConfiguration.setPort(6379);return new JedisConnectionFactory(redisStandaloneConfiguration);}public static void main(String[] args) {JedisConnectionFactory connectionFactory = new RedisApplication().jedisConnectionFactory();connectionFactory.afterPropertiesSet();RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(StringRedisSerializer.UTF_8);template.afterPropertiesSet();
// send message through RedisOperationsRedisOperations operations = template;Long numberOfClients = operations.convertAndSend("mychannel", "This is Gabriel");System.out.println(numberOfClients);}
}
二、订阅消息Subscribing (Receiving Messages)
在接收端,可以通过直接命名或使用模式(pattern)匹配来订阅一个或多个通道(channels)。后一种方法非常有用,因为它不仅允许使用一个命令创建多个订阅(subscription),而且还可以监听订阅时尚未创建的通道(只要它们与模式匹配)。
在底层,RedisConnection提供了subscribe和pSubscribe方法,它们分别映射Redis命令的以通道或模式进行订阅。前述2个方法可以使用多个通道或多个模式做为参数。为了更改连接的订阅或检查连接是否在监听,RedisConnection提供了getSubscription和isSubscribed方法。
Spring Data Redis中的Subscription命令是阻塞的。也就是说,在连接上调用subscribe会导致当前线程在开始等待消息时阻塞。只有当订阅被取消时,线程才会被释放,当另一个线程在同一连接上调用unsubscribe或pUnsubscribe时,就会发生这种情况。有关此问题的解决方案,请参阅“2.1 消息监听容器”(本文档稍后部分)。
如前所述,一旦订阅,连接就会开始等待消息。只允许添加新订阅、修改现有订阅和取消现有订阅的命令。调用除subscribe, pSubscribe, unsubscribe, 和 pUnsubscribe之外的任何操作都会引发异常。
为了订阅消息,需要实现MessageListener回调。每次新消息到达时,都会调用回调,并通过onMessage方法运行用户代码。该接口不仅可以访问实际消息,还可以访问接收该消息的通道以及用于匹配订阅的通道的模式(如果有的话)。这些信息使被调用者来区分各种消息,不仅是通过内容,还可以通过检查其他细节。
一个相对完整的订阅例子:
package com.example.demo;import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.jedis.JedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.nio.charset.StandardCharsets;public class RedisSubscribeMessage {public JedisConnectionFactory jedisConnectionFactory() {RedisStandaloneConfiguration redisStandaloneConfiguration =new RedisStandaloneConfiguration();redisStandaloneConfiguration.setHostName("localhost");redisStandaloneConfiguration.setDatabase(0);redisStandaloneConfiguration.setPassword(RedisPassword.of("123456"));redisStandaloneConfiguration.setPort(6379);return new JedisConnectionFactory(redisStandaloneConfiguration);}public static void main(String[] args) {JedisConnectionFactory connectionFactory = new RedisApplication().jedisConnectionFactory();connectionFactory.afterPropertiesSet();RedisTemplate<String, String> template = new RedisTemplate<>();template.setConnectionFactory(connectionFactory);template.setDefaultSerializer(StringRedisSerializer.UTF_8);template.afterPropertiesSet();RedisConnection redisConnection = template.getConnectionFactory().getConnection();redisConnection.subscribe(new MessageListener() {@Overridepublic void onMessage(Message message, byte[] bytes) {// 收到消息的处理逻辑System.out.println("Receive message : " + message);}}, "mychannel".getBytes(StandardCharsets.UTF_8));}
}
2.1 消息监听容器Message Listener Containers
由于其阻塞特性,低级别订阅(RedisConnection的订阅)没有吸引力,因为它需要对每个监听器进行连接和线程管理。为了缓解这个问题,Spring Data提供了RedisMessageListenerContainer,它完成了所有繁重的工作。如果你熟悉EJB和JMS,你应该会发现这些概念很熟悉,因为它的设计尽可能地接近Spring Framework的支持及其消息驱动的POJO(MDP)。
RedisMessageListenerContainer充当消息监听器容器。它用于从Redis通道(channel)接收消息,并驱动注入其中的MessageListener实例。监听器容器负责消息接收的所有线程,并将消息分派到监听器中进行处理。消息监听器容器是MDP和消息传递提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等。这使你作为应用程序开发人员能够编写与接收消息相关联的(可能复杂的)业务逻辑,并将Redis基础设施的公式化问题委托给框架。
MessageListener还可以实现SubscriptionListener,以便在确认订阅/取消订阅时接收通知。在同步调用时,监听订阅通知可能很有用。
为了最大限度地减少应用程序占用,RedisMessageListenerContainer允许多个监听器共享一个连接和一个线程,即使它们不共享订阅。因此,无论应用程序跟踪多少监听器或通道,运行时成本在其整个生命周期中都保持不变。另外,容器允许更改运行时配置,以便在应用程序运行时添加或删除监听器,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有监听器都被取消订阅,则会自动执行清理,并释放线程。
为了保证消息的异步特性,容器需要一个java.util.concurrent.Executor(或Spring的TaskExecutor)来分发消息。根据负载、监听器的数量和运行时环境,你应该调整executor以更好地满足你的需求。
2.2 消息监听适配器The MessageListenerAdapter
MessageListenerAdapter类是Spring异步消息传递支持的最后一个组件。简而言之,它允许你将几乎任何类公开为MDP(尽管有一些约束)。以下面的接口定义举例:
public interface MessageDelegate {void handleMessage(String message);void handleMessage(Map message);void handleMessage(byte[] message);void handleMessage(Serializable message);// pass the channel/pattern as wellvoid handleMessage(Serializable message, String channel);}
请注意,尽管上面的接口没有扩展MessageListener接口,但仍然可以通过使用MessageListenerAdapter类将其用作MDP。还请注意,各种消息处理方法是如何根据它们可以接收和处理的各种消息类型的内容进行强类型化的。此外,消息发送到的通道(channel)或模式(pattern)可以作为第二个String类型的参数传递给方法:
public class DefaultMessageDelegate implements MessageDelegate {// implementation elided for clarity...
}
注意上面的MessageDelegate接口的实现(上面的DefaultMessageDelegate类)是如何完全没有Redis依赖的。它确实是一个POJO,我们使用以下配置将其创建为MDP:
@Configuration
class MyConfig {// …@BeanDefaultMessageDelegate listener() {return new DefaultMessageDelegate();}@BeanMessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {return new MessageListenerAdapter(listener, "handleMessage");}@BeanRedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listener, ChannelTopic.of("chatroom"));return container;}
}
监听器主题可以是一个通道(例如,topic=“chatroom”),也可以是一种模式(例如,topic=“*room”)
前面的示例使用Redis命名空间来声明消息监听器容器,并自动将POJO注册为监听器。成熟beans的定义如下:
<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter"><constructor-arg><bean class="redisexample.DefaultMessageDelegate"/></constructor-arg>
</bean><bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="messageListeners"><map><entry key-ref="messageListener"><bean class="org.springframework.data.redis.listener.ChannelTopic"><constructor-arg value="chatroom"/></bean></entry></map></property>
</bean>
每次接收到消息时,适配器都会自动且透明地在低级(low-level)格式和所需的对象类型之间执行转换(使用配置的RedisSerializer)。由方法调用引起的任何异常都将被捕获并由容器处理(默认情况下,异常将被记录)。
三、反应式消息监听器容器Reactive Message Listener Container
Spring Data提供了ReactiveRedisMessageListenerContainer,它帮助用户完成所有繁重的转换和订阅状态管理工作。
消息监听器容器本身不需要外部线程资源。它使用driver线程来发布消息。
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));
要等待并确保正确的订阅,可以使用receiveLater方法,该方法返回Mono<Flux>。返回的Mono与内部发布者一起完成,作为完成对给定主题的订阅的结果。通过拦截onNext信号,你可以同步服务器端订阅。
ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server).flatMapMany(Function.identity()).…;
3.1 通过template API订阅Subscribing via template API
如上所述,你可以直接使用ReactiveRedisTemplate订阅通道/模式。这种方法提供了一种直接但有限的解决方案,因为你失去了在初始订阅之后添加订阅的选项。尽管如此,你仍然可以通过返回的Flux来控制消息流,例如使用take(Duration)。当读取完成、出错或取消时,所有绑定的资源将再次释放。
redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {// message processing ...
}).subscribe();
相关文章:
【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息
【Spring连载】使用Spring Data访问Redis(八)----发布/订阅消息Pub/Sub Messaging 一、发布消息Publishing (Sending Messages)二、订阅消息Subscribing (Receiving Messages)2.1 消息监听容器Message Listener Containers2.2 消息监听适配器The Message…...

list基本使用
list基本使用 构造迭代器容量访问修改 list容器底层是带头双向链表结构,可以在常数范围内在任意位置进行输入和删除,但不支持任意位置的随机访问(如不支持[ ]下标访问),下面介绍list容器的基本使用接口。 template <…...

网络原理TCP/IP(5)
文章目录 IP协议IP协议报头地址管理网段划分特殊的IP地址路由选择以太网认识MAC地址对比理解MAC地址和IP地址DNS(域名服务器) IP协议 IP协议主要完成的工作是两方面: 地址管理,使用一套地址体系,来描述互联网上每个设…...

前端JavaScript篇之JavaScript为什么要进行变量提升,它导致了什么问题?什么是尾调用,使用尾调用有什么好处?
目录 JavaScript为什么要进行变量提升,它导致了什么问题?总结 什么是尾调用,使用尾调用有什么好处?总结 JavaScript为什么要进行变量提升,它导致了什么问题? 变量提升是JavaScript在代码执行之前对变量和函…...
React和Vue实现路由懒加载
React实现路由懒加载: React官方提供了React.lazy()函数来实现路由的懒加载。使用React.lazy()函数需要配合React的Suspense组件来使用。 首先,使用React.lazy()函数动态导入组件,例如: const Home React.lazy(() > import(…...

ReactNative实现的横向滑动条
OK,我们先看下效果图 注意使用到了两个库 1.react-native-linear-gradient 2.react-native-gesture-handler ok,我们看下面的代码 import {Image, TouchableWithoutFeedback, StyleSheet, View} from react-native; import LinearGradient from reac…...

华为自动驾驶干不过特斯拉?
文 | AUTO芯球 作者 | 李诞 什么? 华为的智能驾驶方案干不过蔚小理? 特斯拉的智能驾驶[FSD]要甩中国车企几条街? 这华为问界阿维塔刚刚推送“全国都能开”的城区“无图 NCA” 就有黑子来喷了 这是跪久了站不起来了吧 作为玩车14年&…...
docker容器stop流程
从API route开始看StopContainer接口的调用过程。 // NewRouter initializes a new container router func NewRouter(b Backend, decoder httputils.ContainerDecoder) router.Router {r : &containerRouter{backend: b,decoder: decoder,}r.initRoutes()return r } ... …...
生产环境_Spark接收传入的sql并替换sql中的表名与解析_非常NB
背景 开发时遇到一个较为复杂的周期需求,为了适配读取各种数据库中的数据并将数据库数据转换为DataFrame并进行后续的开发分析工作,做了如下代码。 在爷们开发这段生产中的代码,可适配mysql,hive,hbase,gbase等等…...

【issue-YOLO】自定义数据集训练YOLO-v7 Segmentation
1. 拉取代码创建环境 执行nvidia-smi验证cuda环境是否可用;拉取官方代码; clone官方代码仓库 git clone https://github.com/WongKinYiu/yolov7;从main分支切换到u7分支 cd yolov7 && git checkout 44f30af0daccb1a3baecc5d80eae229…...

【八大排序】选择排序 | 堆排序 + 图文详解!!
📷 江池俊: 个人主页 🔥个人专栏: ✅数据结构冒险记 ✅C语言进阶之路 🌅 有航道的人,再渺小也不会迷途。 文章目录 一、选择排序1.1 基本思想1.2 算法步骤 动图演示1.3 代码实现1.4 选择排序特性总结 二…...

C语言贪吃蛇详解
个人简介:双非大二学生 个人博客:Monodye 今日鸡汤:人生就像一盒巧克力,你永远不知道下一块是什么味的 C语言基础刷题:牛客网在线编程_语法篇_基础语法 (nowcoder.com) 一.贪吃蛇游戏背景 贪吃蛇是久负盛名的游戏&…...
go使用gopprof分析内存泄露
假设我们使用的是比如beego这样的网络框架,我们可以这样加代码来使用gopprof来进行内存泄露分析: beego框架加gopprof分析代码: 1.先在router.go里添加路由信息: beego.Router("/debug/pprof", &controllers.ProfController{}) beego.Router("/debug…...

uniapp中组件库Mask 遮罩层 的使用方法
目录 #平台差异说明 #基本使用 #嵌入内容 #遮罩样式 #API #Props #Events #Slot 创建一个遮罩层,用于强调特定的页面元素,并阻止用户对遮罩下层的内容进行操作,一般用于弹窗场景 #平台差异说明 AppH5微信小程序支付宝小程序百度小程…...

【数据结构与算法】(7)基础数据结构之双端队列的链表实现、环形数组实现示例讲解
目录 2.6 双端队列1) 概述2) 链表实现3) 数组实现习题E01. 二叉树 Z 字层序遍历-Leetcode 103 2.6 双端队列 1) 概述 双端队列、队列、栈对比 定义特点队列一端删除(头)另一端添加(尾)First In First Out栈一端删除和添加&…...
2024 高级前端面试题之 前端工程相关 「精选篇」
该内容主要整理关于 前端工程相关模块的相关面试题,其他内容面试题请移步至 「最新最全的前端面试题集锦」 查看。 前端工程相关模块精选篇 1. webpack的基本配置2. webpack高级配置3. webpack性能优化-构建速度4. webpack性能优化-产出代码(线上运行&am…...

CSS常用属性
CSS常用属性 1. 像素的概念 概念:我们的电脑屏幕是,是由一个一个“小点”组成的,每个“小点”,就是一个像素(px)。规律:像素点越小,呈现的内容就越清晰、越细腻。 注意点ÿ…...

AI新宠Arc浏览器真可以取代Chrome吗?
每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…...

基于Java (spring-boot)的实验室管理系统
一、项目介绍 普通用户: 1.登录,注册 2.查看实验室列表信息 3.实验室预约 4.查看预约进度并取消 5.查看公告 6.订阅课程 7.实验室报修 8.修改个人信息 教师登录: 1.查看并审核预约申请 2.查看已审核预约并导出到excel 3.实验室设备管理,报修 …...

Android用setRectToRect实现Bitmap基于Matrix矩阵scale缩放RectF动画,Kotlin(一)
Android用setRectToRect实现Bitmap基于Matrix矩阵scale缩放RectF动画,Kotlin(一) 基于Matrix,控制Bitmap的setRectToRect的目标RectF的宽高。从很小的宽高开始,不断迭代增加setRectToRect的目标RectF的宽高,…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...

LLMs 系列实操科普(1)
写在前面: 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容,原视频时长 ~130 分钟,以实操演示主流的一些 LLMs 的使用,由于涉及到实操,实际上并不适合以文字整理,但还是决定尽量整理一份笔…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
Caliper 负载(Workload)详细解析
Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...