当前位置: 首页 > news >正文

Spring Boot 集成 Redisson 实现消息队列

包含组件内容

  • RedisQueue:消息队列监听标识
  • RedisQueueInit:Redis队列监听器
  • RedisQueueListener:Redis消息队列监听实现
  • RedisQueueService:Redis消息队列服务工具

代码实现

RedisQueue

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Redis消息队列注解*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisQueue {/*** 队列名*/String value();
}

RedisQueueInit

import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** 初始化Redis队列监听器** @author 十八* @createTime 2024-09-09 22:49*/
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {public static final String REDIS_QUEUE_PREFIX = "redis-queue";final AtomicBoolean shutdownRequested = new AtomicBoolean(false);@Resourceprivate RedissonClient redissonClient;private ExecutorService executorService;public static String buildQueueName(String queueName) {return REDIS_QUEUE_PREFIX + ":" + queueName;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);if (!queueListeners.isEmpty()) {executorService = createThreadPool();for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);if (redisQueue != null) {String queueName = redisQueue.value();executorService.submit(() -> listenQueue(queueName, entry.getValue()));}}}}private ExecutorService createThreadPool() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(100),new NamedThreadFactory(REDIS_QUEUE_PREFIX),new ThreadPoolExecutor.CallerRunsPolicy());}private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {queueName = buildQueueName(queueName);RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);log.info("Redis队列监听开启: {}", queueName);while (!shutdownRequested.get() && !redissonClient.isShutdown()) {try {Object message = blockingQueue.take();executorService.submit(() -> redisQueueListener.consume(message));} catch (RedissonShutdownException e) {log.info("Redis连接关闭,停止监听队列: {}", queueName);break;} catch (Exception e) {log.error("监听队列异常: {}", queueName, e);}}}public void shutdown() {if (executorService != null) {executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}shutdownRequested.set(true);if (redissonClient != null && !redissonClient.isShuttingDown()) {redissonClient.shutdown();}}private static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String prefix) {this.namePrefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());}}}

RedisQueueListener

/*** Redis消息队列监听实现** @author 十八* @createTime 2024-09-09 22:51*/
public interface RedisQueueListener<T> {/*** 队列消费方法** @param content 消息内容*/void consume(T content);
}

RedisQueueService

import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;/*** Redis 消息队列服务** @author 十八* @createTime 2024-09-09 22:52*/
@Component
public class RedisQueueService {@Resourceprivate RedissonClient redissonClient;/*** 添加队列** @param queueName 队列名称* @param content   消息* @param <T>       泛型*/public <T> void send(String queueName, T content) {RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));blockingQueue.add(content);}/*** 添加延迟队列** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param timeUnit  单位* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, timeUnit);}/*** 发送延迟队列消息(单位毫秒)** @param queueName 队列名称* @param content   消息类型* @param delay     延迟时间* @param <T>       泛型*/public <T> void sendDelay(String queueName, T content, long delay) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(RedisQueueInit.buildQueueName(queueName));RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(content, delay, TimeUnit.MILLISECONDS);}
}

测试

创建监听对象

import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @author 十八* @createTime 2024-09-10 00:09*/
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {@Overridepublic void invoke(String content) {log.info("队列消息接收 >>> {}", content);}
}

测试用例

import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 十八* @createTime 2024-09-10 00:11*/
@RestController
@RequestMapping("queue")
public class QueueController {@Resourceprivate RedisQueueService redisQueueService;@PostMapping("send")public void send(String message) {redisQueueService.send("test", message);redisQueueService.sendDelay("test", "delay messaege -> " + message, 1000);}}

测试结果

相关文章:

Spring Boot 集成 Redisson 实现消息队列

包含组件内容 RedisQueue&#xff1a;消息队列监听标识RedisQueueInit&#xff1a;Redis队列监听器RedisQueueListener&#xff1a;Redis消息队列监听实现RedisQueueService&#xff1a;Redis消息队列服务工具 代码实现 RedisQueue import java.lang.annotation.ElementTyp…...

