当前位置: 首页 > news >正文

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version>
</dependency>

首先创建一个自定义注解RedissonTopic.java,用于指定消息的路由key

package com.zyq.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;/** Redissson消息队列注解* author xiaochi* date 2024/10/23*/
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {/*** topic名称* @return*/String key();/*** 是否队列发送消息* @return*/boolean queue() default false;/*** 队列容量* @return*/int queueSize() default 100;/** queue为true时生效* 延迟发送时间(大于0默认延迟,延迟队列可设置大于0)* @return*/int delayTime() default 0;/** queue为true时生效* 时间单位(默认毫秒)* @return*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

继续创建消息监听器 RedissonTopicMessageListener.java,具体内容如下:

package com.zyq.listener;/** Redisson消息监听接口* author xiaochi* date 2024/10/23*/
public interface RedissonTopicMessageListener{/*** 接收的消息处理* @param message*/void message(Object message);/*** 发送失败(队列已满时会调用)* @param message*/void sendFail(Object message);/*** 异常* @param message*/void exception(Object message);
}

接下来就是最重要的Redisson配置,内容如下:

/*** Redisson 配置* @return*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){Config config = new Config();config.useSingleServer().setPassword("123456").setDatabase(0).setConnectionPoolSize(24) // 连接池大小,默认64.setConnectionMinimumIdleSize(3) // 最小空闲连接数,默认32.setRetryAttempts(3) // 命令失败重试次数 3.setRetryInterval(1500) // 命令重试发送时间间隔(毫秒) 默认1500.setTimeout(10000) // 命令等待超时(毫秒) 默认10000.setConnectTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setIdleConnectionTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setSubscriptionConnectionMinimumIdleSize(1) // 发布和订阅连接的最小空闲连接数.setSubscriptionConnectionPoolSize(24) // 发布和订阅连接池大小 默认50.setDnsMonitoringInterval(10000) // DNS监测时间间隔(毫秒),默认5000*//*.setAddress("redis://127.0.0.1:6379");//config.setThreads(Runtime.getRuntime().availableProcessors());// 默认 16RedissonClient redissonClient = Redisson.create(config);StringBuilder msg = new StringBuilder();msg.append("Redisson topic register[");String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);for (String beanName : beanNames) {RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);if (redissonTopic.queue()){RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());RDelayedQueue<Object> delayedQueue = null;if (0 != redissonTopic.delayTime()){delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);}RTopic topic = redissonClient.getTopic(redissonTopic.key());RDelayedQueue<Object> finalDelayedQueue = delayedQueue;topic.addListener(Object.class, (channel, message) -> {if (finalDelayedQueue != null){try {finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加延迟队列异常,{}",e);}}else {try {if (!boundedBlockingQueue.offer(message)){topicMessageListener.sendFail(message);}}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加队列异常,{}",e);}}});// 为了不阻塞主线程,放在新线程中运行AsyncUtil.run(() -> {while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){try {Object take = boundedBlockingQueue.take();if (!"".equals(take)){topicMessageListener.message(take);}} catch (Exception e) {topicMessageListener.exception(e.getMessage());log.info("Redisson延迟队列监测异常,{}",e);}}if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){log.info("Redisson service shutdown");}});}else {RTopic topic = redissonClient.getTopic(redissonTopic.key());topic.addListener(Object.class, (channel,message) -> {try {topicMessageListener.message(message);}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson即时消息发送异常,{}",e);}});}msg.append(redissonTopic.key()).append(".");}}msg.append("]").append("finish.");log.info(msg.toString());return redissonClient;
}

到此基本就完成了,接下来就是创建消息监听类进行消费消息了TopicMessageListener.java 去实现消息监听器接口RedissonTopicMessageListener.java

package com.zyq.listener;import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;/** 消息监听类* author xiaochi* date 2024/10/23*/
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {@Overridepublic void message(Object message) {System.out.println("testTopic监听器延迟队列收到消息," + message);}@Overridepublic void sendFail(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息发送失败");}@Overridepublic void exception(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息异常");}
}

现在可以起2个springboot项目进行消息交流了。封装一个消息发送工具 RedissonMessageUtil.java,内容如下:

