当前位置: 首页 > news >正文

rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

消息何去何从

mandatory和immediate是channel.basicPublish方法的两个参数,都有消息传递过程中不可达目的地时将消息返回给生产者的功能。

mandatory参数

  • true:交换器无法根据自身的类型 和路由键找到符合条件的队列,rabbitmq调用Basic.Return命令将消息返回给生产者
    • 生产者调用channel.addReturnListener添加ReturnListener监听器实现
  • false:消息直接丢弃

immediate参数

告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。

Rabbitmq3.0去掉了对immediate参数支持,建议采用TTL和DLX方法替代

  • true:如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,这条消息将不会存入队列中,当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return返回至生产者。

备份交换器(AE)

生产者发送消息不设置mandatory,消息未被路由会丢失,设置了,需要添加ReturnListener。如果不想编程复杂也不想消息丢失使用备份交换器。

使用:

  • 声明交换器时添加alternate-exchange参数实现
    • channel.exchangeDeclare(“myAe”,“fanout”,true,false,null);
    • channel.queueDeclare(“unroutedQueue”,true,false,false,null);
    • channel.queueBind(“unroutedQueue”,“myAe”,“”);
  • 通过策略(Policy)实现
    • rabbitmqctl set_policy AE “^normalExchange$” ‘{“alternate-exchange”:“myAE”}’

特殊情况:

  • 如果设置了备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失
  • 如果备份交换器没有绑定任何队列,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失
  • 如果备份交换器没有任何匹配的队列,客户端和rabbitmq服务端都不会有异常出现,此时消息会丢失
  • 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效

过期时间(Time to Live ,TTL)

设置消息TTL

  • 通过队列属性设置,队列中所有消息都有相同的过期时间(一旦过期,就从队列中抹去,消息已经在队列头部,只要定期从队列头部开始扫描即可)

    • channel.queueDeclare方法中加入x-message-ttl参数实现,单位ms

      • Map<String,Object> args = new HashMap<String,Object>();
        args.put("x-message-ttl",6000);
        channel.queueDeclare(queueName,durable,exclusive,autoDelete,args);
        
    • 通过Policy方式设置ttl

      • rabbitmqctl set_policy TTL ".*" '{"message-ttl":6000}' --apply-to queue
        
    • 通过调用http api接口设置

  • 通过对消息本身单独设置,每条消息的ttl可以不同(即使过期,也不会马上抹去,是否过期是在即将投递到消费者之前判定的)

    • 代码设置
      • 设置AMQP.BasicProperties属性
      • set属性:deliveryMode(持久化消息),expiration(ttl时间)
    • 通过 http api接口设置
  • 如果两个方法一起使用,消息的ttl以两者之间较小的数值为准,消息在队列中一旦超过设置的ttl时,就会变成死信,消费者将无法再收到该消息

  • 不设置ttl,表示此消息不会过期,ttl=0,表示除非此时可以直接将消息投递到消费者,否则立即丢弃。

设置队列TTL

channel.queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间(未使用:队列上没有任何消费者,队列也没有被重新声明,并在过期时间段内也未调用过Basic.Get命令)

Map<String, Object> args = new HashMap<String,Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue",false,false,false,args);

死信队列

当消息在一个队列中变成死信后,能被重新被发送到另一个交换器中,这个交换器就是DLX,死信交换器,绑定DLX的队列就是死信队列

消息变成死信情况

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

当队列中存在死信时,rabbitmq会自动将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,死信队列,可以监听这个队列的消息进行相应处理

设置方法

  • 代码设置:
    • channel.queueDeclare方法中设置x-dead-letter-exchange为队列添加DLX
  • 通过Policy方式设置

延迟队列

延迟队列存储的对象是对应的延迟消息(当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费)。

场景:

  • 订单系统,30min内未支付,进行异常处理
  • 手机遥控设备指定时间工作

通过DLX 和TTL模拟延迟队列的功能。

假设一个应用中需要将每条消息都设置为10秒延迟,生产者通过exchange.normal交换器将发送的消息存储在queue.normal队列,消费者订阅的是queue.dlx队列,当消息从queue.normal整个队列中过期之后被存入queue.dlx队列,消费者恰巧消费到了延迟10秒的这条消息。

优先级队列

