RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认
RabbitMQ消息确认
SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致;
消息发布确认
- 生产者给交换机发送消息后、若是不管了,则会出现消息丢失;
- 解决方案1: 交换机接受到消息、给生产者一个答复ack, 若生产者没有收到ack, 可能出现消息丢失,因此重新发送消息;
- 解决方案1隐藏问题:若是交换机发送了ack, 出现网络延迟,则生产者没有收到ack, 就会出现消息重复发送问题, 进而衍生幂等性问题;
- 隐藏问题解决方案1:在数据库中增加一张去重表,设置唯一索引; 生产者在消息内容中,翻入唯一ID,消费者消费时、先从数据库查询是否存在,存在则不处理该消息;
- 适用于并发低、业务严谨的场景
- 隐藏问题解决方案2:利用Redis的String的setnx,若key存在,则不处理、若key存在,则执行业务;
- 适用于短时间处理大量消息,且 key不会重复;
-这就是大名鼎鼎的幂等性问题,贼讨厌这些专有名词;
- 适用于短时间处理大量消息,且 key不会重复;
- 隐藏问题解决方案1:在数据库中增加一张去重表,设置唯一索引; 生产者在消息内容中,翻入唯一ID,消费者消费时、先从数据库查询是否存在,存在则不处理该消息;
业务开发中的幂等性
- 前端保存数据时、点击多次保存按钮,插入多条数据;
- 解决方案 :前端限制按钮点击、 数据库设置业务唯一索引;
- 消息推送中,可能出现多条内容一样的消息,又不可以重复处理 ,需要幂等性处理;
上家公司中,后台给app客户端推送系统消息时、配置给所有用户推送消息, 其他服务给我的应用消息服务推送 RabbitMQ消息, 正常来说, 每次推送的消息, 设备ID和用户ID合起来唯一的,结果其他服务业务数据存在问题,有些旧数据没有清除, 导致相通的设备ID,用户ID, 一次给设备用户推送了十几条,安卓客户端当当当的响, 直接惊动了产品经理; 经过排查上,是上游数据有问题,代码又很老,其他服务负责人排查了好几天, 把问题数据清楚了, 结果后面又产生了问题数据;- 解决方案:由于会一次性处理几万条推送消息,因此对业务要求速度高,因此利用Redis的String的setNx, 以taskId + mobileDevId + userId + tenantId 组成了唯一key,若是存在,则不处理; key有限时间为60分钟, 就成功处理了该问题;
- 吐槽:上游的业务问题,让下游服务做业务保证,属实离谱;
{
“taskId” :“xxxx”;
“mobileDevId” : “xxxx”;
“userId”:“xxx”;
“tenantId” : “xxx”;
“其他字段”: “…”
}
RabbitMQ发布确认与返回
SpringBoot发布确认与返回
配置:
第二个参数因为过时,所以要配置第三个参数为correlated,表示用来确认消息;
#生产者
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
生产者:
- 通过RabbitTempldate#setConfirmCallback设置确认回调, 即交换器发送给ack给生产者,生产者调用ConfirmCallback回调, 若出现异常cause,则可重新推送;
通过RabbitTempldate#setReturnCallback设置返回回调; - 通过template#waitForConfirms(xxx)表示等待xxx毫秒后确认,超时返回false;
- 若返回false, 则进行业务补救处理;
public class ConfirmProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate template;@Autowiredprivate DirectExchange confirmExchange;AtomicInteger index = new AtomicInteger(0);AtomicInteger count = new AtomicInteger(0);private final String[] keys = {"sms", "mail"};@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() throws IOException {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());template.setMandatory(true);template.setConfirmCallback(this);template.setReturnCallback(this);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");}@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send2() throws IOException {template.invoke((operations) -> {//短信String sms = "{userName: xxx; phone:xxx}";HashMap<String, Object> map = new HashMap<>();map.put("userName", "hanxin");map.put("phone", index.getAndIncrement());//必须设置template.setMandatory(true);template.convertAndSend(confirmExchange.getName(), "confirm", map);System.out.println("send sms confirm");return template.waitForConfirms(1000);});}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("receive confirm callback, ack = " + ack);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("receive return call : message:" + message.getBody());System.out.println("receive return call : replyCode:" + replyCode);System.out.println("receive return call : replyText:" + replyText);System.out.println("receive return call : exchange:" + exchange);System.out.println("receive return call : routingKey:" + routingKey);}
}
业务开发中,非严谨,追求性能高业务建议使用send,这个过程是异步确认的;
严谨业务建议使用send2, 同步等待相应,出现问题好确认;
交换机:
@Configuration
public class ConfirmConfig {public final static String CONFIRM_QUEUE_NAME = "confirmQueue";public final static String CONFIRM_EXCHANGE_NAME = "confirmExchange";public final static String CONFIRM_ROUTING_NAME = "confirm";@Beanpublic DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Beanpublic ConfirmProducer confirmProducer() {return new ConfirmProducer();}
}
运行结果:
receive status : true
RabbitMQ发布确认
若是使用原生Rabbit MQ客户端API,则有三种方式:
- 声明channel是需要交换机确认的
channel.confirmSelect();
- 发布单条消息
for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);channel.basicPublish("", queue, null, body.getBytes());channel.waitForConfirmsOrDie(5_000);
}
channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。
官方说明了,其实channel底层是异步工作的,会将channel阻塞住,然后异步等待服务端发送一个确认消息,才解除阻塞。但是我们在使用时,可以把他当作一个同步工具来看待。利用一个异步转同步功能,可利用JUC实现;
然后如果到了超时时间,还没有收到服务端的确认机制,那就会抛出异常。然后通常处理这个异常的方式是记录错误日志或者尝试重发消息,但是尝试重发时一定要注意不要使程序陷入死循环。
- 发送批量消息
条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认
int batchSize = 100;int outstandingMessageCount = 0;long start = System.nanoTime();for (int i = 0; i < MESSAGE_COUNT; i++) {String body = String.valueOf(i);ch.basicPublish("", queue, null, body.getBytes());outstandingMessageCount++;if (outstandingMessageCount == batchSize) {ch.waitForConfirmsOrDie(5_000);outstandingMessageCount = 0;}}if (outstandingMessageCount > 0) {ch.waitForConfirmsOrDie(5_000);}
存在隐藏问题:若是500条消息处理太久,超时了,则响应失败,消息重新入队、出现重新消费问题;
- 异步确认消息
实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2);
这三种确认机制都能够提升Producer发送消息的安全性。通常情况下,第三种异步确认机制的性能是最好的。第一种安全性最高。
消费者确认
当交换机接受消息后,就要转发给消费者;如何保证消息不丢失?重复消费?
SpringBoot消费确认
自动确认
-
若是业务中,一些消息发送给消费者,若是消息出现异常,消费者返回通知交换机消息出现了异常,交换机会将消息重新入队;
-
若是没有确认消息,交换机没有收到消息,会将消息会重新放入队列中,每次消费者启动都会把以前消费的消息重新消费;
-
SpringBoot整合RabbitMQ后, 设置参数max-attempts为最大重试次数、retry.enabled为开启重试机制;
spring.rabbitmq.listener.direct.retry.max-attempts=5
spring.rabbitmq.listener.direct.retry.enabled=true
-
为什么要开启重试?
不开启重试、消费者处理消息发生异常后, RabbitMQ会丢弃该消息, 通常业务开发中,是不允许的。 -
为什么设置重试次数?
不开启重试次数,则消息会一直重新入队,占用内存,若是错误消息过多,RabbitMQ内存爆了;
手动确认
在手动确认的模式下,不管是消费成功还是消费失败,一定要记得确认消息,不然消息会一直处于unack状态,直到消费者进程重启或者停止。
设置参数:
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消费者:
通过使用channel#basicAck, basicNack, basicReject完成;
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public class ConfirmConsumer {// @RabbitHandler : 标记的方法只能有一个参数,类型为String ,若是传Map参数、则需要传入map参数// @RabbitListener:标记的方法可以传入Channel, Message参数@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);System.out.println("消息ID : " + message.getMessageProperties().getDeliveryTag());try {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒绝确认消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒绝消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}@RabbitHandlerpublic void listenObjectQueue2(Map<String, Object> msg) throws IOException {System.out.println("接收到object.queue的消息" + msg);}}
-
确认收到一个或多个消息
- void basicAck(long deliveryTag, boolean multiple) throws IOException;
- deliveryTag :消息传递标识
- multiple:是否批量确认,为true,则确认后,其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认;(慎重)
-
拒绝一个或多个消息:
- void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
- deliveryTag:消息的传递标识。
- multiple: 如果为true,则拒绝所有consumer获得的小于deliveryTag的消息。(慎重)
- requeue: 设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列。
-
拒绝一个消息:
- void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:消息的传递标识。
- requeue: 设置为false 表示不再重新入队,如果配置了死信队列则进入死信队列。
-
channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。
个人还是推荐手动确认,可控性更高;
SpringBoot消费者手动确认调用的API与RabbitMQClient原生API一致,都是通过这三个方法完成确认操作;
业务开发中,@RabbitHandler注解用的少,因为注解标记的方法只能传入 消息内容参数, 无法传Channel, Message, 获取到的消息有限, 而@RabbitListener则相反;
相关文章:
RabbitMQ(二) - RabbitMQ与消息发布确认与返回、消费确认
RabbitMQ消息确认 SpringBoot与RabbitMQ整合后,对RabbitClient的“确认”进行了封装、使用方式与RabbitMQ官网不一致; 消息发布确认 生产者给交换机发送消息后、若是不管了,则会出现消息丢失; 解决方案1: 交换机接受…...
操作指南 | 如何使用Chainlink喂价功能获取价格数据
Chainlink的去中心化预言机网络中的智能合约包含由运行商为其他智能合约(DApps)使用或截取所持续更新的实施价格数据。其中有两个主要架构:喂价和基础要求模型。此教程将会展现如何在Moonbeam、Moonriver或是Moonbase Alpha测试网上使用喂价功…...
Pandaer的iPhone手机壳
哇塞,Pandaer的设计太棒了!手机壳的花样多到让我眼花缭乱,好多系列设计都很有意思,让人有集齐的冲动。我最近入手了几个iPhone的手机壳,它有亮色和透明的款式,亮色的壳内部也是亮的,因为手机壳全…...
将自己的网站免费发布到互联网上【无需公网IP】
作者简介: 辭七七,目前大一,正在学习C/C,Java,Python等 作者主页: 七七的个人主页 文章收录专栏: 七七的闲谈 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖…...
浅谈 Python中if __name__ == ‘__main__‘:的工作原理
为了理解if __name__ __main__:的工作原理,我们需要先了解Python中的特殊变量__name__。 每个Python模块都有一个内置的变量__name__。这个变量的值取决于如何执行模块: 如果模块是被直接运行的(例如,你使用命令python myscrip…...
【力扣】344. 反转字符串 <首尾指针>
【力扣】344. 反转字符串 编写一个函数,其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。不要给另外的数组分配额外的空间,你必须原地修改输入数组、使用 O(1) 的额外空间解决这一问题。 示例 1: 输入:s …...
Kubectl 详解
目录 陈述式资源管理方法:项目的生命周期:创建-->发布-->更新-->回滚-->删除声明式管理方法: 陈述式资源管理方法: kubernetes 集群管理集群资源的唯一入口是通过相应的方法调用 apiserver 的接口kubectl 是官方的CL…...
华为OD面试记录
The experience of applying for software test engineer(Dispatcher) 记录保存 招聘岗位: 测试工程师 Base:西安 华为面试流程如下: 流程名内容机试三题,总分400分,最后一道题200分人力资源面试询问私人问题,不谈薪资一面技术面二面技术面主管问项目…...
电源控制--品质因素Q值全解
什么是品质因素Q值? 在电源控制中,品质因素 Q 值通常用于描述电源滤波器的性能。电源滤波器用于减小电源中的噪声和干扰,以提供干净稳定的电源供应给电子设备。 品质因素 Q 值在电源滤波器中表示滤波器的带宽和中心频率之比,用于…...
实际工作中通过python+go-cqhttp+selenium实现自动检测维护升级并发送QQ通知消息(程序内测)
说明:该篇博客是博主一字一码编写的,实属不易,请尊重原创,谢谢大家! 首先,今年比较忙没有多余时间去实操创作分享文章给大家,那就给大家分享下博主在实际工作中的一点点内容吧,就当交…...
EC200 CAT1 拨号PPP
**硬件支持型号 点击 查看 硬件支持 详情** DTU701 产品详情 DTU702 产品详情 DTU801 产品详情 DTU802 产品详情 DTU902 产品详情 G5501 产品详情 目前 DTU系列 产品,WIFI4G拨号 ,默认开机自启动拨号。 WIFI 只需要 根据现场 修改SSID热点和密码…...
外网通过ipv6访问家里设备
想从公司访问家里的设备,比较轻松方便的,用向日葵也可以远程。但是家里电脑比较old的了,向日葵开起来,占用内存挺大的,想尝试windows自带的“mstsc”,所以硬着头皮搞ipv6. (重点提示࿱…...
docker 如何使用代理
为docker添加代理有三种情况: 为docker pull(dockerd)添加代理为Docker build添加代理为docker容器添加代理 参考文章如下: 如何优雅的给 Docker 配置网络代理Configure the daemon with systemd 其中,如果在使用代…...
Go和Java实现装饰器模式
Go和Java实现装饰器模式 我们通过人穿着打扮自己的实例来演示装饰器模式的用法。 1、装饰器模式 装饰器模式允许向一个现有的对象添加新的功能,同时又不改变其结构。这种类型的设计模式属于结构型模式,它 是作为现有的类的一个包装。 装饰器模式通过…...
Android中级——RemoteView
RemoteView RemoteView的应用NotificationWidgetPendingIntent RemoteViews内部机制模拟RemoteViews RemoteView的应用 Notification 如下开启一个系统的通知栏,点击后跳转到某网页 public class MainActivity extends AppCompatActivity {private static final …...
SpringBoot核心内容梳理
1.SpringBoot是什么? Spring Boot是一个基于Spring框架的快速开发应用程序的工具。它简化了Spring应用程序的创建和开发过程,使开发人员能够更快速地创建独立的、生产就绪的Spring应用程序。它采用了“约定优于配置”的原则,尽可能地减少开发人员需要进…...
Benchmarking Augmentation Methods for Learning Robust Navigation Agents 论文阅读
论文信息 题目:Benchmarking Augmentation Methods for Learning Robust Navigation Agents: the Winning Entry of the 2021 iGibson Challenge 作者:Naoki Yokoyama, Qian Luo 来源:arXiv 时间:2022 Abstract 深度强化学习和…...
面试题:HTTP Code码及应用场景分析
1xx 消息(临时响应) 属于临时相应,代表所发出的请求已经被接受,需要继续进行处理。只包含状态行和某些可选的响应头信息,并以空行结束。由于 HTTP/1.0 协议中没有定义任何 1xx 状态码,所以除非在某些试验条件下,服务器…...
The ‘kotlin-android-extensions‘ Gradle plugin is no longer supported.
Android使用kotlin开发,运行报错 The kotlin-android-extensions Gradle plugin is no longer supported. Please use this migration guide (https://goo.gle/kotlin-android-extensions-deprecation) to start working with View Binding (https://developer.an…...
vi 编辑器入门到高级
vi 编辑器的初级用法vi 编辑器的工作模式1. 命令模式2. 文本输入模式3. 状态行vi 工作模式切换存储缓冲区 vi 编辑器命令1. 启动 vi2. 文本输入3. 退出 vi4. 命令模式下的 光标移动5. 命令模式下的 文本修改6. 从 命令模式 进入 文本输入模式7. 搜索字符串8. vi 在线帮助文档 v…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能
1. 开发环境准备 安装DevEco Studio 3.1: 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK 项目配置: // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...
