RabbitMQ发布确认高级版
1.前言
在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?


2.添加配置信息
在application.properties文件中添加如下配置,交换机开启消息确认模式
#NONE 值是禁用发布确认模式,是默认值
#CORRELATED 值是发布消息成功到交换器后会触发回调方法
#SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
# 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,
# 根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,
# 则接下来无法发送消息到 broker;
spring.rabbitmq.publisher-confirm-type=correlated
- NONE 值是禁用发布确认模式,是默认值
- CORRELATED 值是发布消息成功到交换器后会触发回调方法
- SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate 调用waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false则会关闭 channel,则接下来无法发送消息到 broker;
3. 配置类
package com.hong.springboot.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description: 发布确认高级版配置类* @Author: hong* @Date: 2024-03-05 20:52* @Version: 1.0**/
@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key1";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);}
}
4.生产者
package com.hong.springboot.rabbitmq.controller;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 发布确认高级版生产者* @Author: hong* @Date: 2024-03-05 20:58* @Version: 1.0**/
@Slf4j
@RequestMapping("/confirm/")
@RestController
public class ConfirmProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;//http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message);}
}
5.消费者
package com.hong.springboot.rabbitmq.consumer;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 发布确认高级版消费者* @Author: hong* @Date: 2024-03-05 21:05* @Version: 1.0**/
@Slf4j
@Component
public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);}
}
正常情况下,发送http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才

6.回调接口
package com.hong.springboot.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @Description: 发布确认高级版消息生产者的回调接口* @Author: hong* @Date: 2024-03-09 21:58* @Version: 1.0**/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机不管是否收到消息的一个回调方法* 1.收到消息* correlationData 保存回调消息的id及相关信息* b true 交换机收到消息* s null* 2.未收到消息* correlationData 保存回调消息的id及相关信息* b false 交换机未收到消息* s 失败的原因* @param correlationData 消息相关数据* @param b 交换机是否收到消息* @param s 没收到消息的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";if (b) {log.info("交换机已经收到id为:{}的消息", id);} else {log.info("交换机还未收到id为:{}消息,原因:{}", id, s);}}
}
修改ConfirmProducerController中sendMsg方法
交换机改个名字模拟交换机收不到消息
@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("1");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);}

将routingKey改个名字模拟队列收不到消息
@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData1 = new CorrelationData("1");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY,correlationData1);CorrelationData correlationData2 = new CorrelationData("2");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",correlationData2);}

7.回退消息
从以上模拟场景可以看出,在仅开启生产者确认机制,交换机接收到消息后,会直接给生产者发送确认消息,但若发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃的。因此我们借用mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。
7.1.开启消息回退机制
配置文件中添加如下配置
#开启消息回退机制
spring.rabbitmq.publisher-returns=true
7.2. 添加消息回退回调
@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 当消息传递过程中不可达目的地时将消息返回给生产者* 只有不可达目的地时才回调* @param returnedMessage*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(),returnedMessage.getReplyText(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());}

