rabbitmq 延时队列
要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。
1. 安装 RabbitMQ Delayed Message Plugin
首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可以通过以下命令安装和启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2. 创建交换机和队列
你需要创建一个 延时交换机(x-delayed-message)和一个普通队列。我们将在发送消息时指定延迟时间。
3. 发送延迟消息的代码示例
假设你已经在 RabbitMQ 中设置了延时交换机。以下是使用 Java 和 Spring AMQP 发送延迟消息的代码示例。
Maven 依赖
确保你的项目中已经添加了 Spring AMQP 相关依赖:
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.4.6</version> <!-- 适配你使用的版本 -->
</dependency>
配置延时交换机和队列
你需要配置一个 延时交换机 和 队列,并设置消息的延迟时间。
@Configuration
public class RabbitConfig {// 创建一个延时交换机@Beanpublic CustomExchange delayedExchange() {Map<String, Object> arguments = new HashMap<>();// 设定交换机类型为延时交换机arguments.put("x-delayed-type", "direct");return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, arguments);}// 创建队列@Beanpublic Queue delayedQueue() {return new Queue("delayed-queue", true);}// 将队列绑定到延时交换机@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.routing.key").noargs();}
}
发送延迟消息
在消息发送时,你需要通过设置消息的属性来指定延迟时间。可以使用 AMQP.BasicProperties 来设置消息的 x-delay 属性,这个值表示延迟的时间(单位:毫秒)。
@Service
public class MessageProducer {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendDelayedMessage(String message, int delayMilliseconds) {// 创建消息属性,并设置延迟时间MessageProperties messageProperties = new MessageProperties();messageProperties.setDelay(delayMilliseconds); // 设置延迟时间(毫秒)Message messageObj = new Message(message.getBytes(), messageProperties);// 发送消息到延时交换机amqpTemplate.send("delayed-exchange", "delayed.routing.key", messageObj);System.out.println("Sent delayed message: " + message + " with delay: " + delayMilliseconds + " ms");}
}
在上面的代码中,setDelay(delayMilliseconds) 方法设置了延迟时间。这个时间会告诉 RabbitMQ 延迟多久后将消息投递到队列中。
监听消息
最后,你需要设置消费者来监听这个延时队列,并处理接收到的消息:
@Service
public class MessageConsumer {@RabbitListener(queues = "delayed-queue")public void consume(String message) {System.out.println("Received delayed message: " + message);}
}
4. 测试发送延迟消息
现在,你可以在业务逻辑中调用 sendDelayedMessage 方法发送延时消息。例如,发送一条延迟 10 秒的消息:
@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}
5. 启动和测试
- 启动你的 Spring Boot 应用。
- 调用
testDelay方法发送延迟消息。 - 你将看到消息在队列中延迟指定的时间(例如,10秒)后被消费。
关键点:
- 通过
x-delayed-message交换机,设置x-delayed-type为direct或topic,根据需求选择交换机类型。 - 使用
setDelay方法设置延迟时间,单位是毫秒。 - RabbitMQ 会在指定的时间到达后,将消息投递到目标队列。
总结
通过 RabbitMQ Delayed Message Plugin,你可以非常方便地实现延时队列。只需要创建一个支持延迟的交换机,并通过设置 x-delay 属性来指定消息的延迟时间。
配置并行消费
要启动多个消费者并并行处理 RabbitMQ 中的消息,通常可以通过 Spring AMQP 和 RabbitListener 实现。这将帮助你加快消费速度,提升系统的吞吐量。下面是如何启动多个消费者进行并行消费的代码修改步骤:
1. 配置多个消费者
Spring AMQP 支持使用 @RabbitListener 注解启动多个消费者实例。通过配置 并行消费者,Spring 会为每个消费者实例分配一个独立的线程来处理消息。
2. 增加消费者并发处理能力
为了实现并发消费,我们可以通过以下几种方式:
- 使用
@RabbitListener启动多个消费者实例:每个@RabbitListener注解的消费者都会独立地消费队列中的消息。 - 配置
SimpleMessageListenerContainer的并发设置:通过配置SimpleMessageListenerContainer,你可以设置多个消费者同时监听队列,从而提高并发消费能力。
3. 代码修改示例
1) 创建并发消费者
首先,创建一个通用的消息监听器,并将 @RabbitListener 注解应用于多个消费者实例上。你可以通过 @RabbitListener 注解中的 concurrency 属性来设置消费者的并发数量。
@Service
public class ConcurrentMessageConsumer {// 使用 @RabbitListener 注解配置多个并发消费者,默认启动2个消费者@RabbitListener(queues = "delayed-queue", concurrency = "3-5") // 设置并发消费者数目 3-5 个消费者public void consume(String message) {System.out.println("Thread: " + Thread.currentThread().getName() + " - Received message: " + message);}
}
在上面的代码中,concurrency = "3-5" 表示 Spring 会启动 3 到 5 个消费者实例来并行处理队列中的消息。消费者数目是动态的,具体数量由 Spring 的消息监听容器控制。
"3-5"表示最低启动 3 个消费者,最多启动 5 个消费者来并行处理消息。- 如果消息量很大,Spring 会动态调整消费者的数量,以适应系统的负载。
2) 配置并发消费者的线程池(可选)
为了更好地控制消费者的线程池和消息消费的并发度,你可以通过配置 SimpleMessageListenerContainer 来定义更具体的并发设置。例如,你可以在 Spring 配置类中手动定义消费者容器。
@Configuration
public class RabbitConfig {@Beanpublic SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,MessageListener messageListener) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("delayed-queue");container.setMessageListener(messageListener);// 设置并发消费的最小值和最大值container.setConcurrentConsumers(3); // 最小3个消费者container.setMaxConcurrentConsumers(10); // 最大10个消费者return container;}
}
setConcurrentConsumers(3):设置最小消费者数量。setMaxConcurrentConsumers(10):设置最大消费者数量,Spring 会根据消息的积压情况动态调整消费者的数量。
3) 控制消费者的负载和流量
如果你希望更精细地控制消息消费的负载,可以使用 @RabbitListener 注解中的 acknowledgeMode 设置来调整消息确认模式,确保消息被正确地处理和确认。例如,使用 MANUAL 手动确认消费:
@RabbitListener(queues = "delayed-queue", ackMode = "MANUAL")
public void consumeWithAck(Message message, Channel channel) throws IOException {try {// 消费消息System.out.println("Consumed message: " + new String(message.getBody()));// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理异常,手动拒绝消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}
通过手动确认,你可以更好地控制消息的确认和失败重试机制,防止在消费者挂掉的情况下丢失消息。
4. 测试并发消费
你可以通过调用 testDelay 方法或者其他方式,发送延时消息来验证并发消费是否生效。发送的消息会被多个消费者并行处理,输出的日志中会显示哪个线程消费了哪个消息,从而验证消费者的并发能力。
@Autowired
private MessageProducer messageProducer;public void testDelay() {// 发送一条延迟10秒的消息messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}
5. 总结
通过配置多个并发消费者来加速消息消费,有以下几个要点:
- 使用
@RabbitListener(concurrency = "3-5")注解来启动多个并发消费者。 - 配置
SimpleMessageListenerContainer来更灵活地管理消费者线程池。 - 使用手动确认模式(
ackMode = "MANUAL")可以更精细地控制消息确认和失败重试。
通过这些配置,你可以根据消息量的大小和系统负载动态调整消费者数量,以达到加快消费速度的目的。
相关文章:
rabbitmq 延时队列
要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。 1. 安装 RabbitMQ Delayed Message Plugin 首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可…...
Deepseek 实战全攻略,领航科技应用的深度探索之旅
想玩转 Deepseek?这攻略别错过!先带你了解它的基本原理,教你搭建运行环境。接着给出自然语言处理、智能客服等应用场景的实操方法与代码。还分享模型微调、优化技巧,结合案例加深理解,让你全面掌握,探索科技…...
Go语言中的信号量:原理与实践指南
Go语言中的信号量:原理与实践指南 引言 在并发编程中,控制对共享资源的访问是一个经典问题。Go语言提供了丰富的并发原语(如sync.Mutex),但当我们需要灵活限制并发数量时,信号量(Semaphore&am…...
计算机网络与通讯知识总结
计算机网络与通讯知识总结 基础知识总结 1)FTP:文件传输 SSH:远程登录 HTTP:网址访问 2)交换机 定义:一种基于MAC地址实现局域网(LAN)内数据高速转发的网络设备,可为接入设备提供独享通信通道。 - 核心功能: 1.数据链路层(OSI第二层)工作,通过MAC地址…...
ReentrantLock 用法与源码剖析笔记
📒 ReentrantLock 用法与源码剖析笔记 🚀 一、ReentrantLock 核心特性 🔄 可重入性:同一线程可重复获取锁(最大递归次数为 Integer.MAX_VALUE)🔧 公平性:支持公平锁(按等…...
Vscode无法加载文件,因为在此系统上禁止运行脚本
1.在 vscode 终端执行 get-ExecutionPolicy 如果返回是Restricted,说明是禁止状态。 2.在 vscode 终端执行set-ExecutionPolicy RemoteSigned 爆红说明没有设置成功 3.在 vscode 终端执行Set-ExecutionPolicy -Scope CurrentUser RemoteSigned 然后成功后你再在终…...
java进阶专栏的学习指南
学习指南 java类和对象java内部类和常用类javaIO流 java类和对象 类和对象 java内部类和常用类 java内部类精讲Object类包装类的认识String类、BigDecimal类初探Date类、Calendar类、SimpleDateFormat类的认识java Random类、File类、System类初识 javaIO流 java IO流【…...
架构思维:架构的演进之路
文章目录 引言为什么架构思维如此重要架构师的特点软件架构的知识体系如何提升架构思维大型互联网系统架构的演进之路一、大型互联网系统的特点二、系统处理能力提升的两种途径三、大型互联网系统架构演化过程四、总结 引言 在软件开发行业中,有很多技术人可能会问…...
VC++零基础入门之系列教程 【附录E MFC快速参考指南】
附录E MFC快速参考指南 E.1 创建窗口 使用M F C CWnd wnd; W n d . C r e a t e E x ( E xSt y l e , C l a s s N a m e , Wi n d o w N a m e , S t y l e , x , y, Wi d t h , H e i g h t , P a r e n t , M e n u , P a r a m ) ; 使用A P I HWND hwnd=::CreateWi n d …...
vue3:vue3项目安装并引入Element-plus
一、安装Element-plus 1、安装语句位置 安装 | Element Plushttps://element-plus.org/zh-CN/guide/installation.html根据所需进行安装,这里使用npm包 2、找到项目位置 找到项目位置,在路径上输入cmd回车打开“运行”窗口 输入安装语句回车完成安装 …...
一文掌握python中正则表达式的各种使用
文章目录 1. 正则表达式基础1.1 常用元字符1.2 基本用法 2. 正则表达式高级功能2.1 分组捕获2.2 命名分组2.3 非贪婪匹配2.4 零宽断言2.5 编译正则表达式2.6 转义字符 3. 常见应用场景3.1 验证邮箱格式3.2 提取 URL3.3 提取日期3.4 提取HTML中的链接3.5 提取HTML中的图片链接3.…...
java.2.25
1. 注释 注释是对代码的解释和说明文字。 Java中的注释分为三种: 单行注释: // 这是单行注释文字多行注释: /* 这是多行注释文字 这是多行注释文字 这是多行注释文字 */ 注意:多行注释不能嵌套使用。文档注释:…...
45.matlab产生正弦叠加信号
,...
VScode 开发
目录 安装 VS Code 创建一个 Python 代码文件 安装 VS Code VSCode(全称:Visual Studio Code)是一款由微软开发且跨平台的免费源代码编辑器,VSCode 开发环境非常简单易用。 VSCode 安装也很简单,打开官网 Visual S…...
在llm和agent的背景下,有什么比较好的研究方向或者能解决现在的实际的社会问题
在llm和agent的背景下,有什么比较好的研究方向或者能解决现在的实际的社会问题 在LLM(大语言模型)与Agent(智能体)的融合背景下,研究方向和社会应用正呈现出多元化趋势。 一、技术研究方向 多模态智能体(Multi-modal Agents) 方向:将LLM与视觉、语音、触觉等多模态数…...
A Large Recurrent Action Model: xLSTM Enables Fast Inference for Robotics Tasks
奥地利林茨约翰开普勒大学机器学习研究所 ELLIS 小组,LIT 人工智能实验室奥地利林茨 NXAI 有限公司谷歌 DeepMind米拉 - 魁北克人工智能研究所 摘要 近年来,强化学习(Reinforcement Learning, RL)领域出现了一种趋势,…...
CSS按钮点击效果实战:scale(0.95) 与10个交互动画优化指南
[TOC](CSS按钮点击效果实战:scale(0.95) 与10个交互动画优化指南) 导语 在现代 Web 开发中,细腻的交互效果是提升用户体验的关键。通过简单的 CSS 动画(如 transform: scale(0.95)),无需 JavaScript 即可实现高效、流…...
计算机毕业设计SpringBoot+Vue.js学科竞赛管理系统(源码+文档+PPT+讲解)
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
在Spring Boot+Vue前后端分离的项目中使用JWT实现基本的权限校验
说明 在 Spring Boot + Vue 前后端分离的项目中,如果不使用第三方服务(如 Spring Security、Shiro 等),可以通过自定义实现基本的权限校验。 使用JWT实现步骤 以下是实现步骤: 1. 设计权限模型 通常权限模型包括: 用户(User):系统的使用者。角色(Role):用户的权…...
Deep Seek-编码器
1. DeepSeek Coder 简介 DeepSeek Coder 由一系列代码语言模型组成,每个模型都在 2T 令牌上从头开始训练,其中 87% 的代码和 13% 的自然语言在中英文中组成。我们提供各种大小的代码模型,从 1B 到 33B 版本。每个模型都通过采用 16K 的窗口大小和额外的填空任务在项目级代码…...
在 MySQL 的 InnoDB 存储引擎中,部分数据库优化策略
在 MySQL 的 InnoDB 存储引擎中,以下操作是 同步的,并且会直接影响数据库执行 SQL 的效率: 1. Redo Log 的同步刷盘(事务提交时) 触发条件: 当 innodb_flush_log_at_trx_commit1 时,事务提交时强…...
一文掌握DrissionPage的详细使用
文章目录 1. 什么是DrissionPage?2. 安装 DrissionPage3. 基本使用3.1 初始化浏览器3.2 打开网页3.3 查找元素3.4 操作元素3.5 获取元素属性3.6 执行 JavaScript3.7 页面对象(Page Object)3.8 元素定位方式3.9 常用操作方法3.10 断言 4. 高级…...
Android平台轻量级RTSP服务模块技术对接说明
一、技术背景 随着内网无纸化办公、电子教室等应用场景对超低延迟音视频传输需求的日益增长,为避免用户或开发者单独部署 RTSP 或 RTMP 服务,大牛直播 SDK 推出了轻量级 RTSP 服务 SDK。该 SDK 能够将本地音视频数据(如摄像头、麦克风等&…...
代码随想录第二十天|二叉树part08--669.修建二叉搜索树、108.将有序数组转换为二叉搜索树、538.把二叉搜索树转换为累加树
刷题小记: 上期学习了二叉搜索树的插入和删除操作,这次学习如何按区间修剪二叉搜索树。还有两题,关于借助二叉搜索树的有序特性进行转换。 669.修剪二叉搜索树(669.修剪二叉搜索树) 题目分析: 给定一个…...
RoCEv2 高性能传输协议与 Lossless 无损网络
目录 文章目录 目录RoCERoCEv2 v.s. IBRoCEv2 协议栈RoCEv2 需要 Lossless NetworkLossless Network 拥塞控制技术网络拥塞的原因PFC 基于优先级的流量控制PFC Unfairness (带宽分配不公平)的问题PFC HOL(队头拥塞)的问题PFC Dead…...
C语言多人聊天室 ---chat(客户端聊天)
head.h #ifndef __HEAD_H #define __HEAD_H// 常用头文件 #include <stdio.h> #include <stdlib.h> #include <string.h>// 网络编程涉及的头文件 #include <sys/socket.h> #include <netinet/in.h> #include <netinet/ip.h>#include <…...
联想 SR590 服务器 530-8i RAID 控制器更换损坏的硬盘
坏了的硬盘会自动亮黄灯。用一个空的新盘来替换,新盘最好不要有东西。但是有东西可能也没啥,因为我看 RAID 控制器里有格式化的选项 1. 从 IPMI 把服务器关机,电源键进入绿色闪烁状态 2. 断电,推开塑料滑块拉出支架,…...
城电科技|会追日的智能花,光伏太阳花开启绿色能源新篇章
当艺术与科技相遇,会碰撞出怎样的火花?城电科技推出的光伏太阳花,以其独特的设计与智能化的功能,给出了答案。这款产品不仅具备太阳能发电的实用功能,更是一件充满科技属性的艺术性光伏产品,吸引了广泛关注…...
基于YOLO11深度学习的苹果叶片病害检测识别系统【python源码+Pyqt5界面+数据集+训练代码】
《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…...
FFmpeg 命令行全解析:高效音视频处理从入门到精通
FFmpeg FFmpeg 是一款开源的多媒体处理工具集,支持音视频编解码、格式转换、流媒体处理等全链路操作。核心功能与工具: 多媒体全链路支持 支持 1000+ 音视频编解码格式(如 H.264、HEVC、AV1)和协议(RTMP、RTSP、HLS),覆盖录制、转码、流化等全流程。提供三大核心工具: …...
