当前位置: 首页 > article >正文

RabbitMQ惰性队列的工作原理、消息持久化机制、同步刷盘的概念、延迟插件的使用方法

惰性队列工作原理

惰性队列通过尽可能多地将消息存储到磁盘上来减少内存的使用。与传统队列相比,惰性队列不会主动将消息加载到内存中,而是尽量让消息停留在磁盘上,从而降低内存占用。尽管如此,它并不保证所有操作都是同步写入磁盘的。这意味着消息可能会先被缓存到操作系统的缓冲区中,然后由操作系统决定何时将其真正写入磁盘。

  • 优点:适合处理大量消息且对内存压力敏感的场景。
  • 缺点:由于频繁的磁盘I/O操作,性能可能不如传统队列。

同步刷盘的概念

同步刷盘意味着每次写入操作都会等待数据完全写入磁盘后才返回确认信息。虽然这种方式提供了更强的数据持久性保证,但它也显著增加了写入操作的延迟。对于RabbitMQ而言,可以通过设置消息为持久化来增加数据的安全性,但对于极端情况下的数据安全性要求,还需要结合其他策略如调整操作系统参数或使用文件系统级别的同步写入配置。

延迟插件的工作原理

RabbitMQ本身没有内置的延迟队列功能,但可以通过安装rabbitmq_delayed_message_exchange插件实现这一功能。该插件允许创建一个自定义交换机类型,该交换机能够根据消息头中的延迟时间属性来延迟消息的传递。

在Spring Boot中集成RabbitMQ惰性队列和延迟消息

1. 项目初始化

首先,确保你的Spring Boot项目中包含必要的依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>
2. 配置RabbitMQ连接

application.yml中配置RabbitMQ连接信息:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest
3. 定义惰性队列

创建一个配置类来定义惰性队列:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定义惰性模式的队列* @return 返回惰性队列实例*/@Beanpublic Queue lazyQueue() {Map<String, Object> args = new HashMap<>();// 设置队列为惰性模式args.put("x-queue-mode", "lazy");return new Queue("my_lazy_queue", true, false, false, args); // durable=true for queue durability}
}
4. 发送持久化消息

创建一个服务类用于发送消息,并确保消息是持久化的:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送一条持久化消息到惰性队列* @param message 要发送的消息内容*/public void sendMessage(String message) {rabbitTemplate.convertAndSend("my_lazy_queue", message);System.out.println(" [x] Sent '" + message + "'");}
}

确保消息持久化可以在application.yml中设置如下:

spring:rabbitmq:template:exchange: ''routing-key: 'my_lazy_queue'mandatory: truepublisher-confirms: truepublisher-returns: true
5. 接收消息

创建一个监听器来接收消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageReceiver {/*** 监听并接收来自惰性队列的消息* @param message 接收到的消息内容*/@RabbitListener(queues = "my_lazy_queue")public void receiveMessage(String message) {System.out.println(" [x] Received '" + message + "'");}
}
6. 使用延迟插件发送延迟消息

首先,在RabbitMqConfig中定义延迟交换机:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMqConfig {/*** 定义延迟交换机* @return 返回延迟交换机实例*/@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);}/*** 绑定延迟队列到延迟交换机* @param delayedQueue 延迟队列* @param delayExchange 延迟交换机* @return 返回绑定实例*/@Beanpublic Binding binding(Queue delayedQueue, CustomExchange delayExchange) {return new Binding("delayed_queue", Binding.DestinationType.QUEUE, "delayed_exchange", "routing.key", Collections.emptyMap());}/*** 定义延迟队列* @return 返回延迟队列实例*/@Beanpublic Queue delayedQueue() {return new Queue("delayed_queue");}
}

然后,创建一个服务类来发送延迟消息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;@Service
public class DelayedMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送带有延迟的消息* @param message 要发送的消息内容* @param delayTime 延迟时间(毫秒)*/public void sendDelayedMessage(String message, int delayTime) {MessagePostProcessor messagePostProcessor = message -> {message.getMessageProperties().setHeader("x-delay", delayTime);return message;};rabbitTemplate.convertAndSend("delayed_exchange", "routing.key", message, messagePostProcessor);System.out.println(" [x] Sent '" + message + "' with delay.");}
}

最后,创建一个监听器来接收延迟消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DelayedMessageReceiver {/*** 监听并接收来自延迟队列的消息* @param message 接收到的消息内容*/@RabbitListener(queues = "delayed_queue")public void receiveDelayedMessage(String message) {System.out.println(" [x] Received delayed message '" + message + "'");}
}

