当前位置: 首页 > news >正文

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/*** (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的* (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意输入一个不存在的交换机rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意输入一个不存在的队列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--发送结束");}/*** 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null* //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,* // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);}/** 此方法用于监听消息是否发送到交换机* 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);} else {log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);}
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {// 交换机public static final String confirm_exchange_name = "confirm_exchange";// 队列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 绑定队列到交换机@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//获取消息的内容byte[] body = message.getBody();//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//队列与交换机进行绑定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count  = 0;//测试重试,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}

4.2 重试机制截图

5. 限流设置--消费端

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认模式prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流测试--" + i);rabbitMqProducer.xianliuTest(map);}return "限流测试发送成功";}/**** 限流消息的发送测试*/public void xianliuTest(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}

5.2 消费端

 /*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。//channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{//否定确认//channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

相关文章:

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(二)

GPU 和 CPU 之间的同步 CPU 必须执行 MonitoredValue 的更新&#xff0c;并读取 CurrentValue&#xff0c;以确保不会丢失正在进行的信号中断通知。 当向系统中添加新的 CPU 等待程序时&#xff0c;或者如果现有的 CPU 等待程序失效时&#xff0c;OS 必须修改受监视的值。OS …...

23种设计模式之《模板方法模式(Template Method)》在c#中的应用及理解

程序设计中的主要设计模式通常分为三大类&#xff0c;共23种&#xff1a; 1. 创建型模式&#xff08;Creational Patterns&#xff09; 单例模式&#xff08;Singleton&#xff09;&#xff1a;确保一个类只有一个实例&#xff0c;并提供全局访问点。 工厂方法模式&#xff0…...

DEV-C++ 为什么不能调试?(正确解决方案)

为了备战pat考试&#xff0c;专门下载了DEV C&#xff0c;然后懵圈的发现&#xff0c;怎么无法调试(╯□&#xff09;╯︵ ┻━┻ 然后整了半天&#xff0c;终于在网上找到相应的解决方案&#xff01;&#xff01;&#xff01;-> Dev C 5.11 调试初始设置 <- 一共四步…...

【C++设计模式】第五篇:原型模式(Prototype)

注意&#xff1a;复现代码时&#xff0c;确保 VS2022 使用 C17/20 标准以支持现代特性。 克隆对象的效率革命 1. 模式定义与用途​ ​ 核心思想​ ​原型模式&#xff1a;通过复制现有对象​&#xff08;原型&#xff09;来创建新对象&#xff0c;而非通过new构造。​关键用…...

深入 Vue.js 组件开发:从基础到实践

深入 Vue.js 组件开发&#xff1a;从基础到实践 Vue.js 作为一款卓越的前端框架&#xff0c;其组件化开发模式为构建高效、可维护的用户界面提供了强大支持。在这篇博客中&#xff0c;我们将深入探讨 Vue.js 组件开发的各个方面&#xff0c;从基础概念到高级技巧&#xff0c;助…...

maven导入spring框架

在eclipse导入maven项目&#xff0c; 在pom.xml文件中加入以下内容 junit junit 3.8.1 test org.springframework spring-core ${org.springframework.version} org.springframework spring-beans ${org.springframework.version} org.springframework spring-context ${org.s…...

数据守护者:备份文件的重要性与自动化实践策略

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业运营和个人生活中不可或缺的核心资源。无论是企业的财务报表、客户资料&#xff0c;还是个人的家庭照片、工作文档&#xff0c;这些数据都承载着巨大的价值。然而&#xff0c;数据丢失的风险无处不在&#xff0c;硬件故障…...

MyBatis @Param 注解详解:指定的参数找不到?

MyBatis Param 注解详解 1. Param 注解的作用 Param 注解用于显式指定方法参数的名称&#xff0c;让 MyBatis 在 SQL 映射文件&#xff08;XML&#xff09;或注解中通过该名称访问参数。 核心场景&#xff1a; 方法有多个参数时&#xff0c;避免参数名丢失或混淆。参数为简单…...

【项目日记(八)】内存回收与联调

前言 我们前面实现了三层缓存申请的过程&#xff0c;并完成了三层缓存申请过程的联调&#xff01;本期我们来介绍三层的缓存的回收机制以及三层整体联调释放的过程。 目录 前言 一、thread cache 回收内存 二、central cache 回收内存 • 如何确定一个对象对应的span • …...

性能测试监控工具jmeter+grafana

1、什么是性能测试监控体系&#xff1f; 为什么要有监控体系&#xff1f; 原因&#xff1a; 1、项目-日益复杂&#xff08;内部除了代码外&#xff0c;还有中间件&#xff0c;数据库&#xff09; 2、一个系统&#xff0c;背后可能有多个软/硬件组合支撑&#xff0c;影响性能的因…...

016.3月夏令营:数理类

016.3月夏令营&#xff1a;数理类&#xff1a; 中国人民大学统计学院&#xff1a; http://www.eeban.com/forum.php?modviewthread&tid386109 北京大学化学学院第一轮&#xff1a; http://www.eeban.com/forum.php?m ... 6026&extrapage%3D1 香港大学化学系夏令营&a…...

CS144 Lab Checkpoint 0: networking warm up

Set up GNU/Linux on your computer 我用的是Ubuntu&#xff0c;按照指导书上写的输入如下命令安装所需的软件包&#xff1a; sudo apt update && sudo apt install git cmake gdb build-essential clang \ clang-tidy clang-format gcc-doc pkg-config glibc-doc tc…...

靶场之路-VulnHub-DC-6 nmap提权、kali爆破、shell反连

靶场之路-VulnHub-DC-6 一、信息收集 1、扫描靶机ip 2、指纹扫描 这里扫的我有点懵&#xff0c;这里只有两个端口&#xff0c;感觉是要扫扫目录了 nmap -sS -sV 192.168.122.128 PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 7.4p1 Debian 10deb9u6 (protoc…...

给没有登录认证的web应用添加登录认证(openresty lua实现)

这阵子不是deepseek火么&#xff1f;我也折腾了下本地部署&#xff0c;ollama、vllm、llama.cpp都弄了下&#xff0c;webui也用了几个&#xff0c;发现nextjs-ollama-llm-ui小巧方便&#xff0c;挺适合个人使用的。如果放在网上供多人使用的话&#xff0c;得接入登录认证才好&a…...

3月5日作业

代码作业&#xff1a; #!/bin/bash# 清空目录函数 safe_clear_dir() {local dir"$1"local name"$2"if [ -d "$dir" ]; thenwhile true; doread -p "检测到 $name 目录已存在&#xff0c;请选择操作&#xff1a; 1) 清空目录内容 2) 保留目…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别

UnsatisfiedLinkError 在对接硬件设备中&#xff0c;我们会遇到使用 java 调用 dll文件 的情况&#xff0c;此时大概率出现UnsatisfiedLinkError链接错误&#xff0c;原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用&#xff0c;结果 dll 未实现 JNI 协…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信

文章目录 Linux C语言网络编程详细入门教程&#xff1a;如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket&#xff08;服务端和客户端都要&#xff09;2. 绑定本地地址和端口&#x…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...