springboot集成Redisson做分布式消息队列
这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version>
</dependency>
首先创建一个自定义注解RedissonTopic.java,用于指定消息的路由key
package com.zyq.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;/** Redissson消息队列注解* author xiaochi* date 2024/10/23*/
@Inherited
@Documented
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonTopic {/*** topic名称* @return*/String key();/*** 是否队列发送消息* @return*/boolean queue() default false;/*** 队列容量* @return*/int queueSize() default 100;/** queue为true时生效* 延迟发送时间(大于0默认延迟,延迟队列可设置大于0)* @return*/int delayTime() default 0;/** queue为true时生效* 时间单位(默认毫秒)* @return*/TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
继续创建消息监听器 RedissonTopicMessageListener.java,具体内容如下:
package com.zyq.listener;/** Redisson消息监听接口* author xiaochi* date 2024/10/23*/
public interface RedissonTopicMessageListener{/*** 接收的消息处理* @param message*/void message(Object message);/*** 发送失败(队列已满时会调用)* @param message*/void sendFail(Object message);/*** 异常* @param message*/void exception(Object message);
}
接下来就是最重要的Redisson配置,内容如下:
/*** Redisson 配置* @return*/
@Bean(destroyMethod="shutdown")
public RedissonClient redissonClient(ConfigurableApplicationContext applicationContext){Config config = new Config();config.useSingleServer().setPassword("123456").setDatabase(0).setConnectionPoolSize(24) // 连接池大小,默认64.setConnectionMinimumIdleSize(3) // 最小空闲连接数,默认32.setRetryAttempts(3) // 命令失败重试次数 3.setRetryInterval(1500) // 命令重试发送时间间隔(毫秒) 默认1500.setTimeout(10000) // 命令等待超时(毫秒) 默认10000.setConnectTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setIdleConnectionTimeout(10000) // 连接空闲超时(毫秒) 默认10000.setSubscriptionConnectionMinimumIdleSize(1) // 发布和订阅连接的最小空闲连接数.setSubscriptionConnectionPoolSize(24) // 发布和订阅连接池大小 默认50.setDnsMonitoringInterval(10000) // DNS监测时间间隔(毫秒),默认5000*//*.setAddress("redis://127.0.0.1:6379");//config.setThreads(Runtime.getRuntime().availableProcessors());// 默认 16RedissonClient redissonClient = Redisson.create(config);StringBuilder msg = new StringBuilder();msg.append("Redisson topic register[");String[] beanNames = applicationContext.getBeanNamesForType(RedissonTopicMessageListener.class);for (String beanName : beanNames) {RedissonTopicMessageListener topicMessageListener = applicationContext.getBean(beanName, RedissonTopicMessageListener.class);if (topicMessageListener.getClass().isAnnotationPresent(RedissonTopic.class)){RedissonTopic redissonTopic = topicMessageListener.getClass().getAnnotation(RedissonTopic.class);if (redissonTopic.queue()){RBoundedBlockingQueue<Object> boundedBlockingQueue = redissonClient.getBoundedBlockingQueue(redissonTopic.key());boundedBlockingQueue.trySetCapacity(redissonTopic.queueSize());RDelayedQueue<Object> delayedQueue = null;if (0 != redissonTopic.delayTime()){delayedQueue = redissonClient.getDelayedQueue(boundedBlockingQueue);}RTopic topic = redissonClient.getTopic(redissonTopic.key());RDelayedQueue<Object> finalDelayedQueue = delayedQueue;topic.addListener(Object.class, (channel, message) -> {if (finalDelayedQueue != null){try {finalDelayedQueue.offer(message,redissonTopic.delayTime(), redissonTopic.timeUnit());}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加延迟队列异常,{}",e);}}else {try {if (!boundedBlockingQueue.offer(message)){topicMessageListener.sendFail(message);}}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson添加队列异常,{}",e);}}});// 为了不阻塞主线程,放在新线程中运行AsyncUtil.run(() -> {while (!Thread.currentThread().isInterrupted() && !redissonClient.isShutdown()){try {Object take = boundedBlockingQueue.take();if (!"".equals(take)){topicMessageListener.message(take);}} catch (Exception e) {topicMessageListener.exception(e.getMessage());log.info("Redisson延迟队列监测异常,{}",e);}}if (Thread.currentThread().isInterrupted() || redissonClient.isShutdown()){log.info("Redisson service shutdown");}});}else {RTopic topic = redissonClient.getTopic(redissonTopic.key());topic.addListener(Object.class, (channel,message) -> {try {topicMessageListener.message(message);}catch (Exception e){topicMessageListener.exception(e.getMessage());log.info("Redisson即时消息发送异常,{}",e);}});}msg.append(redissonTopic.key()).append(".");}}msg.append("]").append("finish.");log.info(msg.toString());return redissonClient;
}
到此基本就完成了,接下来就是创建消息监听类进行消费消息了TopicMessageListener.java 去实现消息监听器接口RedissonTopicMessageListener.java
package com.zyq.listener;import com.zyq.annotation.RedissonTopic;
import org.springframework.stereotype.Component;/** 消息监听类* author xiaochi* date 2024/10/23*/
@Component
@RedissonTopic(key = "testTopic",queue = true,delayTime = 5000)
public class TopicMessageListener implements RedissonTopicMessageListener {@Overridepublic void message(Object message) {System.out.println("testTopic监听器延迟队列收到消息," + message);}@Overridepublic void sendFail(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息发送失败");}@Overridepublic void exception(Object message) {System.out.println("延迟队列 TopicMessageListener testTopic消息异常");}
}
现在可以起2个springboot项目进行消息交流了。封装一个消息发送工具 RedissonMessageUtil.java,内容如下:
package com.demo3.util;import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/** Redisson消息发送工具* author xiaochi* date 2024/10/24*/
@Component
public class RedissonMessageUtil {private static RedissonClient redissonClient;@Autowiredpublic void setRedissonClient(RedissonClient redissonClient) {RedissonMessageUtil.redissonClient = redissonClient;}/*** 发送消息* @param key* @param message* @return 返回接收消息的客户端数量*/public static long send(String key,Object message){RTopic topic = redissonClient.getTopic(key);return topic.publish(message);}
}
到此完成。
相关文章:
springboot集成Redisson做分布式消息队列
这里演示Redisson做分布式消息队列。首先引入 Redisson依赖,官方github <dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.6</version> </dependen…...
 
如何通过Lua语言请求接口拿到数据
文章目录 概要http客户端通过请求下载数据 概要 当某个需求是需要在模块内请求接口拿到数据,需要使用http客户端调用接口 http客户端 LuaSOC请求接口官方文档 调用:http.request(method,url,headers,body,opts,ca_file,client_ca, client_key, clien…...
 
Android 13 SystemUI 隐藏下拉快捷面板部分模块(wifi,bt,nfc等)入口
frameworks/base/packages/SystemUI/src/com/android/systemui/qs/tileimpl/QSFactoryImpl.java createTileInternal(tileSpec)方法注释想隐藏的模块即可。...
 
自由学习记录(14)
unity操作问题 位置:子物体的位置是相对于父物体的。如果你移动父物体,子物体会保持相对于父物体的相对位置,跟着一起移动。 旋转:子物体的旋转也是相对于父物体的。旋转父物体会导致子物体围绕父物体的原点旋转。 缩放…...
 
疯狂Spring Boot讲义[推荐1]
《疯狂Spring Boot讲义》是2021年电子工业出版社出版的图书,作者是李刚 《疯狂Spring Boot终极讲义》不是一本介绍类似于PathVariable、MatrixVariable、RequestBody、ResponseBody这些基础注解的图书,它是真正讲解Spring Boot的图书。Spring Boot的核心…...
vue中$nextTick的作用是什么,什么时候使用
$nextTick 是 Vue 提供的一个方法,用于在下一次 DOM 更新周期之后执行回调函数。它通常用于在 Vue 完成数据更新后,需要访问更新后的 DOM 状态时,保证操作的是更新后的 DOM。 工作原理: Vue 是异步更新 DOM 的,当数据…...
 
Redis实现全局ID生成器
全局ID生成器 为什么要用全局ID生成器 1.当我们使用数据库自增来实现id的生成时,规律过于明显,会给用户暴露很多信息 2.当我们订单量过大时无法用数据库的一张表来存放订单,如果两张表的id都是自增的话,id就会出现重复 什么是全局ID生成器 全局ID生成器,是一种在分布式系统…...
Xshell远程连接工具详解
Xshell是一款在Windows平台上运行的远程连接工具,它支持SSH1、SSH2以及Microsoft Windows平台的TELNET协议。Xshell通过互联网实现对远程主机的安全连接,帮助用户在复杂的网络环境中享受他们的工作。本文将详细介绍Xshell的溯源、最新版本以及它的优势。…...
如何在verilog设计的磁盘阵列控制器中实现不同RAID级别(如RAID 0、RAID 1等)的切换?
以下是一种在Verilog设计的磁盘阵列控制器中实现不同RAID级别(以RAID 0和RAID 1为例)切换的方法: 添加控制信号 在磁盘阵列控制器模块中添加一个输入信号,例如raid_mode,用于选择RAID模式。假设raid_mode = 0表示RAID 0模式,raid_mode = 1表示RAID 1模式。module raid_co…...
 
基于元神操作系统实现NTFS文件操作(十)
1. 背景 本文补充介绍文件遍历操作的部分附加内容,譬如,过滤掉系统元文件、过滤掉重复的文件项、过滤掉隐藏文件等,并提供了基于元神操作系统的部分实现代码。 2. 方法 (1)过滤掉系统元文件 NTFS文件系统的前16个元…...
Qt的几个函数方法
void receiveInfo1() {// 假设这是从串口接收到的字符串QString receivedString "23.5C,45%,1012hPa";// 使用逗号分隔符分割字符串QStringList parts receivedString.split(,);// 检查分割后的列表是否有足够的部分if (parts.size() > 3) {QString part1 part…...
 
openpnp - bug - 散料飞达至少定义2个物料
文章目录 openpnp - bug - 散料飞达至少定义2个物料笔记END openpnp - bug - 散料飞达至少定义2个物料 笔记 散料飞达上定义的物料个数用完了,现在只需要一个料就可以。 用顶部相机去找编带上是否还有一个单独的料,找到了。 定义散料飞达的料为1个&…...
 
HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException
HDFS异常org.apache.hadoop.hdfs.protocol.NSQuotaExceededException 异常信息: Hive:org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quota (directories and files) of directory /xxxdir is exceeded: quota10000 file count15001N…...
数据库的构成与手写简单数据库的探索
一、引言 在当今数字化的时代,数据库扮演着至关重要的角色。无论是企业管理系统、电子商务平台还是各种移动应用,都离不开数据库的支持。数据库是存储和管理数据的核心工具,它的高效性、可靠性和安全性对于数据的处理和应用至关重要。本文将…...
 
基于STM32的智能晾衣架设计
引言 随着智能家居的普及,智能晾衣架成为了提升生活便利性的重要设备。智能晾衣架通过集成多个传感器,能够自动感知天气变化、湿度、光照等环境因素,实现自动升降、风干和报警功能,帮助用户更加高效地晾晒衣物。本项目基于STM32设…...
 
【MAUI】模糊控件(毛玻璃高斯模糊亚克力模糊)
文章目录 XAML.CSToBytes方法使用效果 常试过AcrylicView.MAUI和Sharpnado.MaterialFrame,对于二者教程很少,使用直接写控件然后调属性,没有报错但也并没有效果所幸就自己写一个 XAML <?xml version"1.0" encoding"utf-…...
深度学习:pandas篇
1. Pandas 基础 Pandas 是一个帮助你处理和分析数据的工具 安装 Pandas pip install pandas 导入 Pandas,我们用 pd 来代替 Pandas 的全称,这样以后写代码的时候更简洁 import pandas as pd 建 Series 和 DataFrame Pandas 最基本的两个数据结构是…...
 
Redis学习文档(Redis基本数据类型【Hash、Set】)
Hash(哈希) 介绍 Redis 中的 Hash 是一个 String 类型的 field-value(键值对) 的映射表,特别适合用于存储对象,后续操作的时候,你可以直接修改这个对象中的某些字段的值。 Hash 类似于 JDK1.…...
 
15分钟学Go 第9天:函数的定义与调用
第9天:函数的定义与调用 欢迎来到第9天的Go语言学习模块!今天我们将深入探讨函数的定义与调用,帮助你掌握如何编写和使用函数。学习函数不仅是Go语言的基础,也是程序设计的核心概念之一。这一节将详细介绍函数的结构、参数传递、…...
 
Java虚拟机:JVM介绍
1024 程序员节日快乐!愿您我的代码永远没有 bug ,人生永远没有 bug ! JVM 概述JVM 架构 概述 JVM( Java Virtual Machine ,Java 虚拟机),是 Java 语言的运行环境,是运行所有 Java 程…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
 
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
 
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
 
ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
 
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
 
安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...
 
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
webpack面试题
面试题:webpack介绍和简单使用 一、webpack(模块化打包工具)1. webpack是把项目当作一个整体,通过给定的一个主文件,webpack将从这个主文件开始找到你项目当中的所有依赖文件,使用loaders来处理它们&#x…...