高级特性和最佳实践

  • 发布确认机制:为了提高可靠性,可以开启发布确认机制,以确保消息确实被RabbitMQ服务器接受。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message acknowledged");} else {System.err.println("Message not acknowledged due to: " + cause);}
});
  • 预取计数(Prefetch Count):通过设置预取计数限制每个消费者同时处理的消息数量,有助于防止消费者被过多未处理的消息压垮。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMqConnectionConfig {@Beanpublic CachingConnectionFactory cachingConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setChannelCacheSize(25);connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(200);return connectionFactory;}
}

可以在application.yml中设置:

spring:rabbitmq:listener:simple:prefetch: 10

相关文章:

RabbitMQ惰性队列的工作原理、消息持久化机制、同步刷盘的概念、延迟插件的使用方法

惰性队列工作原理 惰性队列通过尽可能多地将消息存储到磁盘上来减少内存的使用。与传统队列相比&#xff0c;惰性队列不会主动将消息加载到内存中&#xff0c;而是尽量让消息停留在磁盘上&#xff0c;从而降低内存占用。尽管如此&#xff0c;它并不保证所有操作都是同步写入磁…...

MySQL与Oracle深度对比

MySQL与Oracle深度对比&#xff1a;数据类型与SQL差异 一、数据类型差异 1. 数值类型对比 数据类型MySQLOracle整数TINYINT, SMALLINT, MEDIUMINT, INT, BIGINTNUMBER(精度) 或直接INT(内部仍为NUMBER)小数DECIMAL(p,s), FLOAT, DOUBLENUMBER(p,s), FLOAT, BINARY_FLOAT, BI…...

【Leetcode 每日一题】1922. 统计好数字的数目

问题背景 我们称一个数字字符串是 好数字 当它满足&#xff08;下标从 0 0 0 开始&#xff09;偶数 下标处的数字为 偶数 且 奇数 下标处的数字为 质数 ( 2 , 3 , 5 (2, \ 3, \ 5 (2, 3, 5 或 7 ) 7) 7)。 比方说&#xff0c;“2582” 是好数字&#xff0c;因为偶数下标处…...

pyqtgraph.opengl.items.GLSurfacePlotItem.GLSurfacePlotItem 报了一个错

1. 需求是这个样子的 有一个 pyqtgraph.opengl.GLViewWidget &#xff0c;在应用启动时存在QMainWindow中&#xff0c;即父对象是QMainWindow&#xff0c;当业务需要时&#xff0c;修改它的父对象变为一个QDialog&#xff0c;可以让它从QMainWindow中弹出显示在QDialog里&#…...

【C++初学】课后作业汇总复习(六) 函数模板

1、函数模板 思考&#xff1a;如果重载的函数&#xff0c;其解决问题的逻辑是一致的、函数体语句相同&#xff0c;只是处理的数据类型不同&#xff0c;那么写多个相同的函数体&#xff0c;是重复劳动&#xff0c;而且还可能因为代码的冗余造成不一致性。 解决&#xff1a;使用…...

【第16届蓝桥杯C++C组】--- 数位倍数

Hello呀&#xff0c;小伙伴们&#xff0c;第16届蓝桥杯也完美结束了&#xff0c;无论大家考的如何&#xff0c;都要放平心态&#xff0c;今年我刚上大一&#xff0c;也第一次参加蓝桥杯&#xff0c;刷的算法题也只有200来道&#xff0c;但是还是考的不咋滴&#xff0c;但是拿不…...

ASP.NET Core 性能优化:客户端响应缓存

文章目录 前言一、什么是缓存二、客户端缓存核心机制&#xff1a;HTTP缓存头1&#xff09;使用[ResponseCache]属性&#xff08;推荐&#xff09;2&#xff09;预定义缓存配置&#xff08;CacheProfile&#xff09;3&#xff09;手动设置HTTP头4&#xff09;缓存验证机制&#…...

Numpy和OpenCV库匹配查询,安装OpenCV ABI错误

文章目录 地址opencv-python&#xff1a;4.x版本的对应numpyopencv-python&#xff1a;5.x版本的对应numpy方法2 ps&#xff1a;装个opencv遇到ABI错误无语了&#xff0c;翻了官网&#xff0c;github文档啥都没&#xff0c;记录下 地址 opencv-python&#xff1a;4.x版本的对应…...

全球变暖(蓝桥杯 2018 年第九届省赛)

