当前位置: 首页 > article >正文

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/*** (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的* (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意输入一个不存在的交换机rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意输入一个不存在的队列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--发送结束");}/*** 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null* //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,* // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);}/** 此方法用于监听消息是否发送到交换机* 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);} else {log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);}
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {// 交换机public static final String confirm_exchange_name = "confirm_exchange";// 队列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 绑定队列到交换机@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//获取消息的内容byte[] body = message.getBody();//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//队列与交换机进行绑定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count  = 0;//测试重试,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}

4.2 重试机制截图

5. 限流设置--消费端

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认模式prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流测试--" + i);rabbitMqProducer.xianliuTest(map);}return "限流测试发送成功";}/**** 限流消息的发送测试*/public void xianliuTest(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}

5.2 消费端

 /*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。//channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{//否定确认//channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

相关文章:

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

Mac服务器上创建Docker并安装宝塔环境

1. 远程ssh登录服务器&#xff1a;ssh -o ProxyCommand"nc -X 5 -x 127.0.0.1:7890 %h %p" -i fenfaqianming.pem ec2-user54.254.XXX.XXX 2. mac服务器上需要安装Colima 3. brew install colima 4. colima start 5. colima autostart 创建mac服务器安装docker &…...

golang 从零单排 (一) 安装环境

1.下载安装 打开网址The Go Programming Language 直接点击下载go1.24.1.windows-amd64.msi 下载完成 直接双击下一步 下一步 安装完成 环境变量自动设置不必配置 2.验证 win r 输入cmd 打开命令行 输入go version...

康谋分享 | 3DGS:革新自动驾驶仿真场景重建的关键技术

随着自动驾驶技术的迅猛发展&#xff0c;构建高保真、动态的仿真场景成为了行业的迫切需求。传统的三维重建方法在处理复杂场景时常常面临效率和精度的挑战。在此背景下&#xff0c;3D高斯点阵渲染&#xff08;3DGS&#xff09;技术应运而生&#xff0c;成为自动驾驶仿真场景重…...

【够用就好008】开新坑自学esb32烧录进军物联网和嵌入式

见字如面&#xff0c;这里是AKA AIGC创意人竹相左边。 学习使用了三年的AI工具&#xff0c;现在最大的自信就是业余时间可以学习任何自己感兴趣的事&#xff0c;感觉手搓火箭也不是梦。 今天开个新坑&#xff0c;也是逐步探索想要进入的新世界。物联网&#xff08;IoT&#…...

大白话JavaScript实现一个函数,将字符串中的每个单词首字母大写。

大白话JavaScript实现一个函数&#xff0c;将字符串中的每个单词首字母大写。 答题思路 理解需求&#xff1a;要写一个函数&#xff0c;它能接收一个字符串&#xff0c;然后把这个字符串里每个单词的第一个字母变成大写。分解步骤 拆分单词&#xff1a;一般单词之间是用空格隔…...

Go红队开发—格式导出

文章目录 输出功能CSV输出CSV 转 结构体结构体 转 CSV端口扫描结果使用CSV格式导出 HTML输出Sqlite输出nmap扫描 JSONmap转json结构体转jsonjson写入文件json编解码json转结构体json转mapjson转string练习&#xff1a;nmap扫描结果导出json格式 输出功能 在我们使用安全工具的…...

从零构建高可用MySQL自动化配置系统:核心技术、工具开发与企业级最佳实践

在现代企业级数据库管理中,手动配置 MySQL 已无法满足高效、稳定和可扩展的需求。本文从 MySQL 配置管理的核心原理 出发,深入剖析 自动化配置工具的架构设计、关键技术实现,并结合 企业级落地方案,帮助读者构建一套 高可用、智能化的 MySQL 自动化配置系统。无论是 DevOps…...

element-plus中table组件的使用

1、table组件的基本使用 注意&#xff1a; ①对象集合&#xff0c;要从后端查询。 ②prop是集合中的对象的属性名&#xff1b;label是表格表头的名称。 2、将性别一列的71转为男&#xff0c;72转为女 问题描述&#xff1a; 解决步骤&#xff1a; ①将el-table-column变成双标签…...

K8s 1.27.1 实战系列(三)安装网络插件

Kubernetes 的网络插件常见的有 Flannel 和 Calico ,这是两种主流的 CNI(容器网络接口)解决方案,它们在设计理念、实现方式、性能特征及适用场景上有显著差异。以下是两者的综合对比分析: 一、Flannel 和 Calico 1. 技术基础与网络实现 Flannel 核心机制:基于 Overlay …...

Java基础回顾 Day4

多线程相关 runnable接口实现&#xff0c;解决单继承的问题&#xff0c;因为继承Thread类就不能继承其他类了 Callable接口的特点是满足线程需要返回值和抛出异常的情况 在创建线程后的任何时候都可以重新设置&#xff0c;线程已经创建&#xff0c;可以使用 Thread.setPrior…...

Go加spy++隐藏窗口

最近发现有些软件的窗口就像狗皮膏药一样&#xff0c;关也关不掉&#xff0c;一点就要登录&#xff0c;属实是有点不爽了。 窗口的进程不能杀死&#xff0c;但是窗口我不想要。思路很简单&#xff0c;用 spy 找到要隐藏的窗口的句柄&#xff0c;然后调用 Windows 的 ShowWindo…...

Windows CMD 命令大全(综合开发整理版)

CMD Windows CMD 命令大全(综合整理版)基础操作与文件管理类系统维护与配置类网络与连接类开发者常用命令CMD 黑窗口使用技巧1. **效率操作**2. **高级功能**3. **开发者高效技巧**注意事项**微软官方文档****其他实用资源****如何高效使用官方文档**Windows CMD 命令大全(综…...

网络安全通信架构图

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 在安全通信里面我经常听到的2个东西就是SSL和TLS&#xff0c;这2个有什么区别呢&#xff1f;以及HTTPS是怎么通信的&#xff1f;包括对称加密、非对称加密、摘要、…...

当中国“智算心跳”与全球共振:九章云极DataCanvas首秀MWC 2025

3月3日&#xff0c;西班牙巴塞罗那&#xff0c;全球通信与科技领域的盛会“2025世界移动通信大会&#xff08;MWC 2025&#xff09;”正式拉开帷幕。中国人工智能基础设施领军企业九章云极DataCanvas公司以全球化战略视野与硬核技术实力&#xff0c;全方位、多维度地展示了在智…...

Clion快捷键、修改字体

文章目录 一、Clion快捷键1.撤销&#xff1a;crtl Z2.重做&#xff1a;crtl shift Z3.删除该行&#xff1a;crtl Y4.多行后退&#xff1a;选中多行 Tab5.多行缩进&#xff1a;选中多行 shift Tab 二、修改注释的斜体 一、Clion快捷键 1.撤销&#xff1a;crtl Z 2.重做…...

基于PySide6的CATIA零件自动化着色工具开发实践

引言 在汽车及航空制造领域&#xff0c;CATIA作为核心的CAD设计软件&#xff0c;其二次开发能力对提升设计效率具有重要意义。本文介绍一种基于Python的CATIA零件着色工具开发方案&#xff0c;通过PySide6实现GUI交互&#xff0c;结合COM接口操作实现零件着色自动化。该方案成…...

在Uniapp中实现特殊字符弹出框并插入输入框

在开发Uniapp项目时&#xff0c;我们经常会遇到需要用户输入特殊字符的场景。为了提升用户体验&#xff0c;我们可以封装一个特殊字符弹出框&#xff0c;用户点击键盘图标后弹出该字符集&#xff0c;选择字符后自动插入到输入框中。本文将详细介绍如何实现这一功能。 1. 功能概…...

golang dlv调试工具

golang dlv调试工具 在goland2022.2版本 中调试go程序报错 WARNING: undefined behavior - version of Delve is too old for Go version 1.20.7 (maximum supported version 1.19) 即使你go install了新的dlv也无济于事 分析得出Goland实际使用的是 Goland安装目录下dlv 例…...

深入解析 BitBake 日志机制:任务调度、日志记录与调试方法

1. 引言&#xff1a;为什么 BitBake 的日志机制至关重要&#xff1f; BitBake 是 Yocto 项目的核心构建工具&#xff0c;用于解析配方、管理任务依赖&#xff0c;并执行编译和打包任务。在 BitBake 构建过程中&#xff0c;日志记录机制不仅用于跟踪任务执行情况&#xff0c;还…...

数据结构链表的C++实现

在C中实现链表是一种常见的练习&#xff0c;有助于理解指针和动态内存分配的概念。下面是一个简单的单向链表&#xff08;Singly Linked List&#xff09;的实现示例&#xff0c;包括基本的操作如插入、删除和遍历。 单向链表 (Singly Linked List) 实现 1. 定义节点结构 首…...

《原型链的故事:JavaScript 对象模型的秘密》

原型链&#xff08;Prototype Chain&#xff09; 是 JavaScript 中实现继承的核心机制。每个对象都有一个内部属性 [[Prototype]]&#xff08;可以通过 __proto__ 访问&#xff09;&#xff0c;指向其原型对象。每个对象都有一个原型&#xff0c; 原型本身也是一个对象&#xf…...

Linux 配置静态 IP

一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址&#xff0c;每次启动虚拟机服务都是不一样的 IP&#xff0c;因此要配置静态 IP 地址避免每次都发生变化&#xff0c;下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念&#xff1a; 动态 IP…...

【Python 数据结构 10.二叉树】

目录 一、二叉树的基本概念 1.二叉树的定义 2.二叉树的特点 3.特殊的二叉树 Ⅰ、斜树 Ⅱ、满二叉树 Ⅲ、完全二叉树 Ⅳ、完全二叉树和满二叉树的区别 4.二叉树的性质 5.二叉树的顺序存储 Ⅰ、完全二叉树 Ⅱ、非完全二叉树 Ⅲ、稀疏二叉树 6.二叉树的链式存储 7.二叉树的遍历概念…...

SwanLab简明教程:从萌新到高手

目录 1. 什么是SwanLab&#xff1f; 1.1 核心特性 2. 安装SwanLab 3. 登录SwanLab账号&#xff08;云端版&#xff09; 4. 5分钟快速上手 更多案例 5. SwanLab功能组件 5.1 图表视图 5.2 表格视图 5.3 硬件监控 5.4 环境记录 5.5 组织协同 6. 训练框架集成 6.1 基…...

SQLiteStudio:一款免费跨平台的SQLite管理工具

SQLiteStudio 是一款专门用于管理和操作 SQLite 数据库的免费工具。它提供直观的图形化界面&#xff0c;简化了数据库的创建、编辑、查询和维护&#xff0c;适合数据库开发者和数据分析师使用。 功能特性 SQLiteStudio 提供的主要功能包括&#xff1a; 免费开源&#xff0c;可…...

【智能体Agent】ReAct智能体的实现思路和关键技术

基于ReAct&#xff08;Reasoning Acting&#xff09;框架的自主智能体 import re from typing import List, Tuplefrom langchain_community.chat_message_histories.in_memory import ChatMessageHistory from langchain_core.language_models.chat_models import BaseChatM…...

python爬虫系列课程8:js浏览器window对象属性

python爬虫系列课程8:js浏览器window对象属性 一、JavaScript的组成二、document常见属性对象三、navigator对象一、JavaScript的组成 JavaScript可以分为三个部分:ECMAScript标准、DOM、BOM。 ECMAScript标准:即JS的基本语法,JavaScript的核心,描述了语言的基本语法和数…...

Java基础系列:深入理解八大基本数据类型及避坑指南

目录 一、基本数据类型概述 八大类型速查表 二、各类型详解与常见陷阱 1. 整型家族&#xff08;byte/short/int/long&#xff09; 2. 浮点型&#xff08;float/double&#xff09; 3. 字符型&#xff08;char&#xff09; 4. 布尔型&#xff08;boolean&#xff09; 三…...

贝塞尔曲线学习

1、一阶贝塞尔曲线 一阶贝塞尔曲线其实是一条直线——给定点 P0、P1&#xff0c;线性贝塞尔曲线就是一条两点之间的直线&#xff0c;公式如下&#xff1a; 一阶曲线很好理解, 就是根据t来线性插值。 void MainWindow::mousePressEvent(QMouseEvent *e) {list.append(e->pos…...