go语言Map详解

Map Go语言中提供的映射关系容器为map&#xff0c;其内部使用散列表&#xff08;hash&#xff09;实现 map是一种无序的基于key-value的数据结构&#xff0c;Go语言中的map是引用类型&#xff0c;必须初始化才能使用。 它提供了高效的查找、插入和删除操作&#xff0c;非常适…...

C++——已知数组a[6]={1,3,5,7,9};输入一个数值,要求按照现有排序规律将它放入数组当中。

没注释的源代码 #include <iostream> using namespace std; int main() { int a[6]{1,3,5,7,9}; int n,i,j; cout<<"请输入一个数值&#xff1a;"; cin>>n; for(int i0;i<4;i) { if(n<a[i]) { …...

云计算第四阶段---CLOUD Day7---Day8

CLOUD 07 一、Dockerfile详细解析 指令说明FROM指定基础镜像&#xff08;唯一&#xff09;RUN在容器内执行命令&#xff0c;可以写多条ADD把文件拷贝到容器内&#xff0c;如果文件是 tar.xx 格式&#xff0c;会自动解压COPY把文件拷贝到容器内&#xff0c;不会自动解压ENV设置…...

深入解析ThingsBoard与ThingsKit物联网平台的差异

VS 在物联网(IoT)领域&#xff0c;平台的选择对于企业来说至关重要。本文将深入探讨ThingsBoard社区版与ThingsKit企业版这两个物联网平台的差异&#xff0c;帮助读者更好地理解它们的特色和适用场景。 系统相同点 首先&#xff0c;ThingsBoard社区版和ThingsKit企业版都基于…...

五、CAN总线

目录 一、基础知识 1、can介绍 2、CAN硬件电路 3、CAN电平标准 4、CAN收发器芯片介绍 5、CAN帧格式 ① CAN帧种类 ② CAN数据帧 ③ CAN遥控帧​编辑 ④ 位填充 ⑤ 波形实例 6、接收方数据采样 ① 接收方数据采样遇到的问题 ② 位时序 ③ 硬同步 ④ 再同步 ⑤ 波…...

Linux:终端(terminal)与终端管理器(agetty)

终端的设备文件 打开/dev目录可以发现其中有许多字符设备文件&#xff0c;例如对于我的RedHat操作系统&#xff0c;拥有tty0到tty59&#xff0c;它们是操作系统提供的终端设备。对于tty1-tty12使用ctrlaltF*可以进行快捷切换&#xff0c;下面的命令可以进行通用切换。 sudo ch…...

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句

钉钉与MySQL对接集成获取部门列表2.0打通EXECUTE语句 接入系统&#xff1a;钉钉 钉钉是阿里巴巴集团打造的企业级智能移动办公平台&#xff0c;是数字经济时代的企业组织协同办公和应用开发平台。钉钉将IM即时沟通、钉钉文档、钉闪会、钉盘、Teambition、OA审批、智能人事、钉工…...

微信小程序点赞动画特效实现

这里提供两种实现点赞动画特效的方法&#xff1a; 方法一&#xff1a;使用 CSS 动画 wxml 文件: <view class"like-container"><image src"{{isLiked ? likedImg : unlikedImg}}" class"like-icon {{isLiked ? liked : }}" bindta…...

Day25笔记-普通文件读写with上下文二进制文件csv文件

一、文件读写【重点掌握】 常见文件的读写分类&#xff1a; ​ 1.普通文件文件,如txt&#xff0c;py&#xff0c;html等 ​ 2.二进制文件&#xff0c;如图片&#xff0c;音频&#xff0c;视频&#xff0c;压缩包等 ​ 3.csv文件&#xff0c;如csv,需要借助于系统模块csv ​ 4.对…...

MySQL安装教程

