springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。
发布确认
发布确认方案

架构
配置文件
在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}
}
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息 id 为 1CorrelationData correlationData1=new CorrelationData("1");String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData("2");routingKey="key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果

回退消息
Mandatory 参数
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}@GetMapping("sendMessage")public void sendMessage(String message){//让消息绑定一个 id 值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, Stringexchange, String routingKey) {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}
}
结果
接收到被退回的消息
备份交换机
架构

配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}
报警消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果

相关文章:
springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。 发布确认 发布确认方案 架构 配置…...
【linux命令讲解大全】010. mapfile命令和tempfile命令的用法及示例
文章目录 mapfile概要主要用途选项参数返回值例子 tempfile补充说明tempfile 命令$$ 变量 从零学 python mapfile 从标准输入读取行并赋值到数组。 概要 mapfile [-d delim] [-n count] [-O origin] [-s count] [-t] [-u fd] [-C callback] [-c quantum] [array] 主要用途 …...
在 Python 中构建卷积神经网络; 从 0 到 9 的手绘数字的灰度图像预测数字
一、说明 为了预测从0到9的数字,我选择了一个基于著名的Kaggle的MNIST数据集的数据集。数据集包含从 <0> 到 <9> 的手绘图数字的灰度图像。在本文中,我将根据像素数据(即数值数据)和卷积神经网络预测数字。 二、 卷积…...
前端分页处理
页面中实现的分页效果,要么后端提供接口,每次点击下一页就调用接口,若不提供接口,分页得前端自己去截取。 方法一:slice方法 slice(参数1,参数2)方法是返回一个新的数组对象,左开右闭 参数1&…...
【C语言】位操作符的一些题目与技巧
初学者在学完位操作符之后,总是不能很好的掌握,因此这篇文章旨在巩固对位操作符的理解与使用。 有的题目可能会比较难以接受,但是看完一定会有收获 目录 位操作符:一些题目:不创建临时变量交换整数整数转换二进制中1的…...
爬虫逆向实战(二十二)--某恩数据电影票房
一、数据接口分析 主页地址:某恩数据 1、抓包 通过抓包可以发现数据接口是API/GetData.ashx 2、判断是否有加密参数 请求参数是否加密? 无请求头是否加密? 无响应是否加密? 通过查看“响应”模块可以发现,响应是…...
火山引擎发布自研视频编解码芯片
2023年8月22日,火山引擎视频云宣布其自研的视频编解码芯片已成功出片。经验证,该芯片的视频压缩效率相比行业主流硬件编码器可提升30%以上,未来将服务于抖音、西瓜视频等视频业务,并将通过火山引擎视频云开放给企业客户。 火山引…...
投递技术类简历的注意事项
简历修改的背景 作为程序员,随着工作年限的增加,要定期的去修改自己的简历中的工作项目,一方面可以促进自己复盘一下工作成果和个人成长,另外也能给自己换工作提供一个前置的便捷性。 注意事项 修改简历的时候有哪些需要注意的…...
每日一题——柱状图中最大的矩形
柱状图中最大的矩形 题目链接 用什么数据结构? 要得到柱状图中最大的矩形,我们就必须要知道对于每一个高度heights[i],他所能勾勒出的矩形最大是多少(即宽度最大是多少)。 而对应到图上我们可以知道,要知…...
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台:BPI-5202信创工业控制开发平台 BPI-5202 龙芯2K1000LA 信创工业控制开发平台 1.1 工控机的应用场景 物联网的狂潮,既是一场众多的计算机软硬件厂家(也包括通讯方案和产品厂家&…...
springCloud整合Zookeeper的时候调用找不到服务
SpringCloud整合Zookeeper的时候调用找不到服务 首先,我们在注册中心注册了这个服务: 然后我们使用RestTemplate 调用的时候发现失败了:找不到这个服务: 找了很多资料发现这个必须要加上负载才行 BeanLoadBalanced //负载publi…...
【kubernetes】使用kubepshere部署中间件服务
KubeSphere部署中间件服务 入门使用KubeSphere部署单机版MySQL、Redis、RabbitMQ 记录一下搭建过程 (内容学习于尚硅谷云原生课程) 环境准备 VMware虚拟机k8s集群,一主两从,master也作为工作节点;KubeSphere k8skubesphere devops比较占用磁…...
如何从tabbar页面传数据
无论是百度小程序还是微信小程序,app.json中规定的tabbar页面是不支持传参的,例如: <navigator url../service/service?typeid6 openType"switchTab"> 服务项目 </navigator> 上面的navigater跳转有个属性&#…...
软考高级系统架构设计师系列论文七十四:基于构件的软件开发
软考高级系统架构设计师系列论文七十四:基于构件的软件开发 一、构件相关知识点二、摘要三、正文四、总结一、构件相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构...
图为科技_边缘计算在智能安防领域的作用
边缘计算在智能安防领域发挥着重要的作用。智能安防系统通常需要处理大量的图像、视频和传感器数据,并对其进行实时分析和处理。边缘计算可以将计算和数据处理功能移动到离数据源更接近的地方,例如摄像头、传感器设备或安防终端。 以下是边缘计算在智能…...
Android 13 - Media框架(7)- NuPlayer::Source
Source 在播放器中起着拉流(Streaming)和解复用(demux)的作用,Source 设计的好坏直接影响到播放器的基础功能,我们这一节将会了解 NuPlayer 中的通用 Source(GenericSource)关注本地…...
MySql015——使用子查询
一、创建customers表 ######################## # Create customers table ######################## use study;CREATE TABLE customers (cust_id int NOT NULL AUTO_INCREMENT,cust_name char(50) NOT NULL ,cust_address char(50) NULL ,cust_city char…...
leetcode 355 设计推特
用链表存储用户发送的每一个推特,用堆获取最先的10条动态 class Twitter {Map<Integer,Set<Integer>> followMap;//规定最新的放到最后Map<Integer,Tweet> postMap;//优先队列(堆)PriorityQueue<Tweet> priorityQueue;int time…...
倒数 2 周|期待 2023 Google开发者大会
9 月 6-7 日,中国上海 前沿科技,新知同享 趣味体验,灵感齐聚 技术生态,多元共进 关注官网最新信息,敬请期待大会开幕 2023 Google 开发者大会官网 相信你一定记得,在今年 5 月的 Google I/O 大会上&am…...
代码随想录day57
516最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {vector<vector<int>>dp(s.size(),vector<int>(s.size(),0));for(int i0;i<s.size();i)dp[i][i]1;for(int is.size()-1;i>0;i--){for(int ji1;j<s.size();j){if…...
【限时开放】Midjourney未来主义风格权威认证路径:完成这5个里程碑任务,获取由Adobe+MJ Labs联合签发的Futurism Prompt Architect证书
更多请点击: https://intelliparadigm.com 第一章:【限时开放】Midjourney未来主义风格权威认证路径:完成这5个里程碑任务,获取由AdobeMJ Labs联合签发的Futurism Prompt Architect证书 什么是未来主义Prompt架构师认证…...
Arm Neoverse CMN-650时钟与电源管理架构解析
1. Arm Neoverse CMN-650时钟与电源管理架构解析在现代SoC设计中,时钟与电源管理子系统如同城市的水电供应网络,其设计优劣直接决定了系统性能与能耗效率的平衡。Arm Neoverse CMN-650作为新一代互连架构,通过创新的时钟域划分和电源域管理机…...
量子通信中的级联环图码技术解析
1. 量子通信与量子中继器概述量子通信的核心挑战在于量子态在传输过程中极易受到环境噪声和信道损耗的影响。与传统经典通信不同,量子信息无法被简单地放大或复制(受限于量子不可克隆定理),这使得长距离量子通信的实现面临巨大困难…...
基于大语言模型的智能终端助手:LetMeDoIt的设计、部署与实战
1. 项目概述:一个能听懂人话的AI终端伴侣如果你和我一样,每天有大量时间泡在终端里,那么“如何让命令行更智能、更高效”一定是个永恒的课题。传统的CLI工具链虽然强大,但学习曲线陡峭,命令参数繁多,上下文…...
基于Claude的AI编程助手:从代码生成到自动化审查的全流程实践
1. 项目概述:当Claude遇上代码,一个全能型AI编程助手的诞生最近在GitHub上闲逛,发现了一个挺有意思的项目,叫“everything-claude-code”。光看名字,你可能会觉得这又是一个普通的AI代码生成工具,但实际深入…...
Fusion 360安装后想改位置?别重装!试试这个Windows符号链接‘乾坤大挪移’
Fusion 360安装路径迁移:无需重装的Windows符号链接实战指南 你是否遇到过这样的困扰——Fusion 360默认安装在C盘,随着项目文件增多,宝贵的SSD空间被快速吞噬?传统认知告诉我们,软件一旦安装就无法更改路径࿰…...
DsHidMini:让PS3手柄在Windows上重获新生的终极指南
DsHidMini:让PS3手柄在Windows上重获新生的终极指南 【免费下载链接】DsHidMini Virtual HID Mini-user-mode-driver for Sony DualShock 3 Controllers 项目地址: https://gitcode.com/gh_mirrors/ds/DsHidMini 还在为闲置的索尼DualShock 3手柄寻找新的用途…...
配电箱国家标准最新解读:GB/T 7251系列关键更新与合规要点
作为低压配电系统的核心设备,配电箱的质量直接关乎电力安全与人民生命财产安全。近年来,GB/T 7251《低压成套开关设备和控制设备》系列标准持续迭代升级,为行业规范化发展提供了重要技术支撑。本文从行业观察视角,系统梳理该系列标…...
Flyway实战:从零到一构建数据库版本管理流水线
1. 为什么你的项目需要Flyway 第一次接触数据库版本管理这个概念时,我正面临一个典型的开发困境:团队里有5个开发人员在同时修改数据库结构,每次发布新版本都像在玩俄罗斯轮盘赌——永远不知道谁会忘记执行哪个SQL脚本。直到生产环境出现数据…...
回溯52-59
52. 全排列 给定一个不含重复数字的数组 nums ,返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 class Solution(object):def fun(self,nums,path):if len(path)len(nums):self.res.append(path[:])for i in range(len(nums)):if self.visit[i]0:self.vi…...
