当前位置: 首页 > news >正文

RabbitMQ深入 —— 持久化和发布确认

前言

        前面的文章荔枝梳理了如何去配置RabbitMQ环境并且也介绍了两种比较简单的运行模式,在这篇文章中荔枝将会继续梳理有关RabbitMQ的持久化机制以及发布确认模式的相关知识,希望能够帮助到大家~~~


文章目录

前言

一、持久化

1.1 队列持久化

1.2 消息持久化

1.3 不公平分发

1.4 预取值

二、发布确认机制

2.1 单个发布确认

2.2 批量发布确认

2.3 异步发布确认

2.4 处理异步未确认消息的机制 

总结


一、持久化

跟MySQL和Redis一样,持久化的操作就是为了保存数据,避免因为RabbitMQ宕机停止服务之后可以确保消息不丢失。默认情况下RabbitMQ不会开启持久化,他会忽视队列和消息。 

1.1 队列持久化

        队列的持久化比较简单,只需要在生产者创建信道的时候将queueDeclare()函数的durable参数设置为ture即可。如果之前就已经创建过该队列,修改完durable之后可能会报错,这时候仅需要管理后台中删除队列即可。 

boolean durable = ture;
//信道申明
channel.queueDeclare(TASK_QUEUE_ACK, durable ,false,false,null);

1.2 消息持久化

消息持久化的实现也需要在消息生产者中修改demo,只需要在basicPublish()中设置第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN。