实现:通过设置队列的x-max-priority参数实现

默认最低优先级为0,越高越优先消费

前提:如果在消费速度大于生成者的速度且broker中没有消息堆积的情况下,对发送的消息设置优先级就没什么意义了。

RPC实现

客户端发送请求消息,服务端回复响应的消息,为了接收响应的消息,需要在请求消息中发送一个回调队列

String callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();
channel.basicPublish("","rpc_queue",props,message.getBytes());
  • replyTo:用来设置一个回调队列
  • correlationId:用来关联请求和其调用RPC之后的回复,每一个请求设置一个唯一的correlationId

可以为每一个客户端创建一个单一的回调队列。

持久化

  • 交换器持久化:通过在声明队列是将durable参数置为true实现的。如果不持久化,rabbitmq服务重启后,相关的交换器元数据会丢失,消息不丢失,只是不能将消息发送到这个交换器中了。
  • 队列持久化:通过在声明队列时将durable置为true,如果不设置持久化,rabbitmq重启后,相关队列元数据会丢失,此时数据也会丢失。
  • 消息持久化:通过将消息的投递模式BasicProperties中的diliveryMode属性设置为2即可实现消息的持久化
  • 设置了队列和消息的持久化,rabbitmq服务重启后,消息依旧存在。

将交换器、队列、消息都设置持久化后不能保证数据百分百丢失。

生产者确认

确定消息到底有没有正确到达服务器。可以通过事务机制和发送方确认机制

事务机制

rabbitmq客户端与事务机制相关方法

  • channel.txSelect:用于当前的信道设置成事务模式
  • channel.txCommit:用于提交事务
  • channel.txRollback:用于事务回滚

开启事务流程

  • 客户端发送Tx.select,将信道置为事务模式
  • Broker回复Tx.Select-Ok,确认已将信道置为事务模式
  • 在发送完消息后,客户端发送Tx.Commit提交事务
  • Broker回复Tx.Commit-Ok,确认提交事务
  • 如果发生异常,在捕获异常后,channel.txRollback()回滚

缺点:会有性能损失

发送方确认机制

  • 生产者将信道设置成confirm模式(channel.confirmSelect),rabbitmq同意:Confirm.Select-Ok;
  • 一旦信道进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一id
  • 一旦消息被投递到匹配的队列后,rabbitmq会发送一个确认给生产者(包含消息唯一id),使得生产者知晓消息已经正确到达了目的地。

事务机制在一条消息发送后会使发送端阻塞,等待rabbitmq回应后才发下一条消息,而发送发确认机制最大好处是异步的。生产者通过回调方法处理该确认消息。如果rabbitmq因自身内部错误导致消息丢失,会发送一条nack命令,生产者应用程序同样可以在回调方法中处理nack命令。

publisher confirm优势

  • 批量confirm方法,每发送一批消息后,调用channel.waitForConfirms方法,等待服务器的确认返回
  • 异步confirm方法:提供一个回调方法,服务端确认了一条或多条消息后客户端会回调这个方法进行处理。

消费端要点介绍

消息分发

rabbitmq队列拥有多个消费者时,队列收到的消息将以轮询的分发方式发送给消费者,每条消息只会发送给订阅列表里的一个消费者。

  • 问题:如果某些空闲,某些忙碌造成整体下降
  • 方法:channel.basicQos方法允许限制信道上的消费者所能保持的最大未确认消息的数量。如果达到上限,就不会向这个消费者再发送任何消息,知道消费者确认了某条消息后,相应计数减1,之后消费者可以继续接受消息。

消息顺序性

指消费者消费到的消息和发送者发布的消息的顺序是一致的。

打破顺序性的情形

  • 如果生产者使用了事务机制,发送消息遇到异常进行了事务回滚,需重新补偿发送,如果是另一个线程实现,则出现乱序。
  • 如果生产者发送的消息设置了不同的超时时间,并设置了死信队列,顺序不一致。
  • 设置了优先级,也不是顺序的。

要保证消息的顺序性,需要业务方使用rabbitmq之后进一步处理,例如在消息体内添加全局有序标识实现。

弃用QueueingConsumer

