当前位置: 首页 > news >正文

rabbitmq 延时队列

要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。

1. 安装 RabbitMQ Delayed Message Plugin

首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可以通过以下命令安装和启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2. 创建交换机和队列

你需要创建一个 延时交换机x-delayed-message)和一个普通队列。我们将在发送消息时指定延迟时间。

3. 发送延迟消息的代码示例

假设你已经在 RabbitMQ 中设置了延时交换机。以下是使用 Java 和 Spring AMQP 发送延迟消息的代码示例。

Maven 依赖

确保你的项目中已经添加了 Spring AMQP 相关依赖:

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.4.6</version>  <!-- 适配你使用的版本 -->
</dependency>

配置延时交换机和队列

你需要配置一个 延时交换机队列,并设置消息的延迟时间。

@Configuration
public class RabbitConfig {// 创建一个延时交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> arguments = new HashMap<>();// 设定交换机类型为延时交换机arguments.put("x-delayed-type", "direct");return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, arguments);}// 创建队列@Beanpublic Queue delayedQueue() {return new Queue("delayed-queue", true);}// 将队列绑定到延时交换机@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.routing.key").noargs();}
}

发送延迟消息

在消息发送时,你需要通过设置消息的属性来指定延迟时间。可以使用 AMQP.BasicProperties 来设置消息的 x-delay 属性,这个值表示延迟的时间(单位:毫秒)。

@Service
public class MessageProducer {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendDelayedMessage(String message, int delayMilliseconds) {// 创建消息属性,并设置延迟时间MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(delayMilliseconds);  // 设置延迟时间(毫秒)Message messageObj = new Message(message.getBytes(), messageProperties);// 发送消息到延时交换机amqpTemplate.send("delayed-exchange", "delayed.routing.key", messageObj);System.out.println("Sent delayed message: " + message + " with delay: " + delayMilliseconds + " ms");}
}

在上面的代码中,setDelay(delayMilliseconds) 方法设置了延迟时间。这个时间会告诉 RabbitMQ 延迟多久后将消息投递到队列中。

监听消息

最后,你需要设置消费者来监听这个延时队列,并处理接收到的消息:

@Service
public class MessageConsumer {@RabbitListener(queues = "delayed-queue")public void consume(String message) {System.out.println("Received delayed message: " + message);}
}

4. 测试发送延迟消息

现在,你可以在业务逻辑中调用 sendDelayedMessage 方法发送延时消息。例如,发送一条延迟 10 秒的消息:

@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}

5. 启动和测试

  1. 启动你的 Spring Boot 应用。
  2. 调用 testDelay 方法发送延迟消息。
  3. 你将看到消息在队列中延迟指定的时间(例如,10秒)后被消费。

关键点:

  • 通过 x-delayed-message 交换机,设置 x-delayed-typedirecttopic,根据需求选择交换机类型。
  • 使用 setDelay 方法设置延迟时间,单位是毫秒。
  • RabbitMQ 会在指定的时间到达后,将消息投递到目标队列。

总结

通过 RabbitMQ Delayed Message Plugin,你可以非常方便地实现延时队列。只需要创建一个支持延迟的交换机,并通过设置 x-delay 属性来指定消息的延迟时间。

配置并行消费

要启动多个消费者并并行处理 RabbitMQ 中的消息,通常可以通过 Spring AMQPRabbitListener 实现。这将帮助你加快消费速度,提升系统的吞吐量。下面是如何启动多个消费者进行并行消费的代码修改步骤:

1. 配置多个消费者

Spring AMQP 支持使用 @RabbitListener 注解启动多个消费者实例。通过配置 并行消费者,Spring 会为每个消费者实例分配一个独立的线程来处理消息。

2. 增加消费者并发处理能力

为了实现并发消费,我们可以通过以下几种方式:

  • 使用 @RabbitListener 启动多个消费者实例:每个 @RabbitListener 注解的消费者都会独立地消费队列中的消息。
  • 配置 SimpleMessageListenerContainer 的并发设置:通过配置 SimpleMessageListenerContainer,你可以设置多个消费者同时监听队列,从而提高并发消费能力。

