Spring Boot 集成 Redisson 实现消息队列
包含组件内容
- RedisQueue:消息队列监听标识
- RedisQueueInit:Redis队列监听器
- RedisQueueListener:Redis消息队列监听实现
- RedisQueueService:Redis消息队列服务工具
代码实现
RedisQueue
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}
RedisQueueInit
import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}
RedisQueueListener
/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}
RedisQueueService
import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content 消息* @param <T> 泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content 消息类型* @param delay 延迟时间* @param timeUnit 单位* @param <T> 泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content 消息类型* @param delay 延迟时间* @param <T> 泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}
测试
创建监听对象
import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}
测试用例
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}
测试结果
相关文章:

Spring Boot 集成 Redisson 实现消息队列
包含组件内容 RedisQueue:消息队列监听标识RedisQueueInit:Redis队列监听器RedisQueueListener:Redis消息队列监听实现RedisQueueService:Redis消息队列服务工具 代码实现 RedisQueue import java.lang.annotation.ElementTyp…...

go语言Map详解
Map Go语言中提供的映射关系容器为map,其内部使用散列表(hash)实现 map是一种无序的基于key-value的数据结构,Go语言中的map是引用类型,必须初始化才能使用。 它提供了高效的查找、插入和删除操作,非常适…...

C++——已知数组a[6]={1,3,5,7,9};输入一个数值,要求按照现有排序规律将它放入数组当中。
没注释的源代码 #include <iostream> using namespace std; int main() { int a[6]{1,3,5,7,9}; int n,i,j; cout<<"请输入一个数值:"; cin>>n; for(int i0;i<4;i) { if(n<a[i]) { …...

云计算第四阶段---CLOUD Day7---Day8
CLOUD 07 一、Dockerfile详细解析 指令说明FROM指定基础镜像(唯一)RUN在容器内执行命令,可以写多条ADD把文件拷贝到容器内,如果文件是 tar.xx 格式,会自动解压COPY把文件拷贝到容器内,不会自动解压ENV设置…...

深入解析ThingsBoard与ThingsKit物联网平台的差异
VS 在物联网(IoT)领域,平台的选择对于企业来说至关重要。本文将深入探讨ThingsBoard社区版与ThingsKit企业版这两个物联网平台的差异,帮助读者更好地理解它们的特色和适用场景。 系统相同点 首先,ThingsBoard社区版和ThingsKit企业版都基于…...

五、CAN总线
目录 一、基础知识 1、can介绍 2、CAN硬件电路 3、CAN电平标准 4、CAN收发器芯片介绍 5、CAN帧格式 ① CAN帧种类 ② CAN数据帧 ③ CAN遥控帧编辑 ④ 位填充 ⑤ 波形实例 6、接收方数据采样 ① 接收方数据采样遇到的问题 ② 位时序 ③ 硬同步 ④ 再同步 ⑤ 波…...

Linux:终端(terminal)与终端管理器(agetty)
终端的设备文件 打开/dev目录可以发现其中有许多字符设备文件,例如对于我的RedHat操作系统,拥有tty0到tty59,它们是操作系统提供的终端设备。对于tty1-tty12使用ctrlaltF*可以进行快捷切换,下面的命令可以进行通用切换。 sudo ch…...

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句
钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句 接入系统:钉钉 钉钉是阿里巴巴集团打造的企业级智能移动办公平台,是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工…...

微信小程序点赞动画特效实现
这里提供两种实现点赞动画特效的方法: 方法一:使用 CSS 动画 wxml 文件: <view class"like-container"><image src"{{isLiked ? likedImg : unlikedImg}}" class"like-icon {{isLiked ? liked : }}" bindta…...

Day25笔记-普通文件读写with上下文二进制文件csv文件
一、文件读写【重点掌握】 常见文件的读写分类: 1.普通文件文件,如txt,py,html等 2.二进制文件,如图片,音频,视频,压缩包等 3.csv文件,如csv,需要借助于系统模块csv 4.对…...

MySQL安装教程
MySQL安装教程 如果需要删除原有mysql,然后安装过新的,可以参照如何彻底卸载旧mysql重装测试 1. 准备资源 mysql官网直达:https://dev.mysql.com/downloads/mysql/ CADN:https://download.csdn.net/download/luocong321/89592962 …...

【Windows】快速帮你解决如何找到 Windows 上的 .condarc 文件
🎬 鸽芷咕:个人主页 🔥 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想,就是为了理想的生活! 专栏介绍 在软件开发和日常使用中,BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…...

『正版软件』XYplorer 专业的 Windows 文件管理工具软件
在数字化时代,我们每天都在与各种文件打交道。无论是工作文档、个人照片还是多媒体资料,管理这些文件的效率直接关系到我们的工作效率和生活体验。今天,我要向大家推荐一款功能强大、操作简便的文件管理软件 —— XYplorer。 XYplorer&#x…...

“吉林一号”宽幅02B系列卫星
离轴四反光学成像系统 1.光学系统参数: 焦距:77.5mm; F/#:7.4; 视场:≥56゜; 光谱范围:400nm~1000nm。 2.说明: 光学系统采用离轴全反射式结构,整…...