缺陷

  • 内存溢出问题:队列堆积较多的消息,导致消费者客户端内存溢出假死,不断堆积
    • 使用Basic.Qos限制某个消费者所保持未确认消息的数量。
  • 会拖累同一个connection下的所有信道,性能降低
  • 同步递归调用QueueingConsumer会产生死锁
  • rabbitmq的自动连接恢复机制不支持Queueing Consumer这种形式
  • QueueingConsumer不是事件驱动的

消息传输保障

一般消息中间件消息传输保障分为三个层级

  • 最多一次
  • 最少一次
  • 恰好一次

rabbitmq支持其中的最多一次和最少一次,其中最少一次投递实现需要考虑

  • 消息生产者需要开启事务机制或publisher confirm机制,以确保消息可以可靠地传输到rabbitmq中
  • 消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃
  • 消息和队列都需要进行持久化处理,以确保rabbitmq服务器在遇到异常情况时不会造成消息丢失
  • 消费者在消费消息的同时需要将autoAck设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。

参考:《RabbitMQ实战指南》

相关文章:

rabbitmq笔记-rabbitmq进阶-数据可靠性,rabbitmq高级特性

消息何去何从 mandatory和immediate是channel.basicPublish方法的两个参数&#xff0c;都有消息传递过程中不可达目的地时将消息返回给生产者的功能。 mandatory参数 true&#xff1a;交换器无法根据自身的类型 和路由键找到符合条件的队列&#xff0c;rabbitmq调用Basic.Re…...

【笔记】判断两个String字符串是否相同(考虑字符串为null的情况)

判断两个字符串是否相同&#xff0c;可用于判断一个字段在逻辑处理前后&#xff0c;值是否有变化。 其中重点是要考虑两个字符串是否有为null的情况&#xff0c;如果其中一个&#xff0c;或者两个都为空&#xff0c;用str1.equals(str2)直接判断&#xff0c;就会报NullPointer…...

【校招VIP】java语言考点之多线程NIO

考点介绍 多线程&NIO考点是校招面试中的常制点之一。 Java NIO是new IO的简称&#xff0c;是一种可以替代Java 10的一套新的IO机制。它提供了一套不同于Java标准1O的操作机制&#xff0c;严格来说&#xff0c;NIO与并发并无直接关系&#xff0c;但是使用NIO技术可以大大提高…...

JVM知识点(一)

1、JVM基础概念 &#xff08;1&#xff09;JVM、JRE、JDK JRE&#xff1a;JVM基本类库组成的运行环境就是JRE。JVM自己是无法完成一次编译&#xff0c;处处运行的&#xff0c;需要有一个基本类库告诉JVM如何操作运行&#xff0c;如如何操作文件&#xff0c;连接网络等&#x…...

网页接口导入postman进行接口请求

postman版本&#xff1a;v10.17.4 一、拷贝接口信息 网页打开开发者工具-networkk&#xff0c;在网页上请求一次接口&#xff0c;鼠标指在接口上&#xff0c;点击鼠标右键-copy-copy as cURL(bash) 二、导入postman 打开postman&#xff0c;点击import-Raw text&#xff0c;…...

【Leetcode】124.二叉树中的最大路径和(Hard)

一、题目 1、题目描述 二叉树中的 路径 被定义为一条节点序列,序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点,且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root ,返回其…...

django自动创建model数据

目前使用的环境&#xff1a;django4.2.3&#xff0c;python3.10 django通过一些第三方库&#xff0c;可以轻易的自动生成一系列的后台数据。 首先先创建一个数据库&#xff1a; 然后&#xff0c;在setting.py中就可以指定我们新创建的数据库了。 DATABASES {default: {ENGI…...

vscode 远程连接

这里记录的是修改ssh配置文件的方式远程连接服务器中的docker服务器 假如已经创建好了docker&#xff0c;并已经启动 1. config ssh in dev container 在/etc/ssh/sshd_config中修改 PermitRootLogin in 把 PermitRootLogin prohibit-password 修改为 PermitRootLogin yes …...

Error running ‘Tomcat 8.5.29‘ Address localhost:1099 is already in use

一、Error running ‘Tomcat 8.5.29’ Address localhost:1099 is already in use 原因&#xff1a;端口1099被占用了。 二、解决 2.1 解决方法一-结束该端口1099占用 //1-查看端口占用&#xff0c;根据端口号1099&#xff0c;获取PID(进程ID) netstat -ano | findstr "…...