3. 代码修改示例

1) 创建并发消费者

首先,创建一个通用的消息监听器,并将 @RabbitListener 注解应用于多个消费者实例上。你可以通过 @RabbitListener 注解中的 concurrency 属性来设置消费者的并发数量。

@Service
public class ConcurrentMessageConsumer {// 使用 @RabbitListener 注解配置多个并发消费者,默认启动2个消费者@RabbitListener(queues = "delayed-queue", concurrency = "3-5")  // 设置并发消费者数目 3-5 个消费者public void consume(String message) {System.out.println("Thread: " + Thread.currentThread().getName() + " - Received message: " + message);}
}

在上面的代码中,concurrency = "3-5" 表示 Spring 会启动 3 到 5 个消费者实例来并行处理队列中的消息。消费者数目是动态的,具体数量由 Spring 的消息监听容器控制。

  • "3-5" 表示最低启动 3 个消费者,最多启动 5 个消费者来并行处理消息。
  • 如果消息量很大,Spring 会动态调整消费者的数量,以适应系统的负载。

2) 配置并发消费者的线程池(可选)

为了更好地控制消费者的线程池和消息消费的并发度,你可以通过配置 SimpleMessageListenerContainer 来定义更具体的并发设置。例如,你可以在 Spring 配置类中手动定义消费者容器。

@Configuration
public class RabbitConfig {@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,MessageListener messageListener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("delayed-queue");container.setMessageListener(messageListener);// 设置并发消费的最小值和最大值container.setConcurrentConsumers(3);  // 最小3个消费者container.setMaxConcurrentConsumers(10);  // 最大10个消费者return container;}
}
  • setConcurrentConsumers(3):设置最小消费者数量。
  • setMaxConcurrentConsumers(10):设置最大消费者数量,Spring 会根据消息的积压情况动态调整消费者的数量。

3) 控制消费者的负载和流量

如果你希望更精细地控制消息消费的负载,可以使用 @RabbitListener 注解中的 acknowledgeMode 设置来调整消息确认模式,确保消息被正确地处理和确认。例如,使用 MANUAL 手动确认消费:

