当前位置: 首页 > news >正文

Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}

RedisQueueListener

/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content   消息* @param <T>       泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param timeUnit  单位* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}

测试结果

相关文章:

Spring Boot 集成 Redisson 实现消息队列

包含组件内容 RedisQueue&#xff1a;消息队列监听标识RedisQueueInit&#xff1a;Redis队列监听器RedisQueueListener&#xff1a;Redis消息队列监听实现RedisQueueService&#xff1a;Redis消息队列服务工具 代码实现 RedisQueue import java.lang.annotation.ElementTyp…...

go语言Map详解

Map Go语言中提供的映射关系容器为map&#xff0c;其内部使用散列表&#xff08;hash&#xff09;实现 map是一种无序的基于key-value的数据结构&#xff0c;Go语言中的map是引用类型&#xff0c;必须初始化才能使用。 它提供了高效的查找、插入和删除操作&#xff0c;非常适…...

C++——已知数组a[6]={1,3,5,7,9};输入一个数值,要求按照现有排序规律将它放入数组当中。

没注释的源代码 #include <iostream> using namespace std; int main() { int a[6]{1,3,5,7,9}; int n,i,j; cout<<"请输入一个数值&#xff1a;"; cin>>n; for(int i0;i<4;i) { if(n<a[i]) { …...

云计算第四阶段---CLOUD Day7---Day8

CLOUD 07 一、Dockerfile详细解析 指令说明FROM指定基础镜像&#xff08;唯一&#xff09;RUN在容器内执行命令&#xff0c;可以写多条ADD把文件拷贝到容器内&#xff0c;如果文件是 tar.xx 格式&#xff0c;会自动解压COPY把文件拷贝到容器内&#xff0c;不会自动解压ENV设置…...

深入解析ThingsBoard与ThingsKit物联网平台的差异

VS 在物联网(IoT)领域&#xff0c;平台的选择对于企业来说至关重要。本文将深入探讨ThingsBoard社区版与ThingsKit企业版这两个物联网平台的差异&#xff0c;帮助读者更好地理解它们的特色和适用场景。 系统相同点 首先&#xff0c;ThingsBoard社区版和ThingsKit企业版都基于…...

五、CAN总线

目录 一、基础知识 1、can介绍 2、CAN硬件电路 3、CAN电平标准 4、CAN收发器芯片介绍 5、CAN帧格式 ① CAN帧种类 ② CAN数据帧 ③ CAN遥控帧​编辑 ④ 位填充 ⑤ 波形实例 6、接收方数据采样 ① 接收方数据采样遇到的问题 ② 位时序 ③ 硬同步 ④ 再同步 ⑤ 波…...

Linux:终端(terminal)与终端管理器(agetty)

终端的设备文件 打开/dev目录可以发现其中有许多字符设备文件&#xff0c;例如对于我的RedHat操作系统&#xff0c;拥有tty0到tty59&#xff0c;它们是操作系统提供的终端设备。对于tty1-tty12使用ctrlaltF*可以进行快捷切换&#xff0c;下面的命令可以进行通用切换。 sudo ch…...

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句 接入系统&#xff1a;钉钉 钉钉是阿里巴巴集团打造的企业级智能移动办公平台&#xff0c;是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工…...

微信小程序点赞动画特效实现

这里提供两种实现点赞动画特效的方法&#xff1a; 方法一&#xff1a;使用 CSS 动画 wxml 文件: <view class"like-container"><image src"{{isLiked ? likedImg : unlikedImg}}" class"like-icon {{isLiked ? liked : }}" bindta…...

Day25笔记-普通文件读写with上下文二进制文件csv文件

一、文件读写【重点掌握】 常见文件的读写分类&#xff1a; ​ 1.普通文件文件,如txt&#xff0c;py&#xff0c;html等 ​ 2.二进制文件&#xff0c;如图片&#xff0c;音频&#xff0c;视频&#xff0c;压缩包等 ​ 3.csv文件&#xff0c;如csv,需要借助于系统模块csv ​ 4.对…...