我的AI工具箱Tauri版-FasterWhisper音频转文本
本教程基于自研的AI工具箱Tauri版进行FasterWhisper音频转文本服务。 FasterWhisper音频转文本服务 是自研AI工具箱Tauri版中的一款模块,专门用于将音频或视频中的语音内容自动转化为文本或字幕。通过简单的配置,该工具能够批量处理大量音频或视频文件&…...

Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略
Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在后端开发中,延迟队列(Delayed Queue)…...

Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题
问题: linux 在执行cp的命令的时候,就算是执行 cp -f 也还是会提醒是否要进行替换。 问题原因: 查看别名,alias命令,看到cp的别名为cp -i,那就是说cp本身就是自带覆盖提醒,就算我们加上-f 的…...

互联网技术的持续演进:从现在到未来
互联网技术的持续演进:从现在到未来 在过去的十年里,互联网技术发生了飞速变化。无论是大数据、人工智能,还是5G网络和物联网,每一种技术的突破都在改变我们的生活方式和工作模式。作为现代社会的核心驱动力,互联网技…...

vscode安装ESLint与Vetur插件后自动修复代码不生效
vscode安装ESLint与Vetur插件后自动修复代码不生效 1、安装ESLint 和 Vuter 2、运行结果 2.1、代码保存时代码中的分号;能被检测出来,但是不会自动修复 2.2、手动运行ESLint 修复命令(在终端中执行 npx eslint . --fix)可以修复问题 3、解决办法 在.vscode目录下setti…...

2848、与车相交的点
2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i,nums[i] [starti, endi] ,其中 starti 是第 i 辆车的起点,endi 是第 i 辆车的终点。 返回数轴上被车 任意…...

基于k8s手动部署rabbitmq集群(Manually Deploying RabbitMQ Cluster Based on k8s)
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…...

mybatis 配置文件完成增删改查(四) :多条件 动态sql查询
文章目录 就是你在接收数据时,有的查询条件不写,也能从查到相应的stauts也可能为空恒等式标签 代替where关键字 就是你在接收数据时,有的查询条件不写,也能从查到相应的 注意是写字段名 还是 属性名 companyName不写也能查出满足…...

先楫HPM6750 Windows下VSCode开发环境配置
用的是EVKmini,ft2232作为调试器jtag接口调试 启动start_gui.exe 以hello_world为例,更改一下build path,可以generate并使用gcc compile 最后会得到这些 点击start_gui里面的命令行,用命令行启动vscode 新建.vscode文件夹&…...

【JavaScript】LeetCode:41-45
文章目录 41 排序链表42 合并k个升序链表43 LRU缓存44 二叉树的中序遍历45 二叉树的最大深度 41 排序链表 递归 归并排序找到链表中心点,从中心点将链表一分为二。奇数个节点找中心点,偶数个节点找中心左边的点作为中心点。快慢指针找中心点,…...

数据结构(Day18)
一、周学习内容 1、9.18 数据结构(Day15)-CSDN博客 2、9.19 数据结构(Day16)-CSDN博客 3、9.20 链表 目的 插入删除不需要移动任何节点(元素)。 不需要预估存储空间大小,长度动态增长或减小。…...

error: ‘InsertAtTop‘ was not declared in this scope
Qt编译错误记录: 报错:error: ‘InsertAtTop’ was not declared in this scope ui->comboBoxJob->setInsertPolicy(InsertAtTop);这行代码在Qt中编译就会报这个错误,原因是输入参数需要加类名限定,改为: ui-…...

MySQL缓冲池详解
Buffer Pool 本文参考开源项目:小林coding在线文档; 01-缓冲池概述 在MySQL查询数据的时候,是通过存储引擎去磁盘做IO来获取数据库中的数据,这样每次查询一条数据都要去做一次或者多次磁盘的IO,无疑是非常慢的。…...

【我的 PWN 学习手札】tcache stash with fastbin double free —— tcache key 绕过
参考看雪课程:PWN 探索篇 前言 tcache key 的引入使得 tcache dup 利用出现了困难。除了简单利用 UAF 覆写 key 或者House Of Karui 之外,还可以利用 ptmalloc 中的其他机制进行绕过。 一、Tcache Stash with Fastbin Double Free 之前是 double free …...

How can I stream a response from LangChain‘s OpenAI using Flask API?
题意:怎样在 Flask API 中使用 LangChain 的 OpenAI 模型流式传输响应 问题背景: I am using Python Flask app for chat over data. In the console I am getting streamable response directly from the OpenAI since I can enable streming with a f…...

什么是慢充优惠话费充值api?如何选择平台
一、话费充值api的定义 话费充值api是一种能够让开发者将话费充值功能集成到自己的平台的接口。通过接入话费充值api接口,就能够实现话费充值平台的搭建,从而为用户提供话费充值服务,这一接口主要适用于对话费充值有长期稳定需求的企业或者商…...