题目描述 你有一张某海域 NN 像素的照片&#xff0c;. 表示海洋、 # 表示陆地&#xff0c;如下所示&#xff1a; ....... .##.... .##.... ....##. ..####. ...###. .......其中 "上下左右" 四个方向上连在一起的一片陆地组成一座岛屿。例如上图就有 2 座岛屿。 由…...

ubuntu18.04安装miniforge3

1.下载安装文件 略&#xff08;注&#xff1a;从同事哪里拖来的安装包&#xff09; 2.修改安装文件权限 chmod x Miniforge3-Linux-x86_64.sh 3.将它安装到指定位置 micromamba activate /home/xxx/fxp/fromDukto/miniforge3 4.激活 /home/xxx/fxp/fromDukto/miniforge3…...

高并发短信系统设计:基于SharingJDBC的分库分表、大数据同步与实时计算方案

高并发短信系统设计&#xff1a;基于SharingJDBC的分库分表、大数据同步与实时计算方案 一、概述 在当今互联网应用中&#xff0c;短信服务是极为重要的一环。面对每天发送2000万条短信的需求&#xff0c;我们需要一个能够处理海量数据&#xff08;一年下来达到数千万亿级别&…...

OceanBase企业版集群部署:oatcli命令行方式

OceanBase企业版集群部署&#xff1a;oatcli命令行方式 安装包准备服务器准备最低资源配置是否部署ODP组件&#xff1f;仲裁服务器 服务器配置操作系统内核参数BIOS设置磁盘挂载网卡设置 安装OAT部署工具初始化OBServer服务器使用oatcli部署三副本集群安装OceanBase软件初始化O…...

SQL 查询中涉及的表及其作用说明

SQL 查询中涉及的表及其作用说明&#xff1a; 涉及的数据库表 表名别名/用途关联关系dbo.s_orderSO&#xff08;主表&#xff09;存储订单主信息&#xff08;订单号、日期、客户等&#xff09;dbo.s_orderdetailSoD&#xff08;订单明细&#xff09;通过 billid SO.billid 关…...

智能手机功耗测试

随着智能手机发展,用户体验对手机的续航功耗要求越来越高。需要对手机进行功耗测试及分解优化,将手机的性能与功耗平衡。低功耗技术推动了手机的用户体验。手机功耗测试可以采用powermonitor或者NI仪表在功耗版上进行测试与优化。作为一个多功能的智能终端,手机的功耗组成极…...

UNIX域套接字(Unix Domain Sockets, UDS) 的两种接口

目录 1. 流式套接字&#xff08;SOCK_STREAM&#xff09;特点类比典型使用场景代码示例&#xff08;伪代码&#xff09; 2. 数据报套接字&#xff08;SOCK_DGRAM&#xff09;特点类比典型使用场景代码示例&#xff08;伪代码&#xff09; 3. 两者的核心区别对比4. 为什么 UNIX …...

使用U盘安装 ubuntu 系统

1. 准备U 盘制作镜像 1.1 下载 ubuntu iso https://ubuntu.com/download/ 这里有多个版本以供下载&#xff0c;本文选择桌面版。 1.2 下载rufus https://rufus.ie/downloads/ 1.3 以管理员身份运行 rufus 设备选择你用来制作启动项的U盘&#xff0c;不能选错了&#xff1b;点…...

安全厂商安全理念分析

奇安信&#xff08;toB企业安全&#xff09; 安全理念&#xff1a;率先提出 “内生安全” 理念。即把安全能力内置到信息化环境中&#xff0c;通过信息化系统和安全系统的聚合、业务数据和安全数据的聚合、IT 人才和安全人才的聚合&#xff0c;让安全系统像人的免疫系统一样&a…...

Redis如何判断哨兵模式下节点之间数据是否一致

在哨兵模式下判断两个Redis节点的数据一致性&#xff0c;可以通过以下几种方法实现&#xff1a; 一、检查主从复制偏移量 使用INFO replication命令 分别在主节点和从节点执行该命令&#xff0c;比较两者的master_repl_offset&#xff08;主节点&#xff09;和slave_repl_offs…...

HarmonyOS-ArkUI V2装饰器: @Provider和@Consumer装饰器:跨组件层级双向同步

作用 我们在之前学习的那些控件中,各有特点,也各有缺陷,至今没有痛痛快快的出现过真正能跨组件的双向绑定的装饰器。 比如 @Local装饰器,不能跨组件@Param装饰器呢,能跨组件传递,但是仅仅就是下一层组件接收参数。另外,它是单向传递,不可被重新赋值。如果您非要改值则…...

oracle 并行度(Parallel Degree)