package com.demo3.util;import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/** Redisson消息发送工具* author xiaochi* date 2024/10/24*/
@Component
public class RedissonMessageUtil {private static RedissonClient redissonClient;@Autowiredpublic void setRedissonClient(RedissonClient redissonClient) {RedissonMessageUtil.redissonClient = redissonClient;}/*** 发送消息* @param key* @param message* @return 返回接收消息的客户端数量*/public static long send(String key,Object message){RTopic topic = redissonClient.getTopic(key);return topic.publish(message);}
}

到此完成。

相关文章:

springboot集成Redisson做分布式消息队列

这里演示Redisson做分布式消息队列。首先引入 Redisson依赖&#xff0c;官方github <dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version> </dependen…...

如何通过Lua语言请求接口拿到数据

文章目录 概要http客户端通过请求下载数据 概要 当某个需求是需要在模块内请求接口拿到数据&#xff0c;需要使用http客户端调用接口 http客户端 LuaSOC请求接口官方文档 调用&#xff1a;http.request(method,url,headers,body,opts,ca_file,client_ca, client_key, clien…...

Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口

frameworks/base/packages/SystemUI/src/com/android/systemui/qs/tileimpl/QSFactoryImpl.java createTileInternal(tileSpec)方法注释想隐藏的模块即可。...

自由学习记录(14)

unity操作问题 位置&#xff1a;子物体的位置是相对于父物体的。如果你移动父物体&#xff0c;子物体会保持相对于父物体的相对位置&#xff0c;跟着一起移动。 旋转&#xff1a;子物体的旋转也是相对于父物体的。旋转父物体会导致子物体围绕父物体的原点旋转。 缩放&#xf…...

疯狂Spring Boot讲义[推荐1]

《疯狂Spring Boot讲义》是2021年电子工业出版社出版的图书&#xff0c;作者是李刚 《疯狂Spring Boot终极讲义》不是一本介绍类似于PathVariable、MatrixVariable、RequestBody、ResponseBody这些基础注解的图书&#xff0c;它是真正讲解Spring Boot的图书。Spring Boot的核心…...

vue中$nextTick的作用是什么,什么时候使用

$nextTick 是 Vue 提供的一个方法&#xff0c;用于在下一次 DOM 更新周期之后执行回调函数。它通常用于在 Vue 完成数据更新后&#xff0c;需要访问更新后的 DOM 状态时&#xff0c;保证操作的是更新后的 DOM。 工作原理&#xff1a; Vue 是异步更新 DOM 的&#xff0c;当数据…...

Redis实现全局ID生成器

全局ID生成器 为什么要用全局ID生成器 1.当我们使用数据库自增来实现id的生成时,规律过于明显,会给用户暴露很多信息 2.当我们订单量过大时无法用数据库的一张表来存放订单,如果两张表的id都是自增的话,id就会出现重复 什么是全局ID生成器 全局ID生成器,是一种在分布式系统…...

Xshell远程连接工具详解

Xshell是一款在Windows平台上运行的远程连接工具&#xff0c;它支持SSH1、SSH2以及Microsoft Windows平台的TELNET协议。Xshell通过互联网实现对远程主机的安全连接&#xff0c;帮助用户在复杂的网络环境中享受他们的工作。本文将详细介绍Xshell的溯源、最新版本以及它的优势。…...

如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?

以下是一种在Verilog设计的磁盘阵列控制器中实现不同RAID级别(以RAID 0和RAID 1为例)切换的方法: 添加控制信号 在磁盘阵列控制器模块中添加一个输入信号,例如raid_mode,用于选择RAID模式。假设raid_mode = 0表示RAID 0模式,raid_mode = 1表示RAID 1模式。module raid_co…...

基于元神操作系统实现NTFS文件操作(十)

1. 背景 本文补充介绍文件遍历操作的部分附加内容&#xff0c;譬如&#xff0c;过滤掉系统元文件、过滤掉重复的文件项、过滤掉隐藏文件等&#xff0c;并提供了基于元神操作系统的部分实现代码。 2. 方法 &#xff08;1&#xff09;过滤掉系统元文件 NTFS文件系统的前16个元…...

Qt的几个函数方法

