当前位置: 首页 > news >正文

Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}

RedisQueueListener

/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content   消息* @param <T>       泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param timeUnit  单位* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}

测试结果

相关文章:

Spring Boot 集成 Redisson 实现消息队列

包含组件内容 RedisQueue&#xff1a;消息队列监听标识RedisQueueInit&#xff1a;Redis队列监听器RedisQueueListener&#xff1a;Redis消息队列监听实现RedisQueueService&#xff1a;Redis消息队列服务工具 代码实现 RedisQueue import java.lang.annotation.ElementTyp…...

go语言Map详解

Map Go语言中提供的映射关系容器为map&#xff0c;其内部使用散列表&#xff08;hash&#xff09;实现 map是一种无序的基于key-value的数据结构&#xff0c;Go语言中的map是引用类型&#xff0c;必须初始化才能使用。 它提供了高效的查找、插入和删除操作&#xff0c;非常适…...

C++——已知数组a[6]={1,3,5,7,9};输入一个数值,要求按照现有排序规律将它放入数组当中。

没注释的源代码 #include <iostream> using namespace std; int main() { int a[6]{1,3,5,7,9}; int n,i,j; cout<<"请输入一个数值&#xff1a;"; cin>>n; for(int i0;i<4;i) { if(n<a[i]) { …...

云计算第四阶段---CLOUD Day7---Day8

CLOUD 07 一、Dockerfile详细解析 指令说明FROM指定基础镜像&#xff08;唯一&#xff09;RUN在容器内执行命令&#xff0c;可以写多条ADD把文件拷贝到容器内&#xff0c;如果文件是 tar.xx 格式&#xff0c;会自动解压COPY把文件拷贝到容器内&#xff0c;不会自动解压ENV设置…...

深入解析ThingsBoard与ThingsKit物联网平台的差异

VS 在物联网(IoT)领域&#xff0c;平台的选择对于企业来说至关重要。本文将深入探讨ThingsBoard社区版与ThingsKit企业版这两个物联网平台的差异&#xff0c;帮助读者更好地理解它们的特色和适用场景。 系统相同点 首先&#xff0c;ThingsBoard社区版和ThingsKit企业版都基于…...

五、CAN总线

目录 一、基础知识 1、can介绍 2、CAN硬件电路 3、CAN电平标准 4、CAN收发器芯片介绍 5、CAN帧格式 ① CAN帧种类 ② CAN数据帧 ③ CAN遥控帧​编辑 ④ 位填充 ⑤ 波形实例 6、接收方数据采样 ① 接收方数据采样遇到的问题 ② 位时序 ③ 硬同步 ④ 再同步 ⑤ 波…...

Linux:终端(terminal)与终端管理器(agetty)

终端的设备文件 打开/dev目录可以发现其中有许多字符设备文件&#xff0c;例如对于我的RedHat操作系统&#xff0c;拥有tty0到tty59&#xff0c;它们是操作系统提供的终端设备。对于tty1-tty12使用ctrlaltF*可以进行快捷切换&#xff0c;下面的命令可以进行通用切换。 sudo ch…...

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句 接入系统&#xff1a;钉钉 钉钉是阿里巴巴集团打造的企业级智能移动办公平台&#xff0c;是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工…...

微信小程序点赞动画特效实现

这里提供两种实现点赞动画特效的方法&#xff1a; 方法一&#xff1a;使用 CSS 动画 wxml 文件: <view class"like-container"><image src"{{isLiked ? likedImg : unlikedImg}}" class"like-icon {{isLiked ? liked : }}" bindta…...

Day25笔记-普通文件读写with上下文二进制文件csv文件

一、文件读写【重点掌握】 常见文件的读写分类&#xff1a; ​ 1.普通文件文件,如txt&#xff0c;py&#xff0c;html等 ​ 2.二进制文件&#xff0c;如图片&#xff0c;音频&#xff0c;视频&#xff0c;压缩包等 ​ 3.csv文件&#xff0c;如csv,需要借助于系统模块csv ​ 4.对…...

MySQL安装教程