在Oracle数据库中&#xff0c;并行度&#xff08;Parallel Degree&#xff09; 是用于控制并行处理任务的关键配置&#xff0c;旨在通过多进程协作加速大规模数据处 一、并行度的核心概念 并行度&#xff08;DOP, Degree of Parallelism&#xff09; 表示一个操作同时使用的并…...

Redis-场景缓存+秒杀+管道+消息队列

缓存一致性 1.两次更新 先更新数据库&#xff0c;再更新缓存&#xff1b;先更新缓存&#xff0c;再更新数据库&#xff1b; 出现不一致问题场景&#xff1a; 先更新数据库&#xff0c;再更新缓存&#xff1b; 先更新缓存&#xff0c;再更新数据库&#xff1b; 两次更新的适…...

系统的安全及应用

仓库做了哪些优化 仓库源换成国内源不使用root用户登录将不必要的开机启动项关闭内核的调优 系统做了哪些安全加固 禁止使用root禁止使用弱命令将常见的 远程连接端口换掉 系统安全及应用 Cpu负载高 java程序 运行异常中病毒&#xff1f; ps aux - - sort %cpu %mem Cpu …...

PostgreSQL内幕探索—基础知识

PostgreSQL内幕探索—基础知识 PostgreSQL&#xff08;以下简称PG&#xff09; 起源于 1986 年加州大学伯克利分校的 ‌POSTGRES 项目‌&#xff0c;最初以对象关系模型为核心&#xff0c;支持高级数据类型和复杂查询功能‌。 1996 年更名为 PostgreSQL 并开源&#xff0c;逐…...

基于redis 实现我的收藏功能优化详细设计方案

基于redis 实现我的收藏功能优化详细设计方案 一、架构设计 +---------------------+ +---------------------+ | 客户端请求 | | 数据存储层 | | (收藏列表查询) | | (Redis Cluster) | +-------------------…...

WPS复制粘贴错误 ,文件未找到 mathpage.wll

文章目录 1.错误提示图片2.解决方案1.找到MathType.wll文件和MathType Commands 2016.dotm文件并复制2.找到wps安装地址并拷贝上述两个文件到指定目录 3.重启WPS 1.错误提示图片 2.解决方案 1.找到MathType.wll文件和MathType Commands 2016.dotm文件并复制 MathType.wll地址如…...

驱动开发硬核特训 · Day 6 : 深入解析设备模型的数据流与匹配机制 —— 以 i.MX8M 与树莓派为例的实战对比

&#x1f50d; B站相应的视屏教程&#xff1a; &#x1f4cc; 内核&#xff1a;博文视频 - 从静态绑定驱动模型到现代设备模型 主题&#xff1a;深入解析设备模型的数据流与匹配机制 —— 以 i.MX8M 与树莓派为例的实战对比 在上一节中&#xff0c;我们从驱动框架的历史演进出…...

【UE5 C++课程系列笔记】35——HTTP基础——HTTP客户端异步请求API接口并解析响应的JSON

目录 前言 步骤 一、 搭建异步蓝图节点框架 二、异步蓝图节点嵌入到引擎的执行流程 三、获取本地时间并异步返回 四、获取网络时间并异步返回 五、源码 前言 本文以请求网络/本地时间API为例&#xff0c;介绍如何实现HTTP异步请求。 步骤 一、 搭建异步蓝图节点框架 …...

手机静态ip地址怎么获取?方法与解析‌

而在某些特定情境下&#xff0c;我们可能需要为手机设置一个静态IP地址。本文将详细介绍手机静态IP地址详解及获取方法 一、什么是静态IP地址&#xff1f; 静态IP&#xff1a;由用户手动设置的固定IP地址&#xff0c;不会因网络重启或设备重连而改变。 动态IP&#xff1a;由路…...

个人博客系统后端 - 注册登录功能实现指南

一、功能概述 个人博客系统的注册登录功能包括&#xff1a; 用户注册&#xff1a;新用户可以通过提供用户名、密码、邮箱等信息创建账号用户登录&#xff1a;已注册用户可以通过用户名和密码进行身份验证&#xff0c;获取JWT令牌身份验证&#xff1a;使用JWT令牌访问需要认证…...

Python 基础语法汇总

Python 语法 │ ├── 基本结构 │ ├── 语句&#xff08;Statements&#xff09; │ │ ├── 表达式语句&#xff08;如赋值、算术运算&#xff09; │ │ ├── 控制流语句&#xff08;if, for, while&#xff09; │ │ ├── 定义语句&#xff08;def…...