springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。
发布确认
发布确认方案

架构
配置文件
在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}
}
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息 id 为 1CorrelationData correlationData1=new CorrelationData("1");String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData("2");routingKey="key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果

回退消息
Mandatory 参数
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}@GetMapping("sendMessage")public void sendMessage(String message){//让消息绑定一个 id 值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, Stringexchange, String routingKey) {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}
}
结果
接收到被退回的消息
备份交换机
架构

配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}
报警消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果

相关文章:
springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。 发布确认 发布确认方案 架构 配置…...
【linux命令讲解大全】010. mapfile命令和tempfile命令的用法及示例
文章目录 mapfile概要主要用途选项参数返回值例子 tempfile补充说明tempfile 命令$$ 变量 从零学 python mapfile 从标准输入读取行并赋值到数组。 概要 mapfile [-d delim] [-n count] [-O origin] [-s count] [-t] [-u fd] [-C callback] [-c quantum] [array] 主要用途 …...
在 Python 中构建卷积神经网络; 从 0 到 9 的手绘数字的灰度图像预测数字
一、说明 为了预测从0到9的数字,我选择了一个基于著名的Kaggle的MNIST数据集的数据集。数据集包含从 <0> 到 <9> 的手绘图数字的灰度图像。在本文中,我将根据像素数据(即数值数据)和卷积神经网络预测数字。 二、 卷积…...
前端分页处理
页面中实现的分页效果,要么后端提供接口,每次点击下一页就调用接口,若不提供接口,分页得前端自己去截取。 方法一:slice方法 slice(参数1,参数2)方法是返回一个新的数组对象,左开右闭 参数1&…...
【C语言】位操作符的一些题目与技巧
初学者在学完位操作符之后,总是不能很好的掌握,因此这篇文章旨在巩固对位操作符的理解与使用。 有的题目可能会比较难以接受,但是看完一定会有收获 目录 位操作符:一些题目:不创建临时变量交换整数整数转换二进制中1的…...
爬虫逆向实战(二十二)--某恩数据电影票房
一、数据接口分析 主页地址:某恩数据 1、抓包 通过抓包可以发现数据接口是API/GetData.ashx 2、判断是否有加密参数 请求参数是否加密? 无请求头是否加密? 无响应是否加密? 通过查看“响应”模块可以发现,响应是…...
火山引擎发布自研视频编解码芯片
2023年8月22日,火山引擎视频云宣布其自研的视频编解码芯片已成功出片。经验证,该芯片的视频压缩效率相比行业主流硬件编码器可提升30%以上,未来将服务于抖音、西瓜视频等视频业务,并将通过火山引擎视频云开放给企业客户。 火山引…...
投递技术类简历的注意事项
简历修改的背景 作为程序员,随着工作年限的增加,要定期的去修改自己的简历中的工作项目,一方面可以促进自己复盘一下工作成果和个人成长,另外也能给自己换工作提供一个前置的便捷性。 注意事项 修改简历的时候有哪些需要注意的…...
每日一题——柱状图中最大的矩形
柱状图中最大的矩形 题目链接 用什么数据结构? 要得到柱状图中最大的矩形,我们就必须要知道对于每一个高度heights[i],他所能勾勒出的矩形最大是多少(即宽度最大是多少)。 而对应到图上我们可以知道,要知…...
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台:BPI-5202信创工业控制开发平台 BPI-5202 龙芯2K1000LA 信创工业控制开发平台 1.1 工控机的应用场景 物联网的狂潮,既是一场众多的计算机软硬件厂家(也包括通讯方案和产品厂家&…...
springCloud整合Zookeeper的时候调用找不到服务
SpringCloud整合Zookeeper的时候调用找不到服务 首先,我们在注册中心注册了这个服务: 然后我们使用RestTemplate 调用的时候发现失败了:找不到这个服务: 找了很多资料发现这个必须要加上负载才行 BeanLoadBalanced //负载publi…...
【kubernetes】使用kubepshere部署中间件服务
KubeSphere部署中间件服务 入门使用KubeSphere部署单机版MySQL、Redis、RabbitMQ 记录一下搭建过程 (内容学习于尚硅谷云原生课程) 环境准备 VMware虚拟机k8s集群,一主两从,master也作为工作节点;KubeSphere k8skubesphere devops比较占用磁…...
如何从tabbar页面传数据
无论是百度小程序还是微信小程序,app.json中规定的tabbar页面是不支持传参的,例如: <navigator url../service/service?typeid6 openType"switchTab"> 服务项目 </navigator> 上面的navigater跳转有个属性&#…...
软考高级系统架构设计师系列论文七十四:基于构件的软件开发
软考高级系统架构设计师系列论文七十四:基于构件的软件开发 一、构件相关知识点二、摘要三、正文四、总结一、构件相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构...
图为科技_边缘计算在智能安防领域的作用
边缘计算在智能安防领域发挥着重要的作用。智能安防系统通常需要处理大量的图像、视频和传感器数据,并对其进行实时分析和处理。边缘计算可以将计算和数据处理功能移动到离数据源更接近的地方,例如摄像头、传感器设备或安防终端。 以下是边缘计算在智能…...
Android 13 - Media框架(7)- NuPlayer::Source
Source 在播放器中起着拉流(Streaming)和解复用(demux)的作用,Source 设计的好坏直接影响到播放器的基础功能,我们这一节将会了解 NuPlayer 中的通用 Source(GenericSource)关注本地…...
MySql015——使用子查询
一、创建customers表 ######################## # Create customers table ######################## use study;CREATE TABLE customers (cust_id int NOT NULL AUTO_INCREMENT,cust_name char(50) NOT NULL ,cust_address char(50) NULL ,cust_city char…...
leetcode 355 设计推特
用链表存储用户发送的每一个推特,用堆获取最先的10条动态 class Twitter {Map<Integer,Set<Integer>> followMap;//规定最新的放到最后Map<Integer,Tweet> postMap;//优先队列(堆)PriorityQueue<Tweet> priorityQueue;int time…...
倒数 2 周|期待 2023 Google开发者大会
9 月 6-7 日,中国上海 前沿科技,新知同享 趣味体验,灵感齐聚 技术生态,多元共进 关注官网最新信息,敬请期待大会开幕 2023 Google 开发者大会官网 相信你一定记得,在今年 5 月的 Google I/O 大会上&am…...
代码随想录day57
516最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {vector<vector<int>>dp(s.size(),vector<int>(s.size(),0));for(int i0;i<s.size();i)dp[i][i]1;for(int is.size()-1;i>0;i--){for(int ji1;j<s.size();j){if…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...
【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...
Vue 模板语句的数据来源
🧩 Vue 模板语句的数据来源:全方位解析 Vue 模板(<template> 部分)中的表达式、指令绑定(如 v-bind, v-on)和插值({{ }})都在一个特定的作用域内求值。这个作用域由当前 组件…...
uniapp 集成腾讯云 IM 富媒体消息(地理位置/文件)
UniApp 集成腾讯云 IM 富媒体消息全攻略(地理位置/文件) 一、功能实现原理 腾讯云 IM 通过 消息扩展机制 支持富媒体类型,核心实现方式: 标准消息类型:直接使用 SDK 内置类型(文件、图片等)自…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
