rabbitmq 延时队列
要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。
1. 安装 RabbitMQ Delayed Message Plugin
首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可以通过以下命令安装和启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2. 创建交换机和队列
你需要创建一个 延时交换机(x-delayed-message)和一个普通队列。我们将在发送消息时指定延迟时间。
3. 发送延迟消息的代码示例
假设你已经在 RabbitMQ 中设置了延时交换机。以下是使用 Java 和 Spring AMQP 发送延迟消息的代码示例。
Maven 依赖
确保你的项目中已经添加了 Spring AMQP 相关依赖:
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.4.6</version> <!-- 适配你使用的版本 -->
</dependency>
配置延时交换机和队列
你需要配置一个 延时交换机 和 队列,并设置消息的延迟时间。
@Configuration
public class RabbitConfig {// 创建一个延时交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> arguments = new HashMap<>();// 设定交换机类型为延时交换机arguments.put("x-delayed-type", "direct");return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, arguments);}// 创建队列@Beanpublic Queue delayedQueue() {return new Queue("delayed-queue", true);}// 将队列绑定到延时交换机@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.routing.key").noargs();}
}
发送延迟消息
在消息发送时,你需要通过设置消息的属性来指定延迟时间。可以使用 AMQP.BasicProperties 来设置消息的 x-delay 属性,这个值表示延迟的时间(单位:毫秒)。
@Service
public class MessageProducer {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendDelayedMessage(String message, int delayMilliseconds) {// 创建消息属性,并设置延迟时间MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(delayMilliseconds); // 设置延迟时间(毫秒)Message messageObj = new Message(message.getBytes(), messageProperties);// 发送消息到延时交换机amqpTemplate.send("delayed-exchange", "delayed.routing.key", messageObj);System.out.println("Sent delayed message: " + message + " with delay: " + delayMilliseconds + " ms");}
}
在上面的代码中,setDelay(delayMilliseconds) 方法设置了延迟时间。这个时间会告诉 RabbitMQ 延迟多久后将消息投递到队列中。
监听消息
最后,你需要设置消费者来监听这个延时队列,并处理接收到的消息:
@Service
public class MessageConsumer {@RabbitListener(queues = "delayed-queue")public void consume(String message) {System.out.println("Received delayed message: " + message);}
}
4. 测试发送延迟消息
现在,你可以在业务逻辑中调用 sendDelayedMessage 方法发送延时消息。例如,发送一条延迟 10 秒的消息:
@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}
5. 启动和测试
- 启动你的 Spring Boot 应用。
- 调用
testDelay方法发送延迟消息。 - 你将看到消息在队列中延迟指定的时间(例如,10秒)后被消费。
关键点:
- 通过
x-delayed-message交换机,设置x-delayed-type为direct或topic,根据需求选择交换机类型。 - 使用
setDelay方法设置延迟时间,单位是毫秒。 - RabbitMQ 会在指定的时间到达后,将消息投递到目标队列。
总结
通过 RabbitMQ Delayed Message Plugin,你可以非常方便地实现延时队列。只需要创建一个支持延迟的交换机,并通过设置 x-delay 属性来指定消息的延迟时间。
配置并行消费
要启动多个消费者并并行处理 RabbitMQ 中的消息,通常可以通过 Spring AMQP 和 RabbitListener 实现。这将帮助你加快消费速度,提升系统的吞吐量。下面是如何启动多个消费者进行并行消费的代码修改步骤:
1. 配置多个消费者
Spring AMQP 支持使用 @RabbitListener 注解启动多个消费者实例。通过配置 并行消费者,Spring 会为每个消费者实例分配一个独立的线程来处理消息。
2. 增加消费者并发处理能力
为了实现并发消费,我们可以通过以下几种方式:
- 使用
@RabbitListener启动多个消费者实例:每个@RabbitListener注解的消费者都会独立地消费队列中的消息。 - 配置
SimpleMessageListenerContainer的并发设置:通过配置SimpleMessageListenerContainer,你可以设置多个消费者同时监听队列,从而提高并发消费能力。
3. 代码修改示例
1) 创建并发消费者
首先,创建一个通用的消息监听器,并将 @RabbitListener 注解应用于多个消费者实例上。你可以通过 @RabbitListener 注解中的 concurrency 属性来设置消费者的并发数量。
@Service
public class ConcurrentMessageConsumer {// 使用 @RabbitListener 注解配置多个并发消费者,默认启动2个消费者@RabbitListener(queues = "delayed-queue", concurrency = "3-5") // 设置并发消费者数目 3-5 个消费者public void consume(String message) {System.out.println("Thread: " + Thread.currentThread().getName() + " - Received message: " + message);}
}
在上面的代码中,concurrency = "3-5" 表示 Spring 会启动 3 到 5 个消费者实例来并行处理队列中的消息。消费者数目是动态的,具体数量由 Spring 的消息监听容器控制。
"3-5"表示最低启动 3 个消费者,最多启动 5 个消费者来并行处理消息。- 如果消息量很大,Spring 会动态调整消费者的数量,以适应系统的负载。
2) 配置并发消费者的线程池(可选)
为了更好地控制消费者的线程池和消息消费的并发度,你可以通过配置 SimpleMessageListenerContainer 来定义更具体的并发设置。例如,你可以在 Spring 配置类中手动定义消费者容器。
@Configuration
public class RabbitConfig {@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,MessageListener messageListener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("delayed-queue");container.setMessageListener(messageListener);// 设置并发消费的最小值和最大值container.setConcurrentConsumers(3); // 最小3个消费者container.setMaxConcurrentConsumers(10); // 最大10个消费者return container;}
}
setConcurrentConsumers(3):设置最小消费者数量。setMaxConcurrentConsumers(10):设置最大消费者数量,Spring 会根据消息的积压情况动态调整消费者的数量。
3) 控制消费者的负载和流量
如果你希望更精细地控制消息消费的负载,可以使用 @RabbitListener 注解中的 acknowledgeMode 设置来调整消息确认模式,确保消息被正确地处理和确认。例如,使用 MANUAL 手动确认消费:
@RabbitListener(queues = "delayed-queue", ackMode = "MANUAL")
public void consumeWithAck(Message message, Channel channel) throws IOException {try {// 消费消息System.out.println("Consumed message: " + new String(message.getBody()));// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理异常,手动拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}
通过手动确认,你可以更好地控制消息的确认和失败重试机制,防止在消费者挂掉的情况下丢失消息。
4. 测试并发消费
你可以通过调用 testDelay 方法或者其他方式,发送延时消息来验证并发消费是否生效。发送的消息会被多个消费者并行处理,输出的日志中会显示哪个线程消费了哪个消息,从而验证消费者的并发能力。
@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}
5. 总结
通过配置多个并发消费者来加速消息消费,有以下几个要点:
- 使用
@RabbitListener(concurrency = "3-5")注解来启动多个并发消费者。 - 配置
SimpleMessageListenerContainer来更灵活地管理消费者线程池。 - 使用手动确认模式(
ackMode = "MANUAL")可以更精细地控制消息确认和失败重试。
通过这些配置,你可以根据消息量的大小和系统负载动态调整消费者数量,以达到加快消费速度的目的。
相关文章:
rabbitmq 延时队列
要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。 1. 安装 RabbitMQ Delayed Message Plugin 首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可…...
idea + Docker + 阿里镜像服务打包部署
一、下载docker desktop软件 官网下载docker desktop,需要结合wsl使用 启动成功的画面(如果不是这个画面例如一直处理start或者是stop需要重新启动,不行就重启电脑) 打包成功的镜像在这里,如果频繁打包会导致磁盘空间被占满,需…...
Vue 3 零基础入门:从计数器应用开始你的工程化之旅 - 深入理解 Vue 3 响应式系统
引言 欢迎来到 Vue 3 + 现代前端工程化 系列技术博客! 本系列博客旨在通过每日构建一个小项目,帮助您深入学习 Vue 3 的各项核心特性,并掌握现代前端工程化的实践技能。 在接下来的系列文章中,我们将从零开始,由浅入深,逐步构建一系列实用的小型应用。 今天,作为本系列…...
批量将手机照片修改为一寸白底证件照的方法
生活中经常需要用到一寸白底证件照,但每次去照相馆拍摄既费时又麻烦。其实,利用手机拍照和批量证件照生成工具,就能轻松批量修改手机照片为一寸白底证件照。 首先,在电脑浏览器中打开【报名电子照助手】,找到“批量证件…...
【Docker基础】理解 Docker:本质、性质、架构与核心组件
文章目录 Docker 本质Docker 的引擎迭代Docker 和虚拟机的区别Docker 为什么比虚拟机资源利用率高,速度快?Docker 和 JVM 虚拟化的区别Docker 版本1. LXC (Linux Containers)2. libcontainer3. Moby4. docker-ce5. docker-ee总结: Docker 架构…...
LeetCodehot 力扣热题100 全排列
这段代码的目的是计算给定整数数组的所有全排列(permutations),并返回一个包含所有排列的二维数组。 思路解析 在这段代码中,采用了 深度优先搜索(DFS) 和 回溯 的方法来生成所有的排列。 关键步骤…...
SQL笔记#数据更新
一、数据的插入(INSERT语句的使用方法) 1、什么是INSERT 首先通过CREATE TABLE语句创建表,但创建的表中没有数据;再通过INSERT语句向表中插入数据。 --创建表ProductIns CREATE TABLE ProductIns (product_id CHAR(4) NOT NULL,product_name VARCHAR(1…...
GCC 和 G++的基本使用
GCC 和 G 命令 GCC 和 G 命令GCC(GNU C 编译器)基本用法常用选项示例 G(GNU C 编译器)基本用法常用选项示例 GCC 与 G 的区别选择使用 GCC 还是 G C编译流程1. 预处理(Preprocessing)2. 编译(Co…...
Maven中一些基础知识点
早些时候只知道创建或者开发springboot项目时候,有一个叫pom.xml的文件可以用来管理项目所需的依赖/第三方工具。 索性稍微深入了解了一下,然后把自己认为重要的记录下来。 首先我们要引入新的依赖自然是在dependencies下写dependency,这个…...
论文阅读笔记:Deep Face Recognition: A Survey
论文阅读笔记:Deep Face Recognition: A Survey 1 介绍2 总览2.1 人脸识别组件2.1.1 人脸处理2.1.2 深度特征提取2.1.3 基于深度特征的人脸对比 3 网络结构和损失函数3.1 判别损失函数的演化3.1.1 基于欧式距离的损失3.1.2 基于角度/余弦边距的损失3.1.3 Softmax损失…...
JVM生产环境问题定位与解决实战(三):揭秘Java飞行记录器(JFR)的强大功能
提到飞行记录器,或许你的脑海中并未立刻浮现出清晰的画面,但一说起“黑匣子”,想必大多数人都能恍然大悟,知晓其重要性及用途。在航空领域,黑匣子作为不可或缺的设备,默默记录着飞行过程中的每一项关键数据…...
爬虫框架与库
爬虫框架与库是用于网络数据抓取的核心工具,帮助开发者高效地从网页中提取结构化数据。 Requests:用于发送HTTP请求。 BeautifulSoup:用于解析HTML和XML。 Scrapy:强大的爬虫框架,适合大规模爬取。 Selenium&#…...
PyTorch常用函数总结(持续更新)
本文主要记录自己在用 PyTorch复现经典模型 过程中遇到的一些函数及用法,以期对 常见PyTorch函数 更加熟练~ 官方Docs:PyTorch documentation — PyTorch 2.6 documentation 目录 数据层面 torch.sign(tensor) torch.tensor(np.eye(3)[y]) torch.on…...
代码异常(js中push)NO.4
1. 环境 Vue3,Element Plsu 2. 示例代码 const { updateBy, updateTime, ...curObj } form.valuecurObj.id props.tableData.length 1var newTableData props.tableData.push(curObj)updateTableData(newTableData)3. 情景描述 newTableData变成了整数&#…...
Anaconda 2025 最新版安装与Python环境配置指南(附官方下载链接)
一、软件定位与核心功能 Anaconda 2025 是Python/R数据科学集成开发平台,预装1500科学计算库,新增AI模型可视化调试、多环境GPU加速等特性。相较于传统Python安装,其优势包括: 环境隔离:通过conda工具实现多版本Pyth…...
Vue 中动态实现进度条
在 Vue 中动态实现进度条,基本上有两种常见的方法:直接通过 Vue 数据绑定控制样式,或者利用外部库来实现更复杂的功能。我们会深入探讨这两种方式,并且详细说明每种方法的实现步骤、优缺点以及使用场景。 1. 使用 Vue 数据绑定来…...
CSS滚动条原理与自定义样式指南,CSS滚动条样式失效,滚动条样式无效,-webkit-scrollbar无效,overflow不显示滚动条
滚动内容形成的必要条件 CSS Overflow属性解析 MDN官方文档-Overflow属性 菜鸟教程-Overflow属性 overflow 属性控制内容溢出元素框时在对应的元素区间内是否添加滚动条。 值描述visible默认值。内容不会被修剪,会呈现在元素框之外。hidden内容会被修剪…...
Three.js 入门(辅助、位移、父子关系、缩放旋转、响应式布局)
本篇主要学习内容 : 三维坐标系与辅助坐标系物体位移与父子元素物体的缩放与物体的旋转设置响应式画布与全屏控制 点赞 关注 收藏 学会了 本文使用 Three.js 的版本:171 基于 Vue3vite开发调试 1.三维坐标系与辅助坐标系 1.1) 导入three和轨道控制器 // 导入…...
python算法-用递归打印数字3的幂--Day017
文章目录 前言采用创新方式,精选趣味、实用性强的例子,从不同难度、不同算法、不同类型和不同数据结构进行总结,全面提升算法能力。例1.用递归打印数字例2.相对排名 总结 前言 采用创新方式,精选趣味、实用性强的例子,…...
Selenium 与 Coze 集成
涵盖两者的基本概念、集成步骤、代码示例以及相关注意事项。 基本概念 Selenium:是一个用于自动化浏览器操作的工具集,支持多种浏览器(如 Chrome、Firefox 等),能够模拟用户在浏览器中的各种操作,如点击、输入文本、选择下拉框等,常用于 Web 应用的自动化测试。Coze:它…...
SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
如何在最短时间内提升打ctf(web)的水平?
刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...
佰力博科技与您探讨热释电测量的几种方法
热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
