RabbitMQ深入 —— 持久化和发布确认
前言
前面的文章荔枝梳理了如何去配置RabbitMQ环境并且也介绍了两种比较简单的运行模式,在这篇文章中荔枝将会继续梳理有关RabbitMQ的持久化机制以及发布确认模式的相关知识,希望能够帮助到大家~~~
文章目录
前言
一、持久化
1.1 队列持久化
1.2 消息持久化
1.3 不公平分发
1.4 预取值
二、发布确认机制
2.1 单个发布确认
2.2 批量发布确认
2.3 异步发布确认
2.4 处理异步未确认消息的机制
总结
一、持久化
跟MySQL和Redis一样,持久化的操作就是为了保存数据,避免因为RabbitMQ宕机停止服务之后可以确保消息不丢失。默认情况下RabbitMQ不会开启持久化,他会忽视队列和消息。
1.1 队列持久化
队列的持久化比较简单,只需要在生产者创建信道的时候将queueDeclare()函数的durable参数设置为ture即可。如果之前就已经创建过该队列,修改完durable之后可能会报错,这时候仅需要管理后台中删除队列即可。
boolean durable = ture;
//信道申明
channel.queueDeclare(TASK_QUEUE_ACK, durable ,false,false,null);
1.2 消息持久化
消息持久化的实现也需要在消息生产者中修改demo,只需要在basicPublish()中设置第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN。
channel.basicPublish("",TASK_QUEUE_ACK, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
需要注意的是:只有消息持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言已经足够,如果需要更强地保证持久化,可能要用到发布确认模式。
1.3 不公平分发
默认情况下RabbitMQ对队列中的消息采用的是轮询分发的机制,但这种机制会造成资源浪费,因为处理消息的快的消费者更多的时间是处于空闲状态。不公平分发的机制开启比较简单,只需要在消费者处的代码设置信道时,设置其basicQos的值为1即可。
channel.basicQos(1);
1.4 预取值
由于消息的确认和发送都是异步的,因此消费者处会有一个未确认消息的缓冲区,为了使得该缓冲区不会无限制地增大,可以通过basic.qos来设置消费者未确认消息的缓冲区中允许的未确认的消息的最大数量。一旦达到最大数量就会RabbitMQ就会停止在通道上传递更多消息直至至少一个未被确认的消息被确认处理。这时候为了尽量避免自动分发和手动分发模式下无限制确认消息缓冲区所造成了消费者的RAM消耗,我们需要根据需求场景设置一个比较好的预取值。
//预取值是10
int prefetchCount = 10;
channel.basicQos(prefetchCount);
需要注意的是预取值的设置是在消费者一方!
二、发布确认机制
在前面我们设置了队列持久化和消息持久化,但是单纯开启持久化并不能保证完全持久化,消息有可能还没来得及保存在磁盘中就发生了RabbitMQ宕机,因此我们还需要引入发布确认模式。
在消息发布者处开启发布确认模式
//开启发布确认模式
channel.confirmSelect();
下面我们来看看几种确认发布的模式:
2.1 单个发布确认
单个发布确认是一种简单的确认方式,它是一种同步确认发布的方式。也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirms()这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
缺点:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
//单个确认发布public static void publishMessageIndividually() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//循环发布消息for(int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功"+i);}}long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("单次发布模式下发送"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}
2.2 批量发布确认
批量消息确认发布是指不再是像单个发布确认的模式那样发一条消息等待确认后再发一条,而是一次性发送一批消息之后再统一确认发布。批量发布确认确实可以极大的提高消息队列的吞吐量,但缺点却是不清楚哪个消息出现问题。
//批量确认发布public static void publishMessageBatch() throws Exception {Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量操作的信息大小int batchSize = 10;for (int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断是否达到sizeif (i%batchSize==0){channel.waitForConfirms();}}long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("批量确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}
2.3 异步发布确认
相比于前面两种发布确认模式,异步确认发布在实现起来更加复杂,但是性能和效率更高。它是通过回调函数来达到消息可靠性传递和保证消息投递成功的。在异步发布确认模式下一般至少有三个线程,一个线程是用来发送消息的;另外两个线程分别是处理消息确认成功和消息确认失败的回调函数。
//异步确认发布public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{System.out.println("确认的消息:"+deliveryTag);};//消息确认失败的回调函数(消息的标记,是否为批量处理)ConfirmCallback nackCallback = (deliveryTag,multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//设置消息监听器channel.addConfirmListener(ackCallback,nackCallback); //这个监听是异步的//消息发送for(int i = 0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());}//结束时间long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("异步确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}
2.4 处理异步未确认消息的机制
处理异步未确认消息的最好解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentSkipListMap这个队列在confirm callbacks与发布线程之间进行消息的传递。 这里为了存储发送的消息体我们使用的是ConcurrentSkipListMap这个类型对象,该类对象是线程安全的有序的哈希表,适用于高并发的场景。
public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtil.getChannel();//随机生成队列声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();/*** ConcurrentSkipListMap线程安全有序的一个哈希表,适用于高并发的场景下* 1.轻松的关联序号和消息* 2.轻松批量删除条目* 3.支持高并发场景*/ConcurrentSkipListMap<Long,String> outstandingConfirm = new ConcurrentSkipListMap<>();//消息确认成功的回调函数ConfirmCallback ackCallback = (deliveryTag,multiple)->{//删除掉已经确认的消息 剩下未确认的消息//判断是不是批量删除if(multiple){ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirm.headMap(deliveryTag);confirmed.clear();}else {outstandingConfirm.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败的回调函数(消息的标记,是否为批量处理)ConfirmCallback nackCallback = (deliveryTag,multiple)->{//打印未确认的消息String message = outstandingConfirm.get(deliveryTag);System.out.println("未确认消息是"+message+"未确认的消息的标签:"+deliveryTag);};//设置消息监听器channel.addConfirmListener(ackCallback,nackCallback); //这个监听是异步的//消息发送for(int i = 0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//记录下所有的发送消息outstandingConfirm.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();long ttl = end - begin;System.out.println("异步确认发布模式下发送消息"+MESSAGE_COUNT+"次消息的耗时:"+ttl+"ms");}
outstandingConfirm.headMap(deliveryTag)是为了在批量操作中做一些过滤,返回小于deliveryTag的映射并调用clear方法清空outstandingConfirm 映射中所有小于 deliveryTag 的键值对,而不影响其他键值对。这里其实就是清除所有已经确认的消息,留下未确认的消息。
总结
这篇文章的知识都比较简单,其实中间件的设计都是为了解决问题的,而我们也可以通过保证消息不丢失的同时兼顾性能这个需求来学习这部分知识。其余的就是基本的操作和类库操作的熟练了哈哈哈哈,继续梳理的同时,荔枝也要继续努力学习咯~~~
今朝已然成为过去,明日依然向往未来!我是荔枝,在技术成长之路上与您相伴~~~
如果博文对您有帮助的话,可以给荔枝一键三连嘿,您的支持和鼓励是荔枝最大的动力!
如果博文内容有误,也欢迎各位大佬在下方评论区批评指正!!!
相关文章:
RabbitMQ深入 —— 持久化和发布确认
前言 前面的文章荔枝梳理了如何去配置RabbitMQ环境并且也介绍了两种比较简单的运行模式,在这篇文章中荔枝将会继续梳理有关RabbitMQ的持久化机制以及发布确认模式的相关知识,希望能够帮助到大家~~~ 文章目录 前言 一、持久化 1.1 队列持久化 1.2 消息…...
人脸识别三部曲
人脸识别三部曲 首先看目录结构图像信息采集 采集图片.py模型训练 训练模型.py人脸识别 人脸识别.py效果 首先看目录结构 引用文121本 opencv │ 采集图片.py │ 训练模型.py │ 人脸识别.py │ └───trainer │ │ trainer.yml │ └───data │ └──…...
【Linux网络编程】Socket-TCP实例
netstat -nltp 无法用read函数读取UDP套接字的数据,因为UDP是面向数据报,而TCP是面向数据流。 客户端不需要 bind,listen,accept,但是客户端需要connect,connect会自动做bind工作。 #include <sys/sock…...
<OpenCV> 边缘填充
OpenCV边缘填充 1、边缘填充类型 enum cv::BorderTypes ORDER_CONSTANT iiiiii|abcdefgh|iiiiiii with some specified i -常量法,常熟值填充; BORDER_REPLICATE aaaaaa|abcdefgh|hhhhhhh -复制法,复制边缘像素; BORDER_R…...
【视觉SLAM入门】7.3.后端优化 基于KF/EKF和基于BA图优化的后端,推导及举例分析
"时间倾诉我的故事" 1. 理论推导2. 主流解法3. 用EKF估计状态3.1. 基于EKF代表解法的感悟 4. 用BA法估计状态4.1 构建最小二乘问题4.2 求解BA推导4.3 H的稀疏结构4.4 根据H稀疏性求解4.5 鲁棒核函数4.6 编程注意 5.总结 引入: 前端里程计能给出一个短时间…...
Docker概念通讲
目录 什么是Docker? Docker的应用场景有哪些? Docker的优点有哪些? Docker与虚拟机的区别是什么? Docker的三大核心是什么? 如何快速安装Docker? 如何修改Docker的存储位置? Docker镜像常…...
PHP请求API接口案例采集电商平台数据获取淘宝/天猫优惠券查询示例
优惠券查询API接口对于用户和商家来说具有重要作用,可以方便地获取优惠券信息,进行优惠券搜索和筛选,参与活动和促销推广,提供数据分析和决策支持,提升用户体验和忠诚度,为商家增加销售额和市场竞争力。 t…...
计算机网络:三次握手与四次挥手
摘取作者:拓跋阿秀 三次握手 三次握手(Three-way Handshake)其实就是指建立一个TCP连接时,需要客户端和服务器总共发送3个包。进行三次握手的主要作用就是为了确认双方的接收能力和发送能力是否正常、指定自己的初始化序列号为后…...
Visual Studio 调试上传文件时自动停止运行的解决方法
进入:选项,项目和解决方案,Web项目, 找到在浏览器窗口关闭时停止调试程序,在调试停止时关闭浏览器 将它不要勾关闭,然后重新启动下Visual Studio,上传文件时就可以调试了...
使用scp命令失败出错
使用scp命令失败出错,无反应。 解决: 1.使用ifconfig查看目标主机公网IP地址 ifconfig需使用公网ip 2.配置免密登录 可参考 远程登录ssh ssh-copy-id root目标主机ip再次尝试scp命令。 SCP(Secure Copy)是一个用于在本地主机和…...
kafka增加磁盘或者分区,topic重分区
场景:kafka配置文件log.dirs增加了几个目录,但是新目录没有分区数据写入,所以打算进行重分区一下。 1.生成迁移计划 进入kafka/bin目录 新建 topic-reassign.json,把要重分区的topic按下面格式写。 { "topics": [{ …...
SpringMVC系列(五)之JSR303和拦截器
目录 一. JSR303 1.1 JSR303是什么 1.2 为什么要使用JSR303 1.3 JSR303常用注解 1.4 JSR303快速入门 1. 导入相关pom依赖 2. 配置校验规则 3. 入门示例 二. SpringMVC的拦截器 2.1 什么是拦截器 2.2 拦截器与过滤器的区别 2.3 拦截器工作原理 2.4 入门示例 1. 创建…...
LCP 01.猜数字
题目来源: leetcode题目,网址:LCP 01. 猜数字 - 力扣(LeetCode) 解题思路: 遍历比较即可。 解题代码: class Solution {public int game(int[] guess, int[] answer) {int res0;for(int …...
智能小车开发
1.材料 店铺:店内搜索页-risym旗舰店-天猫Tmall.com 1.四个小车轮子 2.四个直流减速电机 3.两节18650锂电池(每节3.7V),大概电压在7.4V左右,电压最好不要超过12V不然会损坏电机驱动 4.一个18650锂电池盒 5.一个L…...
RDMA性能测试工具集preftest_README
文章目录 1 概述2 安装3 测试方法说明4 测试说明5 运行测试所有测试的通用选项延迟测试选项带宽测试选项ib_send_lat(发送延迟测试)和 ib_send_bw(发送带宽测试)的选项ib_atomic_lat(原子延迟测试)和 ib_at…...
墨天轮专访星环科技刘熙:“向量热”背后的冷思考,Hippo如何打造“先发”优势?
导读: 深耕技术研发数十载,坚持自主可控发展路。星环科技一路砥砺前行、坚持创新为先,建设了全面的产品矩阵,并于2022年作为首个独立基础软件产品公司成功上市。星环科技在今年的向星力•未来技术大会上发布了分布式向量数据库Tra…...
逆向-beginners之非递归
/* * 非递归 */ void f() { } void main() { f(); } #if 0 /* * intel */ 0000000000001129 <f>: 1129: f3 0f 1e fa endbr64 112d: 55 push %rbp 112e: 48 89 e5 mov %rsp,%…...
Spring for Apache Kafka概述和简单入门
一、概述 Spring for Apache Kafka 的高级概述以及底层概念和可运行的示例代码。 二、准备工作 注意:进行工作开始之前至少要有一个 Apache Kafka 环境 2.1、依赖 使用 Spring Boot<dependency><groupId>org.springframework.kafka</groupId><artifact…...
基于SSM+Vue的医院医患管理系统
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:采用Vue技术开发 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目&#x…...
再次理解Android账号管理体系
目录 ✅ 0. 需求 📂 1. 前言 🔱 2. 使用 2.1 账户体系前提 2.2 创建账户服务 2.3 操作账户-增删改查 💠 3. 源码流程 ✅ 0. 需求 试想,自己去实现一个账号管理体系,该如何做呢? ——————————…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文通过代码驱动的方式,系统讲解PyTorch核心概念和实战技巧,涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...
ubuntu22.04有线网络无法连接,图标也没了
今天突然无法有线网络无法连接任何设备,并且图标都没了 错误案例 往上一顿搜索,试了很多博客都不行,比如 Ubuntu22.04右上角网络图标消失 最后解决的办法 下载网卡驱动,重新安装 操作步骤 查看自己网卡的型号 lspci | gre…...