MySQL安装教程

MySQL安装教程 如果需要删除原有mysql&#xff0c;然后安装过新的&#xff0c;可以参照如何彻底卸载旧mysql重装测试 1. 准备资源 mysql官网直达&#xff1a;https://dev.mysql.com/downloads/mysql/ CADN&#xff1a;https://download.csdn.net/download/luocong321/89592962 …...

【Windows】快速帮你解决如何找到 Windows 上的 .condarc 文件

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…...

『正版软件』XYplorer 专业的 Windows 文件管理工具软件

在数字化时代&#xff0c;我们每天都在与各种文件打交道。无论是工作文档、个人照片还是多媒体资料&#xff0c;管理这些文件的效率直接关系到我们的工作效率和生活体验。今天&#xff0c;我要向大家推荐一款功能强大、操作简便的文件管理软件 —— XYplorer。 XYplorer&#x…...

“吉林一号”宽幅02B系列卫星

离轴四反光学成像系统 1.光学系统参数&#xff1a; 焦距&#xff1a;77.5mm&#xff1b; F/#&#xff1a;7.4&#xff1b; 视场&#xff1a;≥56゜&#xff1b; 光谱范围&#xff1a;400nm&#xff5e;1000nm。 2.说明&#xff1a; 光学系统采用离轴全反射式结构&#xff0c;整…...

我的AI工具箱Tauri版-FasterWhisper音频转文本

本教程基于自研的AI工具箱Tauri版进行FasterWhisper音频转文本服务。 FasterWhisper音频转文本服务 是自研AI工具箱Tauri版中的一款模块&#xff0c;专门用于将音频或视频中的语音内容自动转化为文本或字幕。通过简单的配置&#xff0c;该工具能够批量处理大量音频或视频文件&…...

Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略

Java后端中的延迟队列实现&#xff1a;使用Redis与RabbitMQ的不同策略 大家好&#xff0c;我是微赚淘客返利系统3.0的小编&#xff0c;是个冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在后端开发中&#xff0c;延迟队列&#xff08;Delayed Queue&#xff09…...

Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题

问题&#xff1a; linux 在执行cp的命令的时候&#xff0c;就算是执行 cp -f 也还是会提醒是否要进行替换。 问题原因&#xff1a; 查看别名&#xff0c;alias命令&#xff0c;看到cp的别名为cp -i&#xff0c;那就是说cp本身就是自带覆盖提醒&#xff0c;就算我们加上-f 的…...

互联网技术的持续演进:从现在到未来

互联网技术的持续演进&#xff1a;从现在到未来 在过去的十年里&#xff0c;互联网技术发生了飞速变化。无论是大数据、人工智能&#xff0c;还是5G网络和物联网&#xff0c;每一种技术的突破都在改变我们的生活方式和工作模式。作为现代社会的核心驱动力&#xff0c;互联网技…...

vscode安装ESLint与Vetur插件后自动修复代码不生效

vscode安装ESLint与Vetur插件后自动修复代码不生效 1、安装ESLint 和 Vuter 2、运行结果 2.1、代码保存时代码中的分号;能被检测出来,但是不会自动修复 2.2、手动运行ESLint 修复命令(在终端中执行 npx eslint . --fix)可以修复问题 3、解决办法 在.vscode目录下setti…...

2848、与车相交的点

2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i&#xff0c;nums[i] [starti, endi] &#xff0c;其中 starti 是第 i 辆车的起点&#xff0c;endi 是第 i 辆车的终点。 返回数轴上被车 任意…...

如何彻底解决Android Studio中文界面兼容性问题:专业级终极配置指南

如何彻底解决Android Studio中文界面兼容性问题&#xff1a;专业级终极配置指南 【免费下载链接】AndroidStudioChineseLanguagePack AndroidStudio中文插件(官方修改版本&#xff09; 项目地址: https://gitcode.com/gh_mirrors/an/AndroidStudioChineseLanguagePack 还…...