后端面试话术集锦第 七 篇:nginx面试话术

🚗后端面试集锦目录 💖后端面试话术集锦第 1 篇:spring面试话术💖 💖后端面试话术集锦第 2 篇:spring boot面试话术💖 💖后端面试话术集锦第 3 篇:spring cloud面试话术💖 💖后端面试话术集锦第 4 篇:ElasticSearch面试话术💖 💖后端面试话术集锦第 5 …...

leetcode算法题--使子序列的和等于目标的最少操作次数

原题链接&#xff1a;https://leetcode.cn/problems/minimum-operations-to-form-subsequence-with-target-sum/description/ 视频讲解&#xff1a;https://www.bilibili.com/video/BV1Em4y1T7Bq?t1456.1 这题是真的难。。 func minOperations(nums []int, target int) int…...

服务器部署前后端项目-SQL Father为例

hello~大家好哇&#xff0c;好久没更新博客了。现在来更新一波hhh 现在更新一下部署上的一些东西&#xff0c;因为其实有很多小伙伴跟我之前一样&#xff0c;很多时候只是开发了&#xff0c;本地前后端都能调通&#xff0c;也能用&#xff0c;但是没有部署到服务器试过&#x…...

LiveNVR监控流媒体Onvif/RTSP功能-支持语音对讲支持非国标摄像头SDK语音对讲GB28181级联国标平台非国标转国标语音对讲

LiveNVR支持语音对讲支持非国标摄像头SDK语音对讲GB28181级联国标平台非国标转国标语音对讲 1、确认摄像头是否支持对讲2、摄像头视频类型复合流3、通道配置SDK接入4、视频广场点击播放5、相关问题5.1、如何配置通道获取直播流&#xff1f;5.2、如何GB28181级联国标平台&#x…...

爬虫selenium获取元素定位方法总结(动态获取元素)

目录 元素 查看元素信息 元素定位 通过元素id定位 通过元素name定位 通过xpath表达式定位 绝对路径 相对路径 通过完整超链接定位 通过部分链接定位 通过标签定位 通过类名进行定位 通过css选择器进行定位 id选择器 class选择器 标签选择器 属性选择器 定位带…...

JVM下篇知识

第01章&#xff1a;概述篇 第02章&#xff1a;JVM监控及诊断工具-命令行篇 第03章&#xff1a;JVM监控及诊断工具-GUI篇 第04章&#xff1a;JVM运行时参数 第05章&#xff1a;分析GC日志...

HBase客户端的批量写缓存BufferedMutator

HBase数据刷写 之前提到过这个方法&#xff0c;那么BufferedMutator是什么&#xff1f;又应该如何实现呢&#xff1f; 写缓存 HBase的每一个put操作实际上是一个RPC操作&#xff0c;将客户端的数据传输到服务器再返回结果&#xff0c;这只适用于小数据量的操作&#xff0c;如…...

从多个角度详解map转为list

从多个角度详解map转为list 更新&#xff1a;2023-05-20 19:24 在Java编程中&#xff0c;我们经常使用map存储键值对数据&#xff0c;而有时我们需要把map转为list&#xff0c;本文将从多个方面对map转list做详细的阐述。 一、map转为list的基础方法 Java提供了多种方法将m…...

PHP用CURL发送Content-type为application/json的POST请求方法

HELLO 各位伙伴&#xff0c;最近一直在做项目&#xff0c;没有及时更新。望请见谅。 今天&#xff0c;给大家讲一下php请求第三方接口的时候遇到的问题&#xff0c;大家都知道&#xff0c;在请求第三方接口的时候&#xff0c;会要求我们用post还是get来传参 一般我们传参的时候…...

【程序猿书籍大放送:第二期】《强化学习:原理与Python实战》

&#x1f339;欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 爱书不爱输的程序猿&#xff1a;送书第二期 一、搞懂大模型的智能基因&#xff0c;RLHF系统设计关键问答1.RLHF是什么&#xff1f;2.RLHF适用于哪些任务&#xff1f;3…...

SV-6002Y 网络对讲求助模块,带3W功放输出和一路30W功放输出

