Spring Boot 集成 Redisson 实现消息队列
包含组件内容
- RedisQueue:消息队列监听标识
- RedisQueueInit:Redis队列监听器
- RedisQueueListener:Redis消息队列监听实现
- RedisQueueService:Redis消息队列服务工具
代码实现
RedisQueue
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}
RedisQueueInit
import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}
RedisQueueListener
/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}
RedisQueueService
import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content 消息* @param <T> 泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content 消息类型* @param delay 延迟时间* @param timeUnit 单位* @param <T> 泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content 消息类型* @param delay 延迟时间* @param <T> 泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}
测试
创建监听对象
import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}
测试用例
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}
测试结果

相关文章:
Spring Boot 集成 Redisson 实现消息队列
包含组件内容 RedisQueue:消息队列监听标识RedisQueueInit:Redis队列监听器RedisQueueListener:Redis消息队列监听实现RedisQueueService:Redis消息队列服务工具 代码实现 RedisQueue import java.lang.annotation.ElementTyp…...
go语言Map详解
Map Go语言中提供的映射关系容器为map,其内部使用散列表(hash)实现 map是一种无序的基于key-value的数据结构,Go语言中的map是引用类型,必须初始化才能使用。 它提供了高效的查找、插入和删除操作,非常适…...
C++——已知数组a[6]={1,3,5,7,9};输入一个数值,要求按照现有排序规律将它放入数组当中。
没注释的源代码 #include <iostream> using namespace std; int main() { int a[6]{1,3,5,7,9}; int n,i,j; cout<<"请输入一个数值:"; cin>>n; for(int i0;i<4;i) { if(n<a[i]) { …...
云计算第四阶段---CLOUD Day7---Day8
CLOUD 07 一、Dockerfile详细解析 指令说明FROM指定基础镜像(唯一)RUN在容器内执行命令,可以写多条ADD把文件拷贝到容器内,如果文件是 tar.xx 格式,会自动解压COPY把文件拷贝到容器内,不会自动解压ENV设置…...
深入解析ThingsBoard与ThingsKit物联网平台的差异
VS 在物联网(IoT)领域,平台的选择对于企业来说至关重要。本文将深入探讨ThingsBoard社区版与ThingsKit企业版这两个物联网平台的差异,帮助读者更好地理解它们的特色和适用场景。 系统相同点 首先,ThingsBoard社区版和ThingsKit企业版都基于…...
五、CAN总线
目录 一、基础知识 1、can介绍 2、CAN硬件电路 3、CAN电平标准 4、CAN收发器芯片介绍 5、CAN帧格式 ① CAN帧种类 ② CAN数据帧 ③ CAN遥控帧编辑 ④ 位填充 ⑤ 波形实例 6、接收方数据采样 ① 接收方数据采样遇到的问题 ② 位时序 ③ 硬同步 ④ 再同步 ⑤ 波…...
Linux:终端(terminal)与终端管理器(agetty)
终端的设备文件 打开/dev目录可以发现其中有许多字符设备文件,例如对于我的RedHat操作系统,拥有tty0到tty59,它们是操作系统提供的终端设备。对于tty1-tty12使用ctrlaltF*可以进行快捷切换,下面的命令可以进行通用切换。 sudo ch…...
钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句
钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句 接入系统:钉钉 钉钉是阿里巴巴集团打造的企业级智能移动办公平台,是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工…...
微信小程序点赞动画特效实现
这里提供两种实现点赞动画特效的方法: 方法一:使用 CSS 动画 wxml 文件: <view class"like-container"><image src"{{isLiked ? likedImg : unlikedImg}}" class"like-icon {{isLiked ? liked : }}" bindta…...
Day25笔记-普通文件读写with上下文二进制文件csv文件
一、文件读写【重点掌握】 常见文件的读写分类: 1.普通文件文件,如txt,py,html等 2.二进制文件,如图片,音频,视频,压缩包等 3.csv文件,如csv,需要借助于系统模块csv 4.对…...
MySQL安装教程
MySQL安装教程 如果需要删除原有mysql,然后安装过新的,可以参照如何彻底卸载旧mysql重装测试 1. 准备资源 mysql官网直达:https://dev.mysql.com/downloads/mysql/ CADN:https://download.csdn.net/download/luocong321/89592962 …...
【Windows】快速帮你解决如何找到 Windows 上的 .condarc 文件
🎬 鸽芷咕:个人主页 🔥 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想,就是为了理想的生活! 专栏介绍 在软件开发和日常使用中,BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…...
『正版软件』XYplorer 专业的 Windows 文件管理工具软件
在数字化时代,我们每天都在与各种文件打交道。无论是工作文档、个人照片还是多媒体资料,管理这些文件的效率直接关系到我们的工作效率和生活体验。今天,我要向大家推荐一款功能强大、操作简便的文件管理软件 —— XYplorer。 XYplorer&#x…...
“吉林一号”宽幅02B系列卫星
离轴四反光学成像系统 1.光学系统参数: 焦距:77.5mm; F/#:7.4; 视场:≥56゜; 光谱范围:400nm~1000nm。 2.说明: 光学系统采用离轴全反射式结构,整…...
我的AI工具箱Tauri版-FasterWhisper音频转文本
本教程基于自研的AI工具箱Tauri版进行FasterWhisper音频转文本服务。 FasterWhisper音频转文本服务 是自研AI工具箱Tauri版中的一款模块,专门用于将音频或视频中的语音内容自动转化为文本或字幕。通过简单的配置,该工具能够批量处理大量音频或视频文件&…...
Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略
Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在后端开发中,延迟队列(Delayed Queue)…...
Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题
问题: linux 在执行cp的命令的时候,就算是执行 cp -f 也还是会提醒是否要进行替换。 问题原因: 查看别名,alias命令,看到cp的别名为cp -i,那就是说cp本身就是自带覆盖提醒,就算我们加上-f 的…...
互联网技术的持续演进:从现在到未来
互联网技术的持续演进:从现在到未来 在过去的十年里,互联网技术发生了飞速变化。无论是大数据、人工智能,还是5G网络和物联网,每一种技术的突破都在改变我们的生活方式和工作模式。作为现代社会的核心驱动力,互联网技…...
vscode安装ESLint与Vetur插件后自动修复代码不生效
vscode安装ESLint与Vetur插件后自动修复代码不生效 1、安装ESLint 和 Vuter 2、运行结果 2.1、代码保存时代码中的分号;能被检测出来,但是不会自动修复 2.2、手动运行ESLint 修复命令(在终端中执行 npx eslint . --fix)可以修复问题 3、解决办法 在.vscode目录下setti…...
2848、与车相交的点
2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i,nums[i] [starti, endi] ,其中 starti 是第 i 辆车的起点,endi 是第 i 辆车的终点。 返回数轴上被车 任意…...
(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
C++ 设计模式 《小明的奶茶加料风波》
👨🎓 模式名称:装饰器模式(Decorator Pattern) 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...
Ubuntu系统多网卡多相机IP设置方法
目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...