void receiveInfo1() {// 假设这是从串口接收到的字符串QString receivedString "23.5C,45%,1012hPa";// 使用逗号分隔符分割字符串QStringList parts receivedString.split(,);// 检查分割后的列表是否有足够的部分if (parts.size() > 3) {QString part1 part…...

openpnp - bug - 散料飞达至少定义2个物料

文章目录 openpnp - bug - 散料飞达至少定义2个物料笔记END openpnp - bug - 散料飞达至少定义2个物料 笔记 散料飞达上定义的物料个数用完了&#xff0c;现在只需要一个料就可以。 用顶部相机去找编带上是否还有一个单独的料&#xff0c;找到了。 定义散料飞达的料为1个&…...

HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException

HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException 异常信息&#xff1a; Hive:org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quota (directories and files) of directory /xxxdir is exceeded: quota10000 file count15001N…...

数据库的构成与手写简单数据库的探索

一、引言 在当今数字化的时代&#xff0c;数据库扮演着至关重要的角色。无论是企业管理系统、电子商务平台还是各种移动应用&#xff0c;都离不开数据库的支持。数据库是存储和管理数据的核心工具&#xff0c;它的高效性、可靠性和安全性对于数据的处理和应用至关重要。本文将…...

基于STM32的智能晾衣架设计

引言 随着智能家居的普及&#xff0c;智能晾衣架成为了提升生活便利性的重要设备。智能晾衣架通过集成多个传感器&#xff0c;能够自动感知天气变化、湿度、光照等环境因素&#xff0c;实现自动升降、风干和报警功能&#xff0c;帮助用户更加高效地晾晒衣物。本项目基于STM32设…...

【MAUI】模糊控件(毛玻璃高斯模糊亚克力模糊)

文章目录 XAML.CSToBytes方法使用效果 常试过AcrylicView.MAUI和Sharpnado.MaterialFrame&#xff0c;对于二者教程很少&#xff0c;使用直接写控件然后调属性&#xff0c;没有报错但也并没有效果所幸就自己写一个 XAML <?xml version"1.0" encoding"utf-…...

深度学习:pandas篇

1. Pandas 基础 Pandas 是一个帮助你处理和分析数据的工具 安装 Pandas pip install pandas 导入 Pandas&#xff0c;我们用 pd 来代替 Pandas 的全称&#xff0c;这样以后写代码的时候更简洁 import pandas as pd 建 Series 和 DataFrame Pandas 最基本的两个数据结构是…...

Redis学习文档(Redis基本数据类型【Hash、Set】)

Hash&#xff08;哈希&#xff09; 介绍 Redis 中的 Hash 是一个 String 类型的 field-value&#xff08;键值对&#xff09; 的映射表&#xff0c;特别适合用于存储对象&#xff0c;后续操作的时候&#xff0c;你可以直接修改这个对象中的某些字段的值。 Hash 类似于 JDK1.…...

15分钟学Go 第9天:函数的定义与调用

第9天&#xff1a;函数的定义与调用 欢迎来到第9天的Go语言学习模块&#xff01;今天我们将深入探讨函数的定义与调用&#xff0c;帮助你掌握如何编写和使用函数。学习函数不仅是Go语言的基础&#xff0c;也是程序设计的核心概念之一。这一节将详细介绍函数的结构、参数传递、…...

Java虚拟机:JVM介绍

1024 程序员节日快乐&#xff01;愿您我的代码永远没有 bug &#xff0c;人生永远没有 bug &#xff01; JVM 概述JVM 架构 概述 JVM&#xff08; Java Virtual Machine &#xff0c;Java 虚拟机&#xff09;&#xff0c;是 Java 语言的运行环境&#xff0c;是运行所有 Java 程…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

ubuntu搭建nfs服务centos挂载访问

在Ubuntu上设置NFS服务器 在Ubuntu上&#xff0c;你可以使用apt包管理器来安装NFS服务器。打开终端并运行&#xff1a; sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享&#xff0c;例如/shared&#xff1a; sudo mkdir /shared sud…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...

pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)

目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 &#xff08;1&#xff09;输入单引号 &#xff08;2&#xff09;万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...