当前位置: 首页 > news >正文

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/*** (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的* (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意输入一个不存在的交换机rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意输入一个不存在的队列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--发送结束");}/*** 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null* //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,* // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);}/** 此方法用于监听消息是否发送到交换机* 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);} else {log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);}
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {// 交换机public static final String confirm_exchange_name = "confirm_exchange";// 队列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 绑定队列到交换机@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//获取消息的内容byte[] body = message.getBody();//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//队列与交换机进行绑定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count  = 0;//测试重试,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}

4.2 重试机制截图

5. 限流设置--消费端

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认模式prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流测试--" + i);rabbitMqProducer.xianliuTest(map);}return "限流测试发送成功";}/**** 限流消息的发送测试*/public void xianliuTest(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}

5.2 消费端

 /*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。//channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{//否定确认//channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

相关文章:

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(二)

GPU 和 CPU 之间的同步 CPU 必须执行 MonitoredValue 的更新&#xff0c;并读取 CurrentValue&#xff0c;以确保不会丢失正在进行的信号中断通知。 当向系统中添加新的 CPU 等待程序时&#xff0c;或者如果现有的 CPU 等待程序失效时&#xff0c;OS 必须修改受监视的值。OS …...

23种设计模式之《模板方法模式(Template Method)》在c#中的应用及理解

程序设计中的主要设计模式通常分为三大类&#xff0c;共23种&#xff1a; 1. 创建型模式&#xff08;Creational Patterns&#xff09; 单例模式&#xff08;Singleton&#xff09;&#xff1a;确保一个类只有一个实例&#xff0c;并提供全局访问点。 工厂方法模式&#xff0…...

DEV-C++ 为什么不能调试?(正确解决方案)

为了备战pat考试&#xff0c;专门下载了DEV C&#xff0c;然后懵圈的发现&#xff0c;怎么无法调试(╯□&#xff09;╯︵ ┻━┻ 然后整了半天&#xff0c;终于在网上找到相应的解决方案&#xff01;&#xff01;&#xff01;-> Dev C 5.11 调试初始设置 <- 一共四步…...

【C++设计模式】第五篇:原型模式(Prototype)

注意&#xff1a;复现代码时&#xff0c;确保 VS2022 使用 C17/20 标准以支持现代特性。 克隆对象的效率革命 1. 模式定义与用途​ ​ 核心思想​ ​原型模式&#xff1a;通过复制现有对象​&#xff08;原型&#xff09;来创建新对象&#xff0c;而非通过new构造。​关键用…...

深入 Vue.js 组件开发:从基础到实践

深入 Vue.js 组件开发&#xff1a;从基础到实践 Vue.js 作为一款卓越的前端框架&#xff0c;其组件化开发模式为构建高效、可维护的用户界面提供了强大支持。在这篇博客中&#xff0c;我们将深入探讨 Vue.js 组件开发的各个方面&#xff0c;从基础概念到高级技巧&#xff0c;助…...

maven导入spring框架

在eclipse导入maven项目&#xff0c; 在pom.xml文件中加入以下内容 junit junit 3.8.1 test org.springframework spring-core ${org.springframework.version} org.springframework spring-beans ${org.springframework.version} org.springframework spring-context ${org.s…...

数据守护者:备份文件的重要性与自动化实践策略

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业运营和个人生活中不可或缺的核心资源。无论是企业的财务报表、客户资料&#xff0c;还是个人的家庭照片、工作文档&#xff0c;这些数据都承载着巨大的价值。然而&#xff0c;数据丢失的风险无处不在&#xff0c;硬件故障…...

MyBatis @Param 注解详解:指定的参数找不到?

MyBatis Param 注解详解 1. Param 注解的作用 Param 注解用于显式指定方法参数的名称&#xff0c;让 MyBatis 在 SQL 映射文件&#xff08;XML&#xff09;或注解中通过该名称访问参数。 核心场景&#xff1a; 方法有多个参数时&#xff0c;避免参数名丢失或混淆。参数为简单…...

【项目日记(八)】内存回收与联调

前言 我们前面实现了三层缓存申请的过程&#xff0c;并完成了三层缓存申请过程的联调&#xff01;本期我们来介绍三层的缓存的回收机制以及三层整体联调释放的过程。 目录 前言 一、thread cache 回收内存 二、central cache 回收内存 • 如何确定一个对象对应的span • …...

性能测试监控工具jmeter+grafana

1、什么是性能测试监控体系&#xff1f; 为什么要有监控体系&#xff1f; 原因&#xff1a; 1、项目-日益复杂&#xff08;内部除了代码外&#xff0c;还有中间件&#xff0c;数据库&#xff09; 2、一个系统&#xff0c;背后可能有多个软/硬件组合支撑&#xff0c;影响性能的因…...

016.3月夏令营:数理类

016.3月夏令营&#xff1a;数理类&#xff1a; 中国人民大学统计学院&#xff1a; http://www.eeban.com/forum.php?modviewthread&tid386109 北京大学化学学院第一轮&#xff1a; http://www.eeban.com/forum.php?m ... 6026&extrapage%3D1 香港大学化学系夏令营&a…...

CS144 Lab Checkpoint 0: networking warm up

Set up GNU/Linux on your computer 我用的是Ubuntu&#xff0c;按照指导书上写的输入如下命令安装所需的软件包&#xff1a; sudo apt update && sudo apt install git cmake gdb build-essential clang \ clang-tidy clang-format gcc-doc pkg-config glibc-doc tc…...

靶场之路-VulnHub-DC-6 nmap提权、kali爆破、shell反连

靶场之路-VulnHub-DC-6 一、信息收集 1、扫描靶机ip 2、指纹扫描 这里扫的我有点懵&#xff0c;这里只有两个端口&#xff0c;感觉是要扫扫目录了 nmap -sS -sV 192.168.122.128 PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 7.4p1 Debian 10deb9u6 (protoc…...

给没有登录认证的web应用添加登录认证(openresty lua实现)

这阵子不是deepseek火么&#xff1f;我也折腾了下本地部署&#xff0c;ollama、vllm、llama.cpp都弄了下&#xff0c;webui也用了几个&#xff0c;发现nextjs-ollama-llm-ui小巧方便&#xff0c;挺适合个人使用的。如果放在网上供多人使用的话&#xff0c;得接入登录认证才好&a…...

3月5日作业

代码作业&#xff1a; #!/bin/bash# 清空目录函数 safe_clear_dir() {local dir"$1"local name"$2"if [ -d "$dir" ]; thenwhile true; doread -p "检测到 $name 目录已存在&#xff0c;请选择操作&#xff1a; 1) 清空目录内容 2) 保留目…...

css的定位(position)详解:相对定位 绝对定位 固定定位

在 CSS 中&#xff0c;元素的定位通过 position 属性控制&#xff0c;共有 5 种定位模式&#xff1a;static&#xff08;静态定位&#xff09;、relative&#xff08;相对定位&#xff09;、absolute&#xff08;绝对定位&#xff09;、fixed&#xff08;固定定位&#xff09;和…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

自然语言处理——文本分类

文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益&#xff08;IG&#xff09; 分类器设计贝叶斯理论&#xff1a;线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别&#xff0c; 有单标签多类别文本分类和多…...

大数据治理的常见方式

大数据治理的常见方式 大数据治理是确保数据质量、安全性和可用性的系统性方法&#xff0c;以下是几种常见的治理方式&#xff1a; 1. 数据质量管理 核心方法&#xff1a; 数据校验&#xff1a;建立数据校验规则&#xff08;格式、范围、一致性等&#xff09;数据清洗&…...

OPENCV图形计算面积、弧长API讲解(1)

一.OPENCV图形面积、弧长计算的API介绍 之前我们已经把图形轮廓的检测、画框等功能讲解了一遍。那今天我们主要结合轮廓检测的API去计算图形的面积&#xff0c;这些面积可以是矩形、圆形等等。图形面积计算和弧长计算常用于车辆识别、桥梁识别等重要功能&#xff0c;常用的API…...

PostgreSQL 与 SQL 基础:为 Fast API 打下数据基础

在构建任何动态、数据驱动的Web API时&#xff0c;一个稳定高效的数据存储方案是不可或缺的。对于使用Python FastAPI的开发者来说&#xff0c;深入理解关系型数据库的工作原理、掌握SQL这门与数据库“对话”的语言&#xff0c;以及学会如何在Python中操作数据库&#xff0c;是…...