springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。
发布确认
发布确认方案
架构
配置文件
在配置文件当中添加 spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.host=43.139.59.23
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-confirm-type=correlated
配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}
}
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);}@GetMapping("sendMessage/{message}")public void sendMessage(@PathVariable String message){//指定消息 id 为 1CorrelationData correlationData1=new CorrelationData("1");String routingKey="key1";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);CorrelationData correlationData2=new CorrelationData("2");routingKey="key2";rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);log.info("发送消息内容:{}",message);}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果
回退消息
Mandatory 参数
生产者
import com.example.demo.component.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.UUID;@RestController
@RequestMapping("/confirm")
@Slf4j
public class Producer {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate MyCallBack myCallBack;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(myCallBack);/*** true:* 交换机无法将消息进行路由时,会将该消息返回给生产者* false:* 如果发现消息无法进行路由,则直接丢弃*/rabbitTemplate.setMandatory(true);//设置回退消息交给谁处理rabbitTemplate.setReturnCallback(myCallBack);}@GetMapping("sendMessage")public void sendMessage(String message){//让消息绑定一个 id 值CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",message+"key1",correlationData1);log.info("发送消息 id 为:{}内容为{}",correlationData1.getId(),message+"key1");CorrelationData correlationData2 = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",message+"key2",correlationData2);log.info("发送消息 id 为:{}内容为{}",correlationData2.getId(),message+"key2");}
}
回调接口
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {/*** 交换机不管是否收到消息的一个回调方法* CorrelationData* 消息相关数据* ack* 交换机是否收到消息*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id=correlationData!=null?correlationData.getId():"";if(ack){log.info("交换机已经收到 id 为:{}的消息",id);}else{log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, Stringexchange, String routingKey) {log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",newString(message.getBody()),exchange,replyText,routingKey);}
}
结果
接收到被退回的消息
备份交换机
架构
配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";public static final String BACKUP_QUEUE_NAME = "backup.queue";public static final String WARNING_QUEUE_NAME = "warning.queue";// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}//声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with("key1");}//声明备份 Exchange@Bean("backupExchange")public FanoutExchange backupExchange(){return new FanoutExchange(BACKUP_EXCHANGE_NAME);}//声明确认 Exchange 交换机的备份交换机@Bean("confirmExchange")public DirectExchange confirmExchange(){ExchangeBuilder exchangeBuilder =ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)//设置该交换机的备份交换机.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);return (DirectExchange)exchangeBuilder.build();}// 声明警告队列@Bean("warningQueue")public Queue warningQueue(){return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}// 声明报警队列绑定关系@Beanpublic Binding warningBinding(@Qualifier("warningQueue") Queue queue,@Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}// 声明备份队列@Bean("backQueue")public Queue backQueue(){return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();}// 声明备份队列绑定关系@Beanpublic Binding backupBinding(@Qualifier("backQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange backupExchange){return BindingBuilder.bind(queue).to(backupExchange);}
}
报警消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmConsumer {public static final String CONFIRM_QUEUE_NAME = "confirm.queue";@RabbitListener(queues =CONFIRM_QUEUE_NAME)public void receiveMsg(Message message){String msg=new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}",msg);}
}
结果
相关文章:

springboot整合rabbitmq发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们如何才能进行 RabbitMQ 的消息可靠投递。 发布确认 发布确认方案 架构 配置…...
【linux命令讲解大全】010. mapfile命令和tempfile命令的用法及示例
文章目录 mapfile概要主要用途选项参数返回值例子 tempfile补充说明tempfile 命令$$ 变量 从零学 python mapfile 从标准输入读取行并赋值到数组。 概要 mapfile [-d delim] [-n count] [-O origin] [-s count] [-t] [-u fd] [-C callback] [-c quantum] [array] 主要用途 …...

在 Python 中构建卷积神经网络; 从 0 到 9 的手绘数字的灰度图像预测数字
一、说明 为了预测从0到9的数字,我选择了一个基于著名的Kaggle的MNIST数据集的数据集。数据集包含从 <0> 到 <9> 的手绘图数字的灰度图像。在本文中,我将根据像素数据(即数值数据)和卷积神经网络预测数字。 二、 卷积…...

前端分页处理
页面中实现的分页效果,要么后端提供接口,每次点击下一页就调用接口,若不提供接口,分页得前端自己去截取。 方法一:slice方法 slice(参数1,参数2)方法是返回一个新的数组对象,左开右闭 参数1&…...

【C语言】位操作符的一些题目与技巧
初学者在学完位操作符之后,总是不能很好的掌握,因此这篇文章旨在巩固对位操作符的理解与使用。 有的题目可能会比较难以接受,但是看完一定会有收获 目录 位操作符:一些题目:不创建临时变量交换整数整数转换二进制中1的…...

爬虫逆向实战(二十二)--某恩数据电影票房
一、数据接口分析 主页地址:某恩数据 1、抓包 通过抓包可以发现数据接口是API/GetData.ashx 2、判断是否有加密参数 请求参数是否加密? 无请求头是否加密? 无响应是否加密? 通过查看“响应”模块可以发现,响应是…...

火山引擎发布自研视频编解码芯片
2023年8月22日,火山引擎视频云宣布其自研的视频编解码芯片已成功出片。经验证,该芯片的视频压缩效率相比行业主流硬件编码器可提升30%以上,未来将服务于抖音、西瓜视频等视频业务,并将通过火山引擎视频云开放给企业客户。 火山引…...
投递技术类简历的注意事项
简历修改的背景 作为程序员,随着工作年限的增加,要定期的去修改自己的简历中的工作项目,一方面可以促进自己复盘一下工作成果和个人成长,另外也能给自己换工作提供一个前置的便捷性。 注意事项 修改简历的时候有哪些需要注意的…...

每日一题——柱状图中最大的矩形
柱状图中最大的矩形 题目链接 用什么数据结构? 要得到柱状图中最大的矩形,我们就必须要知道对于每一个高度heights[i],他所能勾勒出的矩形最大是多少(即宽度最大是多少)。 而对应到图上我们可以知道,要知…...

Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台
Banana Pi推出基于龙芯2K1000LA处理器的信创工业控制开发平台:BPI-5202信创工业控制开发平台 BPI-5202 龙芯2K1000LA 信创工业控制开发平台 1.1 工控机的应用场景 物联网的狂潮,既是一场众多的计算机软硬件厂家(也包括通讯方案和产品厂家&…...

springCloud整合Zookeeper的时候调用找不到服务
SpringCloud整合Zookeeper的时候调用找不到服务 首先,我们在注册中心注册了这个服务: 然后我们使用RestTemplate 调用的时候发现失败了:找不到这个服务: 找了很多资料发现这个必须要加上负载才行 BeanLoadBalanced //负载publi…...

【kubernetes】使用kubepshere部署中间件服务
KubeSphere部署中间件服务 入门使用KubeSphere部署单机版MySQL、Redis、RabbitMQ 记录一下搭建过程 (内容学习于尚硅谷云原生课程) 环境准备 VMware虚拟机k8s集群,一主两从,master也作为工作节点;KubeSphere k8skubesphere devops比较占用磁…...
如何从tabbar页面传数据
无论是百度小程序还是微信小程序,app.json中规定的tabbar页面是不支持传参的,例如: <navigator url../service/service?typeid6 openType"switchTab"> 服务项目 </navigator> 上面的navigater跳转有个属性&#…...
软考高级系统架构设计师系列论文七十四:基于构件的软件开发
软考高级系统架构设计师系列论文七十四:基于构件的软件开发 一、构件相关知识点二、摘要三、正文四、总结一、构件相关知识点 软考高级系统架构设计师系列之:面向构件的软件设计,构件平台与典型架构...
图为科技_边缘计算在智能安防领域的作用
边缘计算在智能安防领域发挥着重要的作用。智能安防系统通常需要处理大量的图像、视频和传感器数据,并对其进行实时分析和处理。边缘计算可以将计算和数据处理功能移动到离数据源更接近的地方,例如摄像头、传感器设备或安防终端。 以下是边缘计算在智能…...

Android 13 - Media框架(7)- NuPlayer::Source
Source 在播放器中起着拉流(Streaming)和解复用(demux)的作用,Source 设计的好坏直接影响到播放器的基础功能,我们这一节将会了解 NuPlayer 中的通用 Source(GenericSource)关注本地…...

MySql015——使用子查询
一、创建customers表 ######################## # Create customers table ######################## use study;CREATE TABLE customers (cust_id int NOT NULL AUTO_INCREMENT,cust_name char(50) NOT NULL ,cust_address char(50) NULL ,cust_city char…...
leetcode 355 设计推特
用链表存储用户发送的每一个推特,用堆获取最先的10条动态 class Twitter {Map<Integer,Set<Integer>> followMap;//规定最新的放到最后Map<Integer,Tweet> postMap;//优先队列(堆)PriorityQueue<Tweet> priorityQueue;int time…...

倒数 2 周|期待 2023 Google开发者大会
9 月 6-7 日,中国上海 前沿科技,新知同享 趣味体验,灵感齐聚 技术生态,多元共进 关注官网最新信息,敬请期待大会开幕 2023 Google 开发者大会官网 相信你一定记得,在今年 5 月的 Google I/O 大会上&am…...
代码随想录day57
516最长回文子序列 class Solution { public:int longestPalindromeSubseq(string s) {vector<vector<int>>dp(s.size(),vector<int>(s.size(),0));for(int i0;i<s.size();i)dp[i][i]1;for(int is.size()-1;i>0;i--){for(int ji1;j<s.size();j){if…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...

如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...

iview框架主题色的应用
1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题,无需引入,直接可…...
区块链技术概述
区块链技术是一种去中心化、分布式账本技术,通过密码学、共识机制和智能合约等核心组件,实现数据不可篡改、透明可追溯的系统。 一、核心技术 1. 去中心化 特点:数据存储在网络中的多个节点(计算机),而非…...
绕过 Xcode?使用 Appuploader和主流工具实现 iOS 上架自动化
iOS 应用的发布流程一直是开发链路中最“苹果味”的环节:强依赖 Xcode、必须使用 macOS、各种证书和描述文件配置……对很多跨平台开发者来说,这一套流程并不友好。 特别是当你的项目主要在 Windows 或 Linux 下开发(例如 Flutter、React Na…...
简单介绍C++中 string与wstring
在C中,string和wstring是两种用于处理不同字符编码的字符串类型,分别基于char和wchar_t字符类型。以下是它们的详细说明和对比: 1. 基础定义 string 类型:std::string 字符类型:char(通常为8位)…...
用鸿蒙HarmonyOS5实现国际象棋小游戏的过程
下面是一个基于鸿蒙OS (HarmonyOS) 的国际象棋小游戏的完整实现代码,使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├── …...

python可视化:俄乌战争时间线关键节点与深层原因
俄乌战争时间线可视化分析:关键节点与深层原因 俄乌战争是21世纪欧洲最具影响力的地缘政治冲突之一,自2022年2月爆发以来已持续超过3年。 本文将通过Python可视化工具,系统分析这场战争的时间线、关键节点及其背后的深层原因,全面…...