相关文章:
RabbitMQ发布确认高级版
1.前言 在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢&…...
【阿里云系列】-基于云效构建部署Springboot项目到ACK
介绍 为了提高项目迭代的速度加速交付产品给客户,我们通常会选择CICD工具来减少人力投入产生的成本,开源的工具比如有成熟的Jenkins,但是本文讲的是阿里云提高的解决方案云效平台,通过配置流水线的形式实现项目的快速部署到服务器…...
PyTorch搭建LeNet训练集详细实现
一、下载训练集 导包 import torch import torchvision import torch.nn as nn from model import LeNet import torch.optim as optim import torchvision.transforms as transforms import matplotlib.pyplot as plt import numpy as npToTensor()函数: 把图像…...
R语言复现:中国Charls数据库一篇现况调查论文的缺失数据填补方法
编者 在临床研究中,数据缺失是不可避免的,甚至没有缺失,数据的真实性都会受到质疑。 那我们该如何应对缺失的数据?放着不管?还是重新开始?不妨试着对缺失值进行填补,简单又高效。毕竟对于统计师来说&#…...
解决Git:Author identity unknown Please tell me who you are.
报错信息: 意思: 作者身份未知 ***请告诉我你是谁。 解决办法: git config --global user.name "你的名字"git config --global user.email "你的邮箱"...
Flink StreamTask启动和执行源码分析
文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程: 初始化:StreamTask的初始化阶段涉及多个…...
【MySQL 系列】MySQL 语句篇_DCL 语句
DCL( Data Control Language,数据控制语言)用于对数据访问权限进行控制,定义数据库、表、字段、用户的访问权限和安全级别。主要关键字包括 GRANT、 REVOKE 等。 文章目录 1、MySQL 中的 DCL 语句1.1、数据控制语言--DCL1.2、MySQ…...
什么是序列化?为什么需要序列化?
1、典型回答 序列化(Serialization)序列化是将对象转换为可存储或传输的形式的过程(例如: 将对象转换为字节流) 反序列化(Deserialization) 是将序列化后的数据(例如: 二进制文件)转换回原始对象的过程。通过反序列化,可以从存储介质 (如磁盘、数据库) 或通过网络…...
Linux本地搭建FastDFS系统
文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…...
docker和docker-compose安装
一、docker安装 1、移除旧版本 依次执行如下命令移除旧版本docker,如未安装过无需执行 yum -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-selinux docker-engine-selinux…...
深入理解Spring的ApplicationContext:案例详解与应用
深入理解Spring的ApplicationContext:案例详解与应用 在Spring框架的丰富生态中,ApplicationContext扮演着至关重要的角色。作为BeanFactory的扩展,ApplicationContext不仅继承了其所有功能,还引入了更多高级特性,使得…...
6.Java并发编程—深入剖析Java Executors:探索创建线程的5种神奇方式
Executors快速创建线程池的方法 Java通过Executors 工厂提供了5种创建线程池的方法,具体方法如下 方法名描述newSingleThreadExecutor()创建一个单线程的线程池,该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行,保证任务的顺序性…...
英语阅读挑战
英语阅读真是令人头痛的东西。可怜的子航想利用寒假时间突破英语难题。当他拿到一篇英语阅读时,他很好奇作者最喜欢用那些字母。 输入 一句30词以内的英语句子 输出 统计每个字母出现的次数 样例输入 复制 However,the British dont have a history of exporting th…...
备战蓝桥之思维
平台重叠真的坑 给你一句样例,如果你觉得自己的代码没问题那就试试吧 2 1 1 3 1 0 4 正确答案 0 0 0 0 P1105 平台 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) import java.awt.Checkbox; import java.awt.PageAttributes.OriginType; import java.io.B…...
09 string的实现
注意 实现仿cplus官网的的string类,对部分主要功能实现 实现 头文件 #pragma once #include <iostream> #include <assert.h> #include <string>namespace mystring {class string{friend std::ostream& operator<<(std::ostream&a…...
Git 进行版本控制时,配置 user.name 和 user.email
在使用 Git 进行版本控制时,配置 user.name 和 user.email 是一个非常重要的初始步骤,但不是绝对必须的。这两个配置项定义了当你进行提交(commit)时用于标识提交者的信息。 为什么建议配置 user.name 和 user.email 标识提交者…...
传统开发读写优化与HBase
目录: 一、传统开发数据读写性能优化 1. Mysql 分表、主从复制与读写分离 2. Redis(缓存型数据库)主从复制与读写分离 二、HBase 一、传统开发数据读写性能优化 1、Mysql 分表、主从复制与读写分离 mysql分库分表方案 一种分表方案:设置表A 表B 表A 自增列从1开始…...
【OpenGL实现 03】纹理贴图原理和实现
目录 一、说明二、纹理贴图原理2.1 纹理融合原理2.2 UV坐标原理 三、生成纹理对象3.1 需要在VAO上绑定纹理坐标3.2 纹理传递3.3 纹理buffer生成 四、代码实现:五、着色器4.1 片段4.2 顶点 五、后记 一、说明 本篇叙述在画出图元的时候,如何贴图纹理图片…...
FDU 2021 | 二叉树关键节点的个数
文章目录 1. 题目描述2. 我的尝试 1. 题目描述 给定一颗二叉树,树的每个节点的值为一个正整数。如果从根节点到节点 N 的路径上不存在比节点 N 的值大的节点,那么节点 N 被认为是树上的关键节点。求树上所有的关键节点的个数。请写出程序,并…...
精读《React Conf 2019 - Day2》
1 引言 这是继 精读《React Conf 2019 - Day1》 之后的第二篇,补充了 React Conf 2019 第二天的内容。 2 概述 & 精读 第二天的内容更为精彩,笔者会重点介绍比较干货的部分。 Fast refresh Fast refresh 是更好的 react-hot-loader 替代方案&am…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
第25节 Node.js 断言测试
Node.js的assert模块主要用于编写程序的单元测试时使用,通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试,通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