@RabbitListener(queues = "delayed-queue", ackMode = "MANUAL")
public void consumeWithAck(Message message, Channel channel) throws IOException {try {// 消费消息System.out.println("Consumed message: " + new String(message.getBody()));// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理异常,手动拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}

通过手动确认,你可以更好地控制消息的确认和失败重试机制,防止在消费者挂掉的情况下丢失消息。

4. 测试并发消费

你可以通过调用 testDelay 方法或者其他方式,发送延时消息来验证并发消费是否生效。发送的消息会被多个消费者并行处理,输出的日志中会显示哪个线程消费了哪个消息,从而验证消费者的并发能力。

@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}

5. 总结

通过配置多个并发消费者来加速消息消费,有以下几个要点:

  • 使用 @RabbitListener(concurrency = "3-5") 注解来启动多个并发消费者。
  • 配置 SimpleMessageListenerContainer 来更灵活地管理消费者线程池。
  • 使用手动确认模式(ackMode = "MANUAL")可以更精细地控制消息确认和失败重试。

通过这些配置,你可以根据消息量的大小和系统负载动态调整消费者数量,以达到加快消费速度的目的。

相关文章:

rabbitmq 延时队列

要使用 RabbitMQ Delayed Message Plugin 实现延时队列&#xff0c;首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。 1. 安装 RabbitMQ Delayed Message Plugin 首先&#xff0c;确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可…...

idea + Docker + 阿里镜像服务打包部署

一、下载docker desktop软件 官网下载docker desktop&#xff0c;需要结合wsl使用 启动成功的画面(如果不是这个画面例如一直处理start或者是stop需要重新启动&#xff0c;不行就重启电脑) 打包成功的镜像在这里&#xff0c;如果频繁打包会导致磁盘空间被占满&#xff0c;需…...

Vue 3 零基础入门:从计数器应用开始你的工程化之旅 - 深入理解 Vue 3 响应式系统

引言 欢迎来到 Vue 3 + 现代前端工程化 系列技术博客! 本系列博客旨在通过每日构建一个小项目,帮助您深入学习 Vue 3 的各项核心特性,并掌握现代前端工程化的实践技能。 在接下来的系列文章中,我们将从零开始,由浅入深,逐步构建一系列实用的小型应用。 今天,作为本系列…...

批量将手机照片修改为一寸白底证件照的方法

生活中经常需要用到一寸白底证件照&#xff0c;但每次去照相馆拍摄既费时又麻烦。其实&#xff0c;利用手机拍照和批量证件照生成工具&#xff0c;就能轻松批量修改手机照片为一寸白底证件照。 首先&#xff0c;在电脑浏览器中打开【报名电子照助手】&#xff0c;找到“批量证件…...

【Docker基础】理解 Docker:本质、性质、架构与核心组件

文章目录 Docker 本质Docker 的引擎迭代Docker 和虚拟机的区别Docker 为什么比虚拟机资源利用率高&#xff0c;速度快&#xff1f;Docker 和 JVM 虚拟化的区别Docker 版本1. LXC (Linux Containers)2. libcontainer3. Moby4. docker-ce5. docker-ee总结&#xff1a; Docker 架构…...

LeetCodehot 力扣热题100 全排列

这段代码的目的是计算给定整数数组的所有全排列&#xff08;permutations&#xff09;&#xff0c;并返回一个包含所有排列的二维数组。 思路解析 在这段代码中&#xff0c;采用了 深度优先搜索&#xff08;DFS&#xff09; 和 回溯 的方法来生成所有的排列。 关键步骤&#xf…...

SQL笔记#数据更新

一、数据的插入(INSERT语句的使用方法) 1、什么是INSERT 首先通过CREATE TABLE语句创建表,但创建的表中没有数据;再通过INSERT语句向表中插入数据。 --创建表ProductIns CREATE TABLE ProductIns (product_id CHAR(4) NOT NULL,product_name VARCHAR(1…...

GCC 和 G++的基本使用

GCC 和 G 命令 GCC 和 G 命令GCC&#xff08;GNU C 编译器&#xff09;基本用法常用选项示例 G&#xff08;GNU C 编译器&#xff09;基本用法常用选项示例 GCC 与 G 的区别选择使用 GCC 还是 G C编译流程1. 预处理&#xff08;Preprocessing&#xff09;2. 编译&#xff08;Co…...

Maven中一些基础知识点

早些时候只知道创建或者开发springboot项目时候&#xff0c;有一个叫pom.xml的文件可以用来管理项目所需的依赖/第三方工具。 索性稍微深入了解了一下&#xff0c;然后把自己认为重要的记录下来。 首先我们要引入新的依赖自然是在dependencies下写dependency&#xff0c;这个…...

论文阅读笔记:Deep Face Recognition: A Survey

论文阅读笔记&#xff1a;Deep Face Recognition: A Survey 1 介绍2 总览2.1 人脸识别组件2.1.1 人脸处理2.1.2 深度特征提取2.1.3 基于深度特征的人脸对比 3 网络结构和损失函数3.1 判别损失函数的演化3.1.1 基于欧式距离的损失3.1.2 基于角度/余弦边距的损失3.1.3 Softmax损失…...

JVM生产环境问题定位与解决实战(三):揭秘Java飞行记录器(JFR)的强大功能

提到飞行记录器&#xff0c;或许你的脑海中并未立刻浮现出清晰的画面&#xff0c;但一说起“黑匣子”&#xff0c;想必大多数人都能恍然大悟&#xff0c;知晓其重要性及用途。在航空领域&#xff0c;黑匣子作为不可或缺的设备&#xff0c;默默记录着飞行过程中的每一项关键数据…...

爬虫框架与库

爬虫框架与库是用于网络数据抓取的核心工具&#xff0c;帮助开发者高效地从网页中提取结构化数据。 Requests&#xff1a;用于发送HTTP请求。 BeautifulSoup&#xff1a;用于解析HTML和XML。 Scrapy&#xff1a;强大的爬虫框架&#xff0c;适合大规模爬取。 Selenium&#…...

PyTorch常用函数总结(持续更新)

本文主要记录自己在用 PyTorch复现经典模型 过程中遇到的一些函数及用法&#xff0c;以期对 常见PyTorch函数 更加熟练~ 官方Docs&#xff1a;PyTorch documentation — PyTorch 2.6 documentation 目录 数据层面 torch.sign(tensor) torch.tensor(np.eye(3)[y]) torch.on…...

代码异常(js中push)NO.4

1. 环境 Vue3&#xff0c;Element Plsu 2. 示例代码 const { updateBy, updateTime, ...curObj } form.valuecurObj.id props.tableData.length 1var newTableData props.tableData.push(curObj)updateTableData(newTableData)3. 情景描述 newTableData变成了整数&#…...

Anaconda 2025 最新版安装与Python环境配置指南(附官方下载链接)

一、软件定位与核心功能 Anaconda 2025 是Python/R数据科学集成开发平台&#xff0c;预装1500科学计算库&#xff0c;新增AI模型可视化调试、多环境GPU加速等特性。相较于传统Python安装&#xff0c;其优势包括&#xff1a; 环境隔离&#xff1a;通过conda工具实现多版本Pyth…...

Vue 中动态实现进度条

在 Vue 中动态实现进度条&#xff0c;基本上有两种常见的方法&#xff1a;直接通过 Vue 数据绑定控制样式&#xff0c;或者利用外部库来实现更复杂的功能。我们会深入探讨这两种方式&#xff0c;并且详细说明每种方法的实现步骤、优缺点以及使用场景。 1. 使用 Vue 数据绑定来…...

CSS滚动条原理与自定义样式指南,CSS滚动条样式失效,滚动条样式无效,-webkit-scrollbar无效,overflow不显示滚动条

滚动内容形成的必要条件 CSS Overflow属性解析 MDN官方文档-Overflow属性 菜鸟教程-Overflow属性 overflow 属性控制内容溢出元素框时在对应的元素区间内是否添加滚动条。 值描述visible默认值。内容不会被修剪&#xff0c;会呈现在元素框之外。hidden内容会被修剪&#xf…...

Three.js 入门(辅助、位移、父子关系、缩放旋转、响应式布局)

本篇主要学习内容 : 三维坐标系与辅助坐标系物体位移与父子元素物体的缩放与物体的旋转设置响应式画布与全屏控制 点赞 关注 收藏 学会了 本文使用 Three.js 的版本&#xff1a;171 基于 Vue3vite开发调试 1.三维坐标系与辅助坐标系 1.1) 导入three和轨道控制器 // 导入…...

python算法-用递归打印数字3的幂--Day017

文章目录 前言采用创新方式&#xff0c;精选趣味、实用性强的例子&#xff0c;从不同难度、不同算法、不同类型和不同数据结构进行总结&#xff0c;全面提升算法能力。例1.用递归打印数字例2.相对排名 总结 前言 采用创新方式&#xff0c;精选趣味、实用性强的例子&#xff0c…...

Selenium 与 Coze 集成

涵盖两者的基本概念、集成步骤、代码示例以及相关注意事项。 基本概念 Selenium:是一个用于自动化浏览器操作的工具集,支持多种浏览器(如 Chrome、Firefox 等),能够模拟用户在浏览器中的各种操作,如点击、输入文本、选择下拉框等,常用于 Web 应用的自动化测试。Coze:它…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

提升移动端网页调试效率:WebDebugX 与常见工具组合实践

在日常移动端开发中&#xff0c;网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时&#xff0c;开发者迫切需要一套高效、可靠且跨平台的调试方案。过去&#xff0c;我们或多或少使用过 Chrome DevTools、Remote Debug…...

macOS 终端智能代理检测

&#x1f9e0; 终端智能代理检测&#xff1a;自动判断是否需要设置代理访问 GitHub 在开发中&#xff0c;使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新&#xff0c;例如&#xff1a; fatal: unable to access https://github.com/ohmyzsh/oh…...