MySQL安装教程 如果需要删除原有mysql&#xff0c;然后安装过新的&#xff0c;可以参照如何彻底卸载旧mysql重装测试 1. 准备资源 mysql官网直达&#xff1a;https://dev.mysql.com/downloads/mysql/ CADN&#xff1a;https://download.csdn.net/download/luocong321/89592962 …...

【Windows】快速帮你解决如何找到 Windows 上的 .condarc 文件

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…...

『正版软件』XYplorer 专业的 Windows 文件管理工具软件

在数字化时代&#xff0c;我们每天都在与各种文件打交道。无论是工作文档、个人照片还是多媒体资料&#xff0c;管理这些文件的效率直接关系到我们的工作效率和生活体验。今天&#xff0c;我要向大家推荐一款功能强大、操作简便的文件管理软件 —— XYplorer。 XYplorer&#x…...

“吉林一号”宽幅02B系列卫星

离轴四反光学成像系统 1.光学系统参数&#xff1a; 焦距&#xff1a;77.5mm&#xff1b; F/#&#xff1a;7.4&#xff1b; 视场&#xff1a;≥56゜&#xff1b; 光谱范围&#xff1a;400nm&#xff5e;1000nm。 2.说明&#xff1a; 光学系统采用离轴全反射式结构&#xff0c;整…...

我的AI工具箱Tauri版-FasterWhisper音频转文本

本教程基于自研的AI工具箱Tauri版进行FasterWhisper音频转文本服务。 FasterWhisper音频转文本服务 是自研AI工具箱Tauri版中的一款模块&#xff0c;专门用于将音频或视频中的语音内容自动转化为文本或字幕。通过简单的配置&#xff0c;该工具能够批量处理大量音频或视频文件&…...

Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略

Java后端中的延迟队列实现&#xff1a;使用Redis与RabbitMQ的不同策略 大家好&#xff0c;我是微赚淘客返利系统3.0的小编&#xff0c;是个冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在后端开发中&#xff0c;延迟队列&#xff08;Delayed Queue&#xff09…...

Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题

问题&#xff1a; linux 在执行cp的命令的时候&#xff0c;就算是执行 cp -f 也还是会提醒是否要进行替换。 问题原因&#xff1a; 查看别名&#xff0c;alias命令&#xff0c;看到cp的别名为cp -i&#xff0c;那就是说cp本身就是自带覆盖提醒&#xff0c;就算我们加上-f 的…...

互联网技术的持续演进:从现在到未来

互联网技术的持续演进&#xff1a;从现在到未来 在过去的十年里&#xff0c;互联网技术发生了飞速变化。无论是大数据、人工智能&#xff0c;还是5G网络和物联网&#xff0c;每一种技术的突破都在改变我们的生活方式和工作模式。作为现代社会的核心驱动力&#xff0c;互联网技…...

vscode安装ESLint与Vetur插件后自动修复代码不生效

vscode安装ESLint与Vetur插件后自动修复代码不生效 1、安装ESLint 和 Vuter 2、运行结果 2.1、代码保存时代码中的分号;能被检测出来,但是不会自动修复 2.2、手动运行ESLint 修复命令(在终端中执行 npx eslint . --fix)可以修复问题 3、解决办法 在.vscode目录下setti…...

2848、与车相交的点

2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i&#xff0c;nums[i] [starti, endi] &#xff0c;其中 starti 是第 i 辆车的起点&#xff0c;endi 是第 i 辆车的终点。 返回数轴上被车 任意…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

【学习笔记】深入理解Java虚拟机学习笔记——第4章 虚拟机性能监控,故障处理工具

第2章 虚拟机性能监控&#xff0c;故障处理工具 4.1 概述 略 4.2 基础故障处理工具 4.2.1 jps:虚拟机进程状况工具 命令&#xff1a;jps [options] [hostid] 功能&#xff1a;本地虚拟机进程显示进程ID&#xff08;与ps相同&#xff09;&#xff0c;可同时显示主类&#x…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)

macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 &#x1f37a; 最新版brew安装慢到怀疑人生&#xff1f;别怕&#xff0c;教你轻松起飞&#xff01; 最近Homebrew更新至最新版&#xff0c;每次执行 brew 命令时都会自动从官方地址 https://formulae.…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...