当前位置: 首页 > news >正文

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/*** (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的* (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意输入一个不存在的交换机rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意输入一个不存在的队列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--发送结束");}/*** 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null* //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,* // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);}/** 此方法用于监听消息是否发送到交换机* 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);} else {log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);}
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {// 交换机public static final String confirm_exchange_name = "confirm_exchange";// 队列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 绑定队列到交换机@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//获取消息的内容byte[] body = message.getBody();//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//队列与交换机进行绑定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count  = 0;//测试重试,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}

4.2 重试机制截图

5. 限流设置--消费端

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认模式prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流测试--" + i);rabbitMqProducer.xianliuTest(map);}return "限流测试发送成功";}/**** 限流消息的发送测试*/public void xianliuTest(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}

5.2 消费端

 /*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。//channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{//否定确认//channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

相关文章:

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(二)

GPU 和 CPU 之间的同步 CPU 必须执行 MonitoredValue 的更新&#xff0c;并读取 CurrentValue&#xff0c;以确保不会丢失正在进行的信号中断通知。 当向系统中添加新的 CPU 等待程序时&#xff0c;或者如果现有的 CPU 等待程序失效时&#xff0c;OS 必须修改受监视的值。OS …...

23种设计模式之《模板方法模式(Template Method)》在c#中的应用及理解

程序设计中的主要设计模式通常分为三大类&#xff0c;共23种&#xff1a; 1. 创建型模式&#xff08;Creational Patterns&#xff09; 单例模式&#xff08;Singleton&#xff09;&#xff1a;确保一个类只有一个实例&#xff0c;并提供全局访问点。 工厂方法模式&#xff0…...

DEV-C++ 为什么不能调试?(正确解决方案)

为了备战pat考试&#xff0c;专门下载了DEV C&#xff0c;然后懵圈的发现&#xff0c;怎么无法调试(╯□&#xff09;╯︵ ┻━┻ 然后整了半天&#xff0c;终于在网上找到相应的解决方案&#xff01;&#xff01;&#xff01;-> Dev C 5.11 调试初始设置 <- 一共四步…...

【C++设计模式】第五篇:原型模式(Prototype)

注意&#xff1a;复现代码时&#xff0c;确保 VS2022 使用 C17/20 标准以支持现代特性。 克隆对象的效率革命 1. 模式定义与用途​ ​ 核心思想​ ​原型模式&#xff1a;通过复制现有对象​&#xff08;原型&#xff09;来创建新对象&#xff0c;而非通过new构造。​关键用…...

深入 Vue.js 组件开发:从基础到实践

深入 Vue.js 组件开发&#xff1a;从基础到实践 Vue.js 作为一款卓越的前端框架&#xff0c;其组件化开发模式为构建高效、可维护的用户界面提供了强大支持。在这篇博客中&#xff0c;我们将深入探讨 Vue.js 组件开发的各个方面&#xff0c;从基础概念到高级技巧&#xff0c;助…...

maven导入spring框架

在eclipse导入maven项目&#xff0c; 在pom.xml文件中加入以下内容 junit junit 3.8.1 test org.springframework spring-core ${org.springframework.version} org.springframework spring-beans ${org.springframework.version} org.springframework spring-context ${org.s…...

数据守护者:备份文件的重要性与自动化实践策略

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业运营和个人生活中不可或缺的核心资源。无论是企业的财务报表、客户资料&#xff0c;还是个人的家庭照片、工作文档&#xff0c;这些数据都承载着巨大的价值。然而&#xff0c;数据丢失的风险无处不在&#xff0c;硬件故障…...

MyBatis @Param 注解详解:指定的参数找不到?

MyBatis Param 注解详解 1. Param 注解的作用 Param 注解用于显式指定方法参数的名称&#xff0c;让 MyBatis 在 SQL 映射文件&#xff08;XML&#xff09;或注解中通过该名称访问参数。 核心场景&#xff1a; 方法有多个参数时&#xff0c;避免参数名丢失或混淆。参数为简单…...

【项目日记(八)】内存回收与联调

前言 我们前面实现了三层缓存申请的过程&#xff0c;并完成了三层缓存申请过程的联调&#xff01;本期我们来介绍三层的缓存的回收机制以及三层整体联调释放的过程。 目录 前言 一、thread cache 回收内存 二、central cache 回收内存 • 如何确定一个对象对应的span • …...

性能测试监控工具jmeter+grafana

1、什么是性能测试监控体系&#xff1f; 为什么要有监控体系&#xff1f; 原因&#xff1a; 1、项目-日益复杂&#xff08;内部除了代码外&#xff0c;还有中间件&#xff0c;数据库&#xff09; 2、一个系统&#xff0c;背后可能有多个软/硬件组合支撑&#xff0c;影响性能的因…...

016.3月夏令营:数理类

016.3月夏令营&#xff1a;数理类&#xff1a; 中国人民大学统计学院&#xff1a; http://www.eeban.com/forum.php?modviewthread&tid386109 北京大学化学学院第一轮&#xff1a; http://www.eeban.com/forum.php?m ... 6026&extrapage%3D1 香港大学化学系夏令营&a…...

CS144 Lab Checkpoint 0: networking warm up

Set up GNU/Linux on your computer 我用的是Ubuntu&#xff0c;按照指导书上写的输入如下命令安装所需的软件包&#xff1a; sudo apt update && sudo apt install git cmake gdb build-essential clang \ clang-tidy clang-format gcc-doc pkg-config glibc-doc tc…...

靶场之路-VulnHub-DC-6 nmap提权、kali爆破、shell反连

靶场之路-VulnHub-DC-6 一、信息收集 1、扫描靶机ip 2、指纹扫描 这里扫的我有点懵&#xff0c;这里只有两个端口&#xff0c;感觉是要扫扫目录了 nmap -sS -sV 192.168.122.128 PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 7.4p1 Debian 10deb9u6 (protoc…...

给没有登录认证的web应用添加登录认证(openresty lua实现)

这阵子不是deepseek火么&#xff1f;我也折腾了下本地部署&#xff0c;ollama、vllm、llama.cpp都弄了下&#xff0c;webui也用了几个&#xff0c;发现nextjs-ollama-llm-ui小巧方便&#xff0c;挺适合个人使用的。如果放在网上供多人使用的话&#xff0c;得接入登录认证才好&a…...

3月5日作业

代码作业&#xff1a; #!/bin/bash# 清空目录函数 safe_clear_dir() {local dir"$1"local name"$2"if [ -d "$dir" ]; thenwhile true; doread -p "检测到 $name 目录已存在&#xff0c;请选择操作&#xff1a; 1) 清空目录内容 2) 保留目…...

JavaSec-RCE

简介 RCE(Remote Code Execution)&#xff0c;可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景&#xff1a;Groovy代码注入 Groovy是一种基于JVM的动态语言&#xff0c;语法简洁&#xff0c;支持闭包、动态类型和Java互操作性&#xff0c…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...

算法打卡第18天

从中序与后序遍历序列构造二叉树 (力扣106题) 给定两个整数数组 inorder 和 postorder &#xff0c;其中 inorder 是二叉树的中序遍历&#xff0c; postorder 是同一棵树的后序遍历&#xff0c;请你构造并返回这颗 二叉树 。 示例 1: 输入&#xff1a;inorder [9,3,15,20,7…...