保姆级教程:用交大镜像源5分钟安装PyTorch 2.3.0(支持CUDA 12.6)

5分钟极速部署PyTorch 2.3.0开发环境&#xff08;CUDA 12.6兼容方案&#xff09; 深度学习开发环境配置一直是让开发者头疼的问题&#xff0c;尤其是当硬件驱动与框架版本不匹配时。最近在技术社区中&#xff0c;"Torch CUDA is not available"成为高频搜索词&#x…...

CTFmisc文件头尾解析与隐写实战指南

1. CTFmisc文件头尾基础解析 第一次参加CTF比赛时&#xff0c;我盯着misc题目里那个损坏的图片文件发呆了半小时。直到队友提醒我检查文件头&#xff0c;才发现原来是个伪装成jpg的zip压缩包。这种"挂羊头卖狗肉"的把戏在CTF比赛中实在太常见了&#xff0c;今天就带大…...

高质量建站引领数字化转型 ——2026 上海网站建设行业现状与标杆服务商盘点

2026年上海网站建设行业发展现状与高质量建站核心诉求据中国信通院《2026年中国GEO优化行业发展白皮书》、上海市商务委员会2026年一季度数据联合统计&#xff0c;上海企业数字化转型渗透率已达78%&#xff0c;国内GEO市场规模突破286亿元&#xff0c;年增长率125%&#xff1b;…...

让任意窗口保持置顶:AlwaysOnTop提升Windows多任务效率全指南

让任意窗口保持置顶&#xff1a;AlwaysOnTop提升Windows多任务效率全指南 【免费下载链接】AlwaysOnTop Make a Windows application always run on top 项目地址: https://gitcode.com/gh_mirrors/al/AlwaysOnTop 在数字化工作环境中&#xff0c;我们经常需要同时处理多…...

炉石传说自动化系统构建指南:从重复劳动到智能游戏体验

炉石传说自动化系统构建指南&#xff1a;从重复劳动到智能游戏体验 【免费下载链接】Hearthstone-Script Hearthstone script&#xff08;炉石传说脚本&#xff09; 项目地址: https://gitcode.com/gh_mirrors/he/Hearthstone-Script 发现游戏自动化的价值 在策略卡牌游…...

Qwen2.5-VL-7B-Instruct效果对比:vs InternVL2、LLaVA-1.6在中文场景表现

Qwen2.5-VL-7B-Instruct效果对比&#xff1a;vs InternVL2、LLaVA-1.6在中文场景表现 1. 多模态视觉-语言模型概述 Qwen2.5-VL-7B-Instruct是阿里云推出的新一代多模态视觉-语言模型&#xff0c;专为中文场景优化设计。该模型能够同时理解图像和文本输入&#xff0c;并生成符…...

04 月 04 日 AI 每日参考:多厂模型动态频出,产业转向拼用量

今日概览今日 AI 圈迎来多厂模型集中发布&#xff0c;谷歌、微软、阿里等巨头接连推出新模型产品&#xff0c;同时国内 AI 产业规模突破 1.2 万亿元&#xff0c;行业正式从 "拼参数" 转向 "拼用量" 的新阶段。监管层面也同步发力&#xff0c;地方推进 AI 产…...

3个核心功能解决Windows与Office批量激活难题:开源工具KMS_VL_ALL_AIO深度解析

3个核心功能解决Windows与Office批量激活难题&#xff1a;开源工具KMS_VL_ALL_AIO深度解析 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 在企业IT管理和个人系统维护中&#xff0c;Windows与O…...

2025年大中华区21个主要城市甲级写字楼市场数据

、大中华区主要城市甲级写字楼市场数据速览(2025年)美通社消息&#xff1a;全球领先的房地产服务公司戴德梁行发布《大中华区写字楼供应/需求前沿趋势》年度报告&#xff0c;针对2025年大中华区21个主要城市甲级写字楼市场的整体表现展开研究&#xff0c;聚焦市场供需关系深入分…...