channel.basicPublish("",TASK_QUEUE_ACK, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

        需要注意的是:只有消息持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言已经足够,如果需要更强地保证持久化,可能要用到发布确认模式。

1.3 不公平分发

默认情况下RabbitMQ对队列中的消息采用的是轮询分发的机制,但这种机制会造成资源浪费,因为处理消息的快的消费者更多的时间是处于空闲状态。不公平分发的机制开启比较简单,只需要在消费者处的代码设置信道时,设置其basicQos的值为1即可。

channel.basicQos(1);

1.4 预取值

        由于消息的确认和发送都是异步的,因此消费者处会有一个未确认消息的缓冲区,为了使得该缓冲区不会无限制地增大,可以通过basic.qos来设置消费者未确认消息的缓冲区中允许的未确认的消息的最大数量。一旦达到最大数量就会RabbitMQ就会停止在通道上传递更多消息直至至少一个未被确认的消息被确认处理。这时候为了尽量避免自动分发和手动分发模式下无限制确认消息缓冲区所造成了消费者的RAM消耗,我们需要根据需求场景设置一个比较好的预取值。

//预取值是10
int prefetchCount = 10;
channel.basicQos(prefetchCount);

 需要注意的是预取值的设置是在消费者一方!


二、发布确认机制

在前面我们设置了队列持久化和消息持久化,但是单纯开启持久化并不能保证完全持久化,消息有可能还没来得及保存在磁盘中就发生了RabbitMQ宕机,因此我们还需要引入发布确认模式。

在消息发布者处开启发布确认模式

//开启发布确认模式
channel.confirmSelect();

下面我们来看看几种确认发布的模式:

2.1 单个发布确认

        单个发布确认是一种简单的确认方式,它是一种同步确认发布的方式。也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirms()这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

缺点:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。

    //单个确认发布public static void publishMessageIndividually() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//循环发布消息for(int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功"+i);}}long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("单次发布模式下发送"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

2.2 批量发布确认

        批量消息确认发布是指不再是像单个发布确认的模式那样发一条消息等待确认后再发一条,而是一次性发送一批消息之后再统一确认发布。批量发布确认确实可以极大的提高消息队列的吞吐量,但缺点却是不清楚哪个消息出现问题。

    //批量确认发布public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量操作的信息大小int batchSize = 10;for (int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断是否达到sizeif (i%batchSize==0){channel.waitForConfirms();}}long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("批量确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

2.3 异步发布确认

        相比于前面两种发布确认模式,异步确认发布在实现起来更加复杂,但是性能和效率更高。它是通过回调函数来达到消息可靠性传递和保证消息投递成功的。在异步发布确认模式下一般至少有三个线程,一个线程是用来发送消息的;另外两个线程分别是处理消息确认成功和消息确认失败的回调函数。

    //异步确认发布public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{System.out.println("确认的消息:"+deliveryTag);};//消息确认失败的回调函数(消息的标记,是否为批量处理)ConfirmCallback nackCallback = (deliveryTag,multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//设置消息监听器channel.addConfirmListener(ackCallback,nackCallback); //这个监听是异步的//消息发送for(int i = 0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());}//结束时间long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("异步确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

2.4 处理异步未确认消息的机制 

        处理异步未确认消息的最好解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentSkipListMap这个队列在confirm callbacks与发布线程之间进行消息的传递。 这里为了存储发送的消息体我们使用的是ConcurrentSkipListMap这个类型对象,该类对象是线程安全的有序的哈希表,适用于高并发的场景。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();/*** ConcurrentSkipListMap线程安全有序的一个哈希表,适用于高并发的场景下* 1.轻松的关联序号和消息* 2.轻松批量删除条目* 3.支持高并发场景*/ConcurrentSkipListMap<Long,String> outstandingConfirm = new ConcurrentSkipListMap<>();//消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{//删除掉已经确认的消息 剩下未确认的消息//判断是不是批量删除if(multiple){ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirm.headMap(deliveryTag);confirmed.clear();}else {outstandingConfirm.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败的回调函数(消息的标记,是否为批量处理)ConfirmCallback nackCallback = (deliveryTag,multiple)->{//打印未确认的消息String message = outstandingConfirm.get(deliveryTag);System.out.println("未确认消息是"+message+"未确认的消息的标签:"+deliveryTag);};//设置消息监听器channel.addConfirmListener(ackCallback,nackCallback); //这个监听是异步的//消息发送for(int i = 0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//记录下所有的发送消息outstandingConfirm.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("异步确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}

        outstandingConfirm.headMap(deliveryTag)是为了在批量操作中做一些过滤,返回小于deliveryTag的映射并调用clear方法清空outstandingConfirm 映射中所有小于 deliveryTag 的键值对,而不影响其他键值对。这里其实就是清除所有已经确认的消息,留下未确认的消息。 


总结

        这篇文章的知识都比较简单,其实中间件的设计都是为了解决问题的,而我们也可以通过保证消息不丢失的同时兼顾性能这个需求来学习这部分知识。其余的就是基本的操作和类库操作的熟练了哈哈哈哈,继续梳理的同时,荔枝也要继续努力学习咯~~~

今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~

如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!

如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!

相关文章:

RabbitMQ深入 —— 持久化和发布确认

前言 前面的文章荔枝梳理了如何去配置RabbitMQ环境并且也介绍了两种比较简单的运行模式&#xff0c;在这篇文章中荔枝将会继续梳理有关RabbitMQ的持久化机制以及发布确认模式的相关知识&#xff0c;希望能够帮助到大家~~~ 文章目录 前言 一、持久化 1.1 队列持久化 1.2 消息…...

人脸识别三部曲

人脸识别三部曲 首先看目录结构图像信息采集 采集图片.py模型训练 训练模型.py人脸识别 人脸识别.py效果 首先看目录结构 引用文121本 opencv │ 采集图片.py │ 训练模型.py │ 人脸识别.py │ └───trainer │ │ trainer.yml │ └───data │ └──…...

【Linux网络编程】Socket-TCP实例

netstat -nltp 无法用read函数读取UDP套接字的数据&#xff0c;因为UDP是面向数据报&#xff0c;而TCP是面向数据流。 客户端不需要 bind&#xff0c;listen&#xff0c;accept&#xff0c;但是客户端需要connect&#xff0c;connect会自动做bind工作。 #include <sys/sock…...

<OpenCV> 边缘填充

OpenCV边缘填充 1、边缘填充类型 enum cv::BorderTypes ORDER_CONSTANT iiiiii|abcdefgh|iiiiiii with some specified i -常量法&#xff0c;常熟值填充&#xff1b; BORDER_REPLICATE aaaaaa|abcdefgh|hhhhhhh -复制法&#xff0c;复制边缘像素&#xff1b; BORDER_R…...

【视觉SLAM入门】7.3.后端优化 基于KF/EKF和基于BA图优化的后端,推导及举例分析

"时间倾诉我的故事" 1. 理论推导2. 主流解法3. 用EKF估计状态3.1. 基于EKF代表解法的感悟 4. 用BA法估计状态4.1 构建最小二乘问题4.2 求解BA推导4.3 H的稀疏结构4.4 根据H稀疏性求解4.5 鲁棒核函数4.6 编程注意 5.总结 引入&#xff1a; 前端里程计能给出一个短时间…...

Docker概念通讲

目录 什么是Docker&#xff1f; Docker的应用场景有哪些&#xff1f; Docker的优点有哪些&#xff1f; Docker与虚拟机的区别是什么&#xff1f; Docker的三大核心是什么&#xff1f; 如何快速安装Docker&#xff1f; 如何修改Docker的存储位置&#xff1f; Docker镜像常…...

PHP请求API接口案例采集电商平台数据获取淘宝/天猫优惠券查询示例

优惠券查询API接口对于用户和商家来说具有重要作用&#xff0c;可以方便地获取优惠券信息&#xff0c;进行优惠券搜索和筛选&#xff0c;参与活动和促销推广&#xff0c;提供数据分析和决策支持&#xff0c;提升用户体验和忠诚度&#xff0c;为商家增加销售额和市场竞争力。 t…...

计算机网络:三次握手与四次挥手

摘取作者&#xff1a;拓跋阿秀 三次握手 三次握手&#xff08;Three-way Handshake&#xff09;其实就是指建立一个TCP连接时&#xff0c;需要客户端和服务器总共发送3个包。进行三次握手的主要作用就是为了确认双方的接收能力和发送能力是否正常、指定自己的初始化序列号为后…...

Visual Studio 调试上传文件时自动停止运行的解决方法

进入&#xff1a;选项&#xff0c;项目和解决方案&#xff0c;Web项目&#xff0c; 找到在浏览器窗口关闭时停止调试程序&#xff0c;在调试停止时关闭浏览器 将它不要勾关闭&#xff0c;然后重新启动下Visual Studio&#xff0c;上传文件时就可以调试了...

使用scp命令失败出错

使用scp命令失败出错&#xff0c;无反应。 解决&#xff1a; 1.使用ifconfig查看目标主机公网IP地址 ifconfig需使用公网ip 2.配置免密登录 可参考 远程登录ssh ssh-copy-id root目标主机ip再次尝试scp命令。 SCP&#xff08;Secure Copy&#xff09;是一个用于在本地主机和…...

kafka增加磁盘或者分区,topic重分区

场景&#xff1a;kafka配置文件log.dirs增加了几个目录&#xff0c;但是新目录没有分区数据写入&#xff0c;所以打算进行重分区一下。 1.生成迁移计划 进入kafka/bin目录 新建 topic-reassign.json,把要重分区的topic按下面格式写。 { "topics": [{ …...

SpringMVC系列(五)之JSR303和拦截器

目录 一. JSR303 1.1 JSR303是什么 1.2 为什么要使用JSR303 1.3 JSR303常用注解 1.4 JSR303快速入门 1. 导入相关pom依赖 2. 配置校验规则 3. 入门示例 二. SpringMVC的拦截器 2.1 什么是拦截器 2.2 拦截器与过滤器的区别 2.3 拦截器工作原理 2.4 入门示例 1. 创建…...

LCP 01.猜数字

​​题目来源&#xff1a; leetcode题目&#xff0c;网址&#xff1a;LCP 01. 猜数字 - 力扣&#xff08;LeetCode&#xff09; 解题思路&#xff1a; 遍历比较即可。 解题代码&#xff1a; class Solution {public int game(int[] guess, int[] answer) {int res0;for(int …...

智能小车开发

1.材料 店铺&#xff1a;店内搜索页-risym旗舰店-天猫Tmall.com 1.四个小车轮子 2.四个直流减速电机 3.两节18650锂电池&#xff08;每节3.7V&#xff09;&#xff0c;大概电压在7.4V左右&#xff0c;电压最好不要超过12V不然会损坏电机驱动 4.一个18650锂电池盒 5.一个L…...

RDMA性能测试工具集preftest_README

文章目录 1 概述2 安装3 测试方法说明4 测试说明5 运行测试所有测试的通用选项延迟测试选项带宽测试选项ib_send_lat&#xff08;发送延迟测试&#xff09;和 ib_send_bw&#xff08;发送带宽测试&#xff09;的选项ib_atomic_lat&#xff08;原子延迟测试&#xff09;和 ib_at…...

墨天轮专访星环科技刘熙:“向量热”背后的冷思考,Hippo如何打造“先发”优势?

导读&#xff1a; 深耕技术研发数十载&#xff0c;坚持自主可控发展路。星环科技一路砥砺前行、坚持创新为先&#xff0c;建设了全面的产品矩阵&#xff0c;并于2022年作为首个独立基础软件产品公司成功上市。星环科技在今年的向星力•未来技术大会上发布了分布式向量数据库Tra…...

逆向-beginners之非递归

/* * 非递归 */ void f() { } void main() { f(); } #if 0 /* * intel */ 0000000000001129 <f>: 1129: f3 0f 1e fa endbr64 112d: 55 push %rbp 112e: 48 89 e5 mov %rsp,%…...

Spring for Apache Kafka概述和简单入门

一、概述 Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。 二、准备工作 注意:进行工作开始之前至少要有一个 Apache Kafka 环境 2.1、依赖 使用 Spring Boot<dependency><groupId>org.springframework.kafka</groupId><artifact…...

基于SSM+Vue的医院医患管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用Vue技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…...

再次理解Android账号管理体系

目录 ✅ 0. 需求 &#x1f4c2; 1. 前言 &#x1f531; 2. 使用 2.1 账户体系前提 2.2 创建账户服务 2.3 操作账户-增删改查 &#x1f4a0; 3. 源码流程 ✅ 0. 需求 试想&#xff0c;自己去实现一个账号管理体系&#xff0c;该如何做呢&#xff1f; ——————————…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式&#xff0c;可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit&#xff08;传感器服务&#xff09;# 前言 在运动类应用中&#xff0c;运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据&#xff0c;如配速、距离、卡路里消耗等&#xff0c;用户可以更清晰…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成

一个面向 Java 开发者的 Sring-Ai 示例工程项目&#xff0c;该项目是一个 Spring AI 快速入门的样例工程项目&#xff0c;旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计&#xff0c;每个模块都专注于特定的功能领域&#xff0c;便于学习和…...