springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。
发布确认
发布确认方案

架构
 
 
配置文件
在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}
}生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息 id 为 1CorrelationData correlationData1=new CorrelationData("1");String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData("2");routingKey="key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}
}回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}结果

回退消息
Mandatory 参数
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}@GetMapping("sendMessage")public void sendMessage(String message){//让消息绑定一个 id 值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}
}回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, Stringexchange, String routingKey) {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}
}结果
 接收到被退回的消息
 接收到被退回的消息
备份交换机
架构

配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}报警消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果

相关文章:
 
springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。 发布确认 发布确认方案 架构 配置…...
【linux命令讲解大全】010. mapfile命令和tempfile命令的用法及示例
文章目录 mapfile概要主要用途选项参数返回值例子 tempfile补充说明tempfile 命令$$ 变量 从零学 python mapfile 从标准输入读取行并赋值到数组。 概要 mapfile [-d delim] [-n count] [-O origin] [-s count] [-t] [-u fd] [-C callback] [-c quantum] [array] 主要用途 …...
 
在 Python 中构建卷积神经网络; 从 0 到 9 的手绘数字的灰度图像预测数字
一、说明 为了预测从0到9的数字,我选择了一个基于著名的Kaggle的MNIST数据集的数据集。数据集包含从 <0> 到 <9> 的手绘图数字的灰度图像。在本文中,我将根据像素数据(即数值数据)和卷积神经网络预测数字。 二、 卷积…...
 
前端分页处理
页面中实现的分页效果,要么后端提供接口,每次点击下一页就调用接口,若不提供接口,分页得前端自己去截取。 方法一:slice方法 slice(参数1,参数2)方法是返回一个新的数组对象,左开右闭 参数1&…...
 
【C语言】位操作符的一些题目与技巧
初学者在学完位操作符之后,总是不能很好的掌握,因此这篇文章旨在巩固对位操作符的理解与使用。 有的题目可能会比较难以接受,但是看完一定会有收获 目录 位操作符:一些题目:不创建临时变量交换整数整数转换二进制中1的…...
 
爬虫逆向实战(二十二)--某恩数据电影票房
一、数据接口分析 主页地址:某恩数据 1、抓包 通过抓包可以发现数据接口是API/GetData.ashx 2、判断是否有加密参数 请求参数是否加密? 无请求头是否加密? 无响应是否加密? 通过查看“响应”模块可以发现,响应是…...
 
火山引擎发布自研视频编解码芯片
2023年8月22日,火山引擎视频云宣布其自研的视频编解码芯片已成功出片。经验证,该芯片的视频压缩效率相比行业主流硬件编码器可提升30%以上,未来将服务于抖音、西瓜视频等视频业务,并将通过火山引擎视频云开放给企业客户。 火山引…...
投递技术类简历的注意事项
简历修改的背景 作为程序员,随着工作年限的增加,要定期的去修改自己的简历中的工作项目,一方面可以促进自己复盘一下工作成果和个人成长,另外也能给自己换工作提供一个前置的便捷性。 注意事项 修改简历的时候有哪些需要注意的…...
 
