rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流
1. 安装和配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
1.2 yml 配置
### 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
2.生产端的消息确认发送代码
/*** (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的* (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意输入一个不存在的交换机rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意输入一个不存在的队列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--发送结束");}/*** 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null* //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,* // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
// rabbitTemplate.setConfirmCallback(this);}/** 此方法用于监听消息是否发送到交换机* 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);} else {log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);}
}
2.2 生产端的截图
3.消费端代码
@Component
@Slf4j
public class RabbitConfirmConsumer {// 交换机public static final String confirm_exchange_name = "confirm_exchange";// 队列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 绑定队列到交换机@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//获取消息的内容byte[] body = message.getBody();//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}}
3.2消费端截图
4 消费端重试机制
@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//队列与交换机进行绑定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count = 0;//测试重试,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}
4.2 重试机制截图
5. 限流设置--消费端
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认模式prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制
5.1 生产端--发送19条信息
@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流测试--" + i);rabbitMqProducer.xianliuTest(map);}return "限流测试发送成功";}/**** 限流消息的发送测试*/public void xianliuTest(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}
5.2 消费端
/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。//channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{//否定确认//channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}
5.3 注释掉channel.basicAck--堵塞了
5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack
相关文章:

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流
1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs
背景 项目中Tabs的使用可以说是特别的频繁,但是官方提供的Tabs使用起来,存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求,因此,这篇文章将实现下面页面滑动,tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构
文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告: 前言 在上一篇中,我们深入探讨了 Sass 中 import 语法的局限性,正是因为这些问题…...
【渗透测试】反弹 Shell 技术详解(一)
反弹 Shell 技术详解 一、前置知识 反弹 shell(Reverse Shell)是一种技术,攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下,反弹 shell 会将标准输入(stdin)、标准输出(…...

python:pymunk + pygame 模拟六边形中小球弹跳运动
向 chat.deepseek.com 提问:编写 python 程序,用 pymunk, 有一个正六边形,围绕中心点缓慢旋转,六边形内有一个小球,六边形的6条边作为墙壁,小球受重力和摩擦力、弹力影响,模拟小球弹跳运动&…...
Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(二)
GPU 和 CPU 之间的同步 CPU 必须执行 MonitoredValue 的更新,并读取 CurrentValue,以确保不会丢失正在进行的信号中断通知。 当向系统中添加新的 CPU 等待程序时,或者如果现有的 CPU 等待程序失效时,OS 必须修改受监视的值。OS …...
23种设计模式之《模板方法模式(Template Method)》在c#中的应用及理解
程序设计中的主要设计模式通常分为三大类,共23种: 1. 创建型模式(Creational Patterns) 单例模式(Singleton):确保一个类只有一个实例,并提供全局访问点。 工厂方法模式࿰…...

DEV-C++ 为什么不能调试?(正确解决方案)
为了备战pat考试,专门下载了DEV C,然后懵圈的发现,怎么无法调试(╯□)╯︵ ┻━┻ 然后整了半天,终于在网上找到相应的解决方案!!!-> Dev C 5.11 调试初始设置 <- 一共四步…...
【C++设计模式】第五篇:原型模式(Prototype)
注意:复现代码时,确保 VS2022 使用 C17/20 标准以支持现代特性。 克隆对象的效率革命 1. 模式定义与用途 核心思想 原型模式:通过复制现有对象(原型)来创建新对象,而非通过new构造。关键用…...
深入 Vue.js 组件开发:从基础到实践
深入 Vue.js 组件开发:从基础到实践 Vue.js 作为一款卓越的前端框架,其组件化开发模式为构建高效、可维护的用户界面提供了强大支持。在这篇博客中,我们将深入探讨 Vue.js 组件开发的各个方面,从基础概念到高级技巧,助…...
maven导入spring框架
在eclipse导入maven项目, 在pom.xml文件中加入以下内容 junit junit 3.8.1 test org.springframework spring-core ${org.springframework.version} org.springframework spring-beans ${org.springframework.version} org.springframework spring-context ${org.s…...

数据守护者:备份文件的重要性与自动化实践策略
在数字化浪潮席卷全球的今天,数据已成为企业运营和个人生活中不可或缺的核心资源。无论是企业的财务报表、客户资料,还是个人的家庭照片、工作文档,这些数据都承载着巨大的价值。然而,数据丢失的风险无处不在,硬件故障…...
MyBatis @Param 注解详解:指定的参数找不到?
MyBatis Param 注解详解 1. Param 注解的作用 Param 注解用于显式指定方法参数的名称,让 MyBatis 在 SQL 映射文件(XML)或注解中通过该名称访问参数。 核心场景: 方法有多个参数时,避免参数名丢失或混淆。参数为简单…...

【项目日记(八)】内存回收与联调
前言 我们前面实现了三层缓存申请的过程,并完成了三层缓存申请过程的联调!本期我们来介绍三层的缓存的回收机制以及三层整体联调释放的过程。 目录 前言 一、thread cache 回收内存 二、central cache 回收内存 • 如何确定一个对象对应的span • …...

性能测试监控工具jmeter+grafana
1、什么是性能测试监控体系? 为什么要有监控体系? 原因: 1、项目-日益复杂(内部除了代码外,还有中间件,数据库) 2、一个系统,背后可能有多个软/硬件组合支撑,影响性能的因…...

016.3月夏令营:数理类
016.3月夏令营:数理类: 中国人民大学统计学院: http://www.eeban.com/forum.php?modviewthread&tid386109 北京大学化学学院第一轮: http://www.eeban.com/forum.php?m ... 6026&extrapage%3D1 香港大学化学系夏令营&a…...

CS144 Lab Checkpoint 0: networking warm up
Set up GNU/Linux on your computer 我用的是Ubuntu,按照指导书上写的输入如下命令安装所需的软件包: sudo apt update && sudo apt install git cmake gdb build-essential clang \ clang-tidy clang-format gcc-doc pkg-config glibc-doc tc…...

靶场之路-VulnHub-DC-6 nmap提权、kali爆破、shell反连
靶场之路-VulnHub-DC-6 一、信息收集 1、扫描靶机ip 2、指纹扫描 这里扫的我有点懵,这里只有两个端口,感觉是要扫扫目录了 nmap -sS -sV 192.168.122.128 PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 7.4p1 Debian 10deb9u6 (protoc…...
给没有登录认证的web应用添加登录认证(openresty lua实现)
这阵子不是deepseek火么?我也折腾了下本地部署,ollama、vllm、llama.cpp都弄了下,webui也用了几个,发现nextjs-ollama-llm-ui小巧方便,挺适合个人使用的。如果放在网上供多人使用的话,得接入登录认证才好&a…...

3月5日作业
代码作业: #!/bin/bash# 清空目录函数 safe_clear_dir() {local dir"$1"local name"$2"if [ -d "$dir" ]; thenwhile true; doread -p "检测到 $name 目录已存在,请选择操作: 1) 清空目录内容 2) 保留目…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
java 实现excel文件转pdf | 无水印 | 无限制
文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...