当前位置: 首页 > news >正文

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency><dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:rabbitmq:host: localhostport: 5672virtual-host: / # 虚拟主机username: guestpassword: guestpublisher-returns: true  #确认消息已经发送到队列,生产上无需开启# simple:同步等待confirm结果,直到超时#开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallbackpublisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/*** (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的* (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用*/
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{private RabbitTemplate rabbitTemplate;public void queueConfirm(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));// 故意输入一个不存在的交换机rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));// 故意输入一个不存在的队列rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));log.info("Confirm -- 消息--发送结束");}/*** 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null* //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,* // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated*/@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Autowiredpublic RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);}/** 此方法用于监听消息是否发送到交换机* 回调*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);} else {log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);}}@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);}
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {// 交换机public static final String confirm_exchange_name = "confirm_exchange";// 队列public static final String confirm_queue_name="confirm_queue";// routingkeypublic static final String confirm_routing_key = "confirm_key1";// 声明交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(confirm_exchange_name);}// 声明队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(confirm_queue_name).build();}// 绑定队列到交换机@Beanpublic Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);}/*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//获取消息的内容byte[] body = message.getBody();//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {@Beanpublic Queue retryQueue(){Map<String,Object> params = new HashMap<>();return QueueBuilder.durable("retry_queue").withArguments(params).build();}@Beanpublic TopicExchange retryTopicExchange(){return new TopicExchange("retry_exchange",true,false);}//队列与交换机进行绑定@Beanpublic Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");}int count  = 0;//测试重试,需要在yml配置 retry@RabbitListener(queues = "retry_queue")public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);int i = 10 /0;channel.basicAck(tag,false);}}

4.2 重试机制截图

5. 限流设置--消费端

spring:rabbitmq:listener:simple:acknowledge-mode: manual # 开启手动确认模式prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")public String xianliuTest(){for(int i = 1; i < 20; i++){Map<String, String> map = new HashMap<>();map.put("key","限流测试--" + i);rabbitMqProducer.xianliuTest(map);}return "限流测试发送成功";}/**** 限流消息的发送测试*/public void xianliuTest(Map<String, String> map) {// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));}

5.2 消费端

 /*** ack:成功处理消息,RabbitMQ从队列中删除该消息* nack:消息处理失败,RabbitMQ需要再次投递消息* reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息*/@RabbitListener(queues = "confirm_queue")public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//获取消息的唯一标记long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);if(message.getBody() != null){//basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。//channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功log.info("接收的消息为:{}", map);}else{//否定确认//channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);log.info("未消费数据");}}

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

相关文章:

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency><dependency> <groupId>com.fasterxml.jackson.core</groupId> <arti…...

【HarmonyOS Next】自定义Tabs

背景 项目中Tabs的使用可以说是特别的频繁&#xff0c;但是官方提供的Tabs使用起来&#xff0c;存在tab选项卡切换动画滞后的问题。 原始动画无法满足产品的UI需求&#xff0c;因此&#xff0c;这篇文章将实现下面页面滑动&#xff0c;tab选项卡实时滑动的动画效果。 实现逻…...

Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构

文章目录 前言use 用法1. 模块化与命名空间2. use 中 as 语法的使用3. as * 语法的使用4. 私有成员的访问5. use 中with默认值6. use 导入问题总结下一篇预告&#xff1a; 前言 在上一篇中&#xff0c;我们深入探讨了 Sass 中 import 语法的局限性&#xff0c;正是因为这些问题…...

【渗透测试】反弹 Shell 技术详解(一)

反弹 Shell 技术详解 一、前置知识 反弹 shell&#xff08;Reverse Shell&#xff09;是一种技术&#xff0c;攻击者利用它可以在远程主机上获得一个交互式的命令行接口。通常情况下&#xff0c;反弹 shell 会将标准输入&#xff08;stdin&#xff09;、标准输出&#xff08;…...

python:pymunk + pygame 模拟六边形中小球弹跳运动

向 chat.deepseek.com 提问&#xff1a;编写 python 程序&#xff0c;用 pymunk, 有一个正六边形&#xff0c;围绕中心点缓慢旋转&#xff0c;六边形内有一个小球&#xff0c;六边形的6条边作为墙壁&#xff0c;小球受重力和摩擦力、弹力影响&#xff0c;模拟小球弹跳运动&…...

Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(二)

GPU 和 CPU 之间的同步 CPU 必须执行 MonitoredValue 的更新&#xff0c;并读取 CurrentValue&#xff0c;以确保不会丢失正在进行的信号中断通知。 当向系统中添加新的 CPU 等待程序时&#xff0c;或者如果现有的 CPU 等待程序失效时&#xff0c;OS 必须修改受监视的值。OS …...

23种设计模式之《模板方法模式(Template Method)》在c#中的应用及理解

程序设计中的主要设计模式通常分为三大类&#xff0c;共23种&#xff1a; 1. 创建型模式&#xff08;Creational Patterns&#xff09; 单例模式&#xff08;Singleton&#xff09;&#xff1a;确保一个类只有一个实例&#xff0c;并提供全局访问点。 工厂方法模式&#xff0…...

DEV-C++ 为什么不能调试?(正确解决方案)

为了备战pat考试&#xff0c;专门下载了DEV C&#xff0c;然后懵圈的发现&#xff0c;怎么无法调试(╯□&#xff09;╯︵ ┻━┻ 然后整了半天&#xff0c;终于在网上找到相应的解决方案&#xff01;&#xff01;&#xff01;-> Dev C 5.11 调试初始设置 <- 一共四步…...

【C++设计模式】第五篇:原型模式(Prototype)

注意&#xff1a;复现代码时&#xff0c;确保 VS2022 使用 C17/20 标准以支持现代特性。 克隆对象的效率革命 1. 模式定义与用途​ ​ 核心思想​ ​原型模式&#xff1a;通过复制现有对象​&#xff08;原型&#xff09;来创建新对象&#xff0c;而非通过new构造。​关键用…...

深入 Vue.js 组件开发:从基础到实践

深入 Vue.js 组件开发&#xff1a;从基础到实践 Vue.js 作为一款卓越的前端框架&#xff0c;其组件化开发模式为构建高效、可维护的用户界面提供了强大支持。在这篇博客中&#xff0c;我们将深入探讨 Vue.js 组件开发的各个方面&#xff0c;从基础概念到高级技巧&#xff0c;助…...

maven导入spring框架

在eclipse导入maven项目&#xff0c; 在pom.xml文件中加入以下内容 junit junit 3.8.1 test org.springframework spring-core ${org.springframework.version} org.springframework spring-beans ${org.springframework.version} org.springframework spring-context ${org.s…...

数据守护者:备份文件的重要性与自动化实践策略

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业运营和个人生活中不可或缺的核心资源。无论是企业的财务报表、客户资料&#xff0c;还是个人的家庭照片、工作文档&#xff0c;这些数据都承载着巨大的价值。然而&#xff0c;数据丢失的风险无处不在&#xff0c;硬件故障…...

MyBatis @Param 注解详解:指定的参数找不到?

MyBatis Param 注解详解 1. Param 注解的作用 Param 注解用于显式指定方法参数的名称&#xff0c;让 MyBatis 在 SQL 映射文件&#xff08;XML&#xff09;或注解中通过该名称访问参数。 核心场景&#xff1a; 方法有多个参数时&#xff0c;避免参数名丢失或混淆。参数为简单…...

【项目日记(八)】内存回收与联调

前言 我们前面实现了三层缓存申请的过程&#xff0c;并完成了三层缓存申请过程的联调&#xff01;本期我们来介绍三层的缓存的回收机制以及三层整体联调释放的过程。 目录 前言 一、thread cache 回收内存 二、central cache 回收内存 • 如何确定一个对象对应的span • …...

性能测试监控工具jmeter+grafana

1、什么是性能测试监控体系&#xff1f; 为什么要有监控体系&#xff1f; 原因&#xff1a; 1、项目-日益复杂&#xff08;内部除了代码外&#xff0c;还有中间件&#xff0c;数据库&#xff09; 2、一个系统&#xff0c;背后可能有多个软/硬件组合支撑&#xff0c;影响性能的因…...

016.3月夏令营:数理类

016.3月夏令营&#xff1a;数理类&#xff1a; 中国人民大学统计学院&#xff1a; http://www.eeban.com/forum.php?modviewthread&tid386109 北京大学化学学院第一轮&#xff1a; http://www.eeban.com/forum.php?m ... 6026&extrapage%3D1 香港大学化学系夏令营&a…...

CS144 Lab Checkpoint 0: networking warm up

Set up GNU/Linux on your computer 我用的是Ubuntu&#xff0c;按照指导书上写的输入如下命令安装所需的软件包&#xff1a; sudo apt update && sudo apt install git cmake gdb build-essential clang \ clang-tidy clang-format gcc-doc pkg-config glibc-doc tc…...

靶场之路-VulnHub-DC-6 nmap提权、kali爆破、shell反连

靶场之路-VulnHub-DC-6 一、信息收集 1、扫描靶机ip 2、指纹扫描 这里扫的我有点懵&#xff0c;这里只有两个端口&#xff0c;感觉是要扫扫目录了 nmap -sS -sV 192.168.122.128 PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 7.4p1 Debian 10deb9u6 (protoc…...

给没有登录认证的web应用添加登录认证(openresty lua实现)

这阵子不是deepseek火么&#xff1f;我也折腾了下本地部署&#xff0c;ollama、vllm、llama.cpp都弄了下&#xff0c;webui也用了几个&#xff0c;发现nextjs-ollama-llm-ui小巧方便&#xff0c;挺适合个人使用的。如果放在网上供多人使用的话&#xff0c;得接入登录认证才好&a…...

3月5日作业

代码作业&#xff1a; #!/bin/bash# 清空目录函数 safe_clear_dir() {local dir"$1"local name"$2"if [ -d "$dir" ]; thenwhile true; doread -p "检测到 $name 目录已存在&#xff0c;请选择操作&#xff1a; 1) 清空目录内容 2) 保留目…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

HTML 列表、表格、表单

1 列表标签 作用&#xff1a;布局内容排列整齐的区域 列表分类&#xff1a;无序列表、有序列表、定义列表。 例如&#xff1a; 1.1 无序列表 标签&#xff1a;ul 嵌套 li&#xff0c;ul是无序列表&#xff0c;li是列表条目。 注意事项&#xff1a; ul 标签里面只能包裹 li…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》

在注意力分散、内容高度同质化的时代&#xff0c;情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现&#xff0c;消费者对内容的“有感”程度&#xff0c;正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中&#xff0…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...