SV-6002Y 网络对讲求助模块&#xff0c;带3W功放输出和一路30W功放输出 SV-6002Y是我司一款求助对讲模块&#xff0c;具有10/100M以太网接口&#xff0c;其接收网络的音频数据&#xff0c;实时解码播放&#xff0c;还可配置麦克风输入和扬声器输出。SV-6002Y可实现对讲、广播、…...

Nginx详解 二:配置文件部分

文章目录 1. Nginx 配置文件1.1 主配置文件1.2 子配置文件1.3 全局配置1.3.1 修改启动的进程数1.3.2 cpu和work进程绑定&#xff08;nginx调优&#xff09;1.3.3 修改PID路径1.3.4 nginx进程的优先级&#xff08;work进程的优先级&#xff09;1.3.5 调试work进程打开的文件的个…...

SMC_TRAFO_GantryCutter2 (FB) 带刀片旋向龙门

裁布机&#xff1a;刀片按XY走向&#xff0c;偏转刀片角度。 pi&#xff1a;目标位置矢量&#xff08;x&#xff0c;y&#xff09;&#xff0c;插值器的输出 v&#xff1a;当前路径切线的矢量&#xff0c;插值器的输出 dOffsetX&#xff1a; x轴的附加偏移 dOffsetY&#xf…...

『PyQt5-Qt Designer篇』| 07 Qt Designer中栅格布局和表单布局的使用

07 Qt Designer中栅格布局和表格布局的使用 1 栅格布局1.1 按钮布局1.2 栅格布局中拖入控件1.3 保存并调用2 表单布局2.1 标签+输入控件2.2 保存并调用3 组合水平和垂直布局1 栅格布局 1.1 按钮布局 拖入几个按钮,如图: 选中所有按钮,右键点击布局-栅格布局: 之后可以看到…...

无涯教程-分类算法 - 多项式逻辑回归模型函数

Logistic逻辑回归的另一种有用形式是多项式Lo​​gistic回归&#xff0c;其中目标或因变量可以具有3种或更多可能的unordered类型&#xff0c;即没有定量意义的类型。 用Python实现 现在&#xff0c;无涯教程将在Python中实现上述多项式逻辑回归的概念。为此&#xff0c;使用…...

【C++】开源:Box2D动力学库配置与使用

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍Box2D动力学库配置与使用。 无专精则不能成&#xff0c;无涉猎则不能通。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c…...

Druid连接池和Apache的DBUtils

背景 jdbc连接数据库存在着大批量用户进行短时间的SQL连接操作的 需求&#xff0c;而普通用户连接后直接断开与数据库的连接&#xff0c;下次连接需要重新建立桥梁&#xff0c;再频繁访问时。这是很消耗性能的一个操作&#xff0c;因此诞生了数据库连接池技术。提前创建 一些连…...

怎样快速选择正确的可视化图表?

数据可视化的图表类型十分丰富&#xff0c;好的图表可以有效、清晰地呈现数据的信息。对于用户而言&#xff0c;选择正确的图表是十分关键的&#xff0c;不仅可以达到“一图胜千言”的效果&#xff0c;而且会直接影响分析的结果。 用户选择正确的数据可视化图表前&#xff0c;…...

6路液体水位检测芯片VK36W6D SOP16 抗电源干扰及手机干扰特性好

产品品牌&#xff1a;永嘉微电/VINKA 产品型号&#xff1a;VK36W6D 封装形式&#xff1a;SOP16/QFN16L 详细资料&#xff1a;13.5/5.474/4.703 概述 VK36W6D具有6个触摸检测通道&#xff0c;可用来检测6个点的水位。该芯片具有较高的集成度&#xff0c;仅需极少的外部组件便…...

【设备树笔记整理6】中断系统中的设备树

1 中断概念的引入与处理流程 1.1 中断处理框图 1.2 中断程序的使用 主函数() while(1) {do_routine_task(); }中断处理函数() {handle_interrupt_task(); }如何调用中断处理函数&#xff1f; 1.3 ARM对异常(中断)的处理过程 &#xff08;1&#xff09;初始化 ① 设置中断…...

微信小程序下载后端返回的文件流

downtest() {let temp {"title": ["排名", "车号", "车队", "车手", "领航", "赛段成绩", "距首车成绩", "距前车差距", "发车时间", "冲刺时间", "赛段…...