每日一题——柱状图中最大的矩形
柱状图中最大的矩形 题目链接 用什么数据结构? 要得到柱状图中最大的矩形,我们就必须要知道对于每一个高度heights[i],他所能勾勒出的矩形最大是多少(即宽度最大是多少)。 而对应到图上我们可以知道,要知…...
 
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台:BPI-5202信创工业控制开发平台 BPI-5202 龙芯2K1000LA 信创工业控制开发平台 1.1 工控机的应用场景 物联网的狂潮,既是一场众多的计算机软硬件厂家(也包括通讯方案和产品厂家&…...
 
springCloud整合Zookeeper的时候调用找不到服务
SpringCloud整合Zookeeper的时候调用找不到服务 首先,我们在注册中心注册了这个服务: 然后我们使用RestTemplate 调用的时候发现失败了:找不到这个服务: 找了很多资料发现这个必须要加上负载才行 BeanLoadBalanced //负载publi…...
 
【kubernetes】使用kubepshere部署中间件服务
KubeSphere部署中间件服务 入门使用KubeSphere部署单机版MySQL、Redis、RabbitMQ 记录一下搭建过程 (内容学习于尚硅谷云原生课程) 环境准备 VMware虚拟机k8s集群,一主两从,master也作为工作节点;KubeSphere k8skubesphere devops比较占用磁…...
如何从tabbar页面传数据
无论是百度小程序还是微信小程序,app.json中规定的tabbar页面是不支持传参的,例如: <navigator url../service/service?typeid6 openType"switchTab"> 服务项目 </navigator> 上面的navigater跳转有个属性&#…...
软考高级系统架构设计师系列论文七十四:基于构件的软件开发
软考高级系统架构设计师系列论文七十四:基于构件的软件开发 一、构件相关知识点二、摘要三、正文四、总结一、构件相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构...
图为科技_边缘计算在智能安防领域的作用
边缘计算在智能安防领域发挥着重要的作用。智能安防系统通常需要处理大量的图像、视频和传感器数据,并对其进行实时分析和处理。边缘计算可以将计算和数据处理功能移动到离数据源更接近的地方,例如摄像头、传感器设备或安防终端。 以下是边缘计算在智能…...
 
Android 13 - Media框架(7)- NuPlayer::Source
Source 在播放器中起着拉流(Streaming)和解复用(demux)的作用,Source 设计的好坏直接影响到播放器的基础功能,我们这一节将会了解 NuPlayer 中的通用 Source(GenericSource)关注本地…...
 
MySql015——使用子查询
一、创建customers表 ######################## # Create customers table ######################## use study;CREATE TABLE customers (cust_id int NOT NULL AUTO_INCREMENT,cust_name char(50) NOT NULL ,cust_address char(50) NULL ,cust_city char…...
leetcode 355 设计推特
用链表存储用户发送的每一个推特,用堆获取最先的10条动态 class Twitter {Map<Integer,Set<Integer>> followMap;//规定最新的放到最后Map<Integer,Tweet> postMap;//优先队列(堆)PriorityQueue<Tweet> priorityQueue;int time…...
 
倒数 2 周|期待 2023 Google开发者大会
9 月 6-7 日,中国上海 前沿科技,新知同享 趣味体验,灵感齐聚 技术生态,多元共进 关注官网最新信息,敬请期待大会开幕 2023 Google 开发者大会官网 相信你一定记得,在今年 5 月的 Google I/O 大会上&am…...
代码随想录day57
516最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {vector<vector<int>>dp(s.size(),vector<int>(s.size(),0));for(int i0;i<s.size();i)dp[i][i]1;for(int is.size()-1;i>0;i--){for(int ji1;j<s.size();j){if…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
 
什么是库存周转?如何用进销存系统提高库存周转率?
你可能听说过这样一句话: “利润不是赚出来的,是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业,很多企业看着销售不错,账上却没钱、利润也不见了,一翻库存才发现: 一堆卖不动的旧货…...
 
Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
 
wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...
 
C++实现分布式网络通信框架RPC(2)——rpc发布端
有了上篇文章的项目的基本知识的了解,现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...
 
针对药品仓库的效期管理问题,如何利用WMS系统“破局”
案例: 某医药分销企业,主要经营各类药品的批发与零售。由于药品的特殊性,效期管理至关重要,但该企业一直面临效期问题的困扰。在未使用WMS系统之前,其药品入库、存储、出库等环节的效期管理主要依赖人工记录与检查。库…...
用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法
用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法 大家好,我是Echo_Wish。最近刷短视频、看直播,有没有发现,越来越多的应用都开始“懂你”了——它们能感知你的情绪,推荐更合适的内容,甚至帮客服识别用户情绪,提升服务体验。这背后,神经网络在悄悄发力,撑起…...
 
高效的后台管理系统——可进行二次开发
随着互联网技术的迅猛发展,企业的数字化管理变得愈加重要。后台管理系统作为数据存储与业务管理的核心,成为了现代企业不可或缺的一部分。今天我们要介绍的是一款名为 若依后台管理框架 的系统,它不仅支持跨平台应用,还能提供丰富…...