MySQL安装教程 如果需要删除原有mysql&#xff0c;然后安装过新的&#xff0c;可以参照如何彻底卸载旧mysql重装测试 1. 准备资源 mysql官网直达&#xff1a;https://dev.mysql.com/downloads/mysql/ CADN&#xff1a;https://download.csdn.net/download/luocong321/89592962 …...

【Windows】快速帮你解决如何找到 Windows 上的 .condarc 文件

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…...

『正版软件』XYplorer 专业的 Windows 文件管理工具软件

在数字化时代&#xff0c;我们每天都在与各种文件打交道。无论是工作文档、个人照片还是多媒体资料&#xff0c;管理这些文件的效率直接关系到我们的工作效率和生活体验。今天&#xff0c;我要向大家推荐一款功能强大、操作简便的文件管理软件 —— XYplorer。 XYplorer&#x…...

“吉林一号”宽幅02B系列卫星

离轴四反光学成像系统 1.光学系统参数&#xff1a; 焦距&#xff1a;77.5mm&#xff1b; F/#&#xff1a;7.4&#xff1b; 视场&#xff1a;≥56゜&#xff1b; 光谱范围&#xff1a;400nm&#xff5e;1000nm。 2.说明&#xff1a; 光学系统采用离轴全反射式结构&#xff0c;整…...

我的AI工具箱Tauri版-FasterWhisper音频转文本

本教程基于自研的AI工具箱Tauri版进行FasterWhisper音频转文本服务。 FasterWhisper音频转文本服务 是自研AI工具箱Tauri版中的一款模块&#xff0c;专门用于将音频或视频中的语音内容自动转化为文本或字幕。通过简单的配置&#xff0c;该工具能够批量处理大量音频或视频文件&…...

Java后端中的延迟队列实现:使用Redis与RabbitMQ的不同策略

Java后端中的延迟队列实现&#xff1a;使用Redis与RabbitMQ的不同策略 大家好&#xff0c;我是微赚淘客返利系统3.0的小编&#xff0c;是个冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 在后端开发中&#xff0c;延迟队列&#xff08;Delayed Queue&#xff09…...

Linux中使用cp命令的 -f 选项,但还是提醒覆盖的问题

问题&#xff1a; linux 在执行cp的命令的时候&#xff0c;就算是执行 cp -f 也还是会提醒是否要进行替换。 问题原因&#xff1a; 查看别名&#xff0c;alias命令&#xff0c;看到cp的别名为cp -i&#xff0c;那就是说cp本身就是自带覆盖提醒&#xff0c;就算我们加上-f 的…...

互联网技术的持续演进:从现在到未来

互联网技术的持续演进&#xff1a;从现在到未来 在过去的十年里&#xff0c;互联网技术发生了飞速变化。无论是大数据、人工智能&#xff0c;还是5G网络和物联网&#xff0c;每一种技术的突破都在改变我们的生活方式和工作模式。作为现代社会的核心驱动力&#xff0c;互联网技…...

vscode安装ESLint与Vetur插件后自动修复代码不生效

vscode安装ESLint与Vetur插件后自动修复代码不生效 1、安装ESLint 和 Vuter 2、运行结果 2.1、代码保存时代码中的分号;能被检测出来,但是不会自动修复 2.2、手动运行ESLint 修复命令(在终端中执行 npx eslint . --fix)可以修复问题 3、解决办法 在.vscode目录下setti…...

2848、与车相交的点

2848、[简单] 与车相交的点 1、题目描述 给你一个下标从 0 开始的二维整数数组 nums 表示汽车停放在数轴上的坐标。对于任意下标 i&#xff0c;nums[i] [starti, endi] &#xff0c;其中 starti 是第 i 辆车的起点&#xff0c;endi 是第 i 辆车的终点。 返回数轴上被车 任意…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

20个超级好用的 CSS 动画库

分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码&#xff0c;而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库&#xff0c;可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画&#xff0c;可以包含在你的网页或应用项目中。 3.An…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...