RabbitMQ发布确认高级版
1.前言
在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
2.添加配置信息
在application.properties文件中添加如下配置,交换机开启消息确认模式
#NONE 值是禁用发布确认模式,是默认值
#CORRELATED 值是发布消息成功到交换器后会触发回调方法
#SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
# 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,
# 根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,
# 则接下来无法发送消息到 broker;
spring.rabbitmq.publisher-confirm-type=correlated
- NONE 值是禁用发布确认模式,是默认值
- CORRELATED 值是发布消息成功到交换器后会触发回调方法
- SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate 调用waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false则会关闭 channel,则接下来无法发送消息到 broker;
3. 配置类
package com.hong.springboot.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description: 发布确认高级版配置类* @Author: hong* @Date: 2024-03-05 20:52* @Version: 1.0**/
@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key1";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);}
}
4.生产者
package com.hong.springboot.rabbitmq.controller;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 发布确认高级版生产者* @Author: hong* @Date: 2024-03-05 20:58* @Version: 1.0**/
@Slf4j
@RequestMapping("/confirm/")
@RestController
public class ConfirmProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;//http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message);}
}
5.消费者
package com.hong.springboot.rabbitmq.consumer;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 发布确认高级版消费者* @Author: hong* @Date: 2024-03-05 21:05* @Version: 1.0**/
@Slf4j
@Component
public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);}
}
正常情况下,发送http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才
6.回调接口
package com.hong.springboot.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @Description: 发布确认高级版消息生产者的回调接口* @Author: hong* @Date: 2024-03-09 21:58* @Version: 1.0**/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机不管是否收到消息的一个回调方法* 1.收到消息* correlationData 保存回调消息的id及相关信息* b true 交换机收到消息* s null* 2.未收到消息* correlationData 保存回调消息的id及相关信息* b false 交换机未收到消息* s 失败的原因* @param correlationData 消息相关数据* @param b 交换机是否收到消息* @param s 没收到消息的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";if (b) {log.info("交换机已经收到id为:{}的消息", id);} else {log.info("交换机还未收到id为:{}消息,原因:{}", id, s);}}
}
修改ConfirmProducerController中sendMsg方法
交换机改个名字模拟交换机收不到消息
@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("1");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);}
将routingKey改个名字模拟队列收不到消息
@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData1 = new CorrelationData("1");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY,correlationData1);CorrelationData correlationData2 = new CorrelationData("2");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",correlationData2);}
7.回退消息
从以上模拟场景可以看出,在仅开启生产者确认机制,交换机接收到消息后,会直接给生产者发送确认消息,但若发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃的。因此我们借用mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。
7.1.开启消息回退机制
配置文件中添加如下配置
#开启消息回退机制
spring.rabbitmq.publisher-returns=true
7.2. 添加消息回退回调
@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 当消息传递过程中不可达目的地时将消息返回给生产者* 只有不可达目的地时才回调* @param returnedMessage*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(),returnedMessage.getReplyText(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());}
相关文章:

RabbitMQ发布确认高级版
1.前言 在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢&…...

【阿里云系列】-基于云效构建部署Springboot项目到ACK
介绍 为了提高项目迭代的速度加速交付产品给客户,我们通常会选择CICD工具来减少人力投入产生的成本,开源的工具比如有成熟的Jenkins,但是本文讲的是阿里云提高的解决方案云效平台,通过配置流水线的形式实现项目的快速部署到服务器…...

PyTorch搭建LeNet训练集详细实现
一、下载训练集 导包 import torch import torchvision import torch.nn as nn from model import LeNet import torch.optim as optim import torchvision.transforms as transforms import matplotlib.pyplot as plt import numpy as npToTensor()函数: 把图像…...

R语言复现:中国Charls数据库一篇现况调查论文的缺失数据填补方法
编者 在临床研究中,数据缺失是不可避免的,甚至没有缺失,数据的真实性都会受到质疑。 那我们该如何应对缺失的数据?放着不管?还是重新开始?不妨试着对缺失值进行填补,简单又高效。毕竟对于统计师来说&#…...

解决Git:Author identity unknown Please tell me who you are.
报错信息: 意思: 作者身份未知 ***请告诉我你是谁。 解决办法: git config --global user.name "你的名字"git config --global user.email "你的邮箱"...
Flink StreamTask启动和执行源码分析
文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程: 初始化:StreamTask的初始化阶段涉及多个…...
【MySQL 系列】MySQL 语句篇_DCL 语句
DCL( Data Control Language,数据控制语言)用于对数据访问权限进行控制,定义数据库、表、字段、用户的访问权限和安全级别。主要关键字包括 GRANT、 REVOKE 等。 文章目录 1、MySQL 中的 DCL 语句1.1、数据控制语言--DCL1.2、MySQ…...

什么是序列化?为什么需要序列化?
1、典型回答 序列化(Serialization)序列化是将对象转换为可存储或传输的形式的过程(例如: 将对象转换为字节流) 反序列化(Deserialization) 是将序列化后的数据(例如: 二进制文件)转换回原始对象的过程。通过反序列化,可以从存储介质 (如磁盘、数据库) 或通过网络…...

Linux本地搭建FastDFS系统
文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…...
docker和docker-compose安装
一、docker安装 1、移除旧版本 依次执行如下命令移除旧版本docker,如未安装过无需执行 yum -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-selinux docker-engine-selinux…...
深入理解Spring的ApplicationContext:案例详解与应用
深入理解Spring的ApplicationContext:案例详解与应用 在Spring框架的丰富生态中,ApplicationContext扮演着至关重要的角色。作为BeanFactory的扩展,ApplicationContext不仅继承了其所有功能,还引入了更多高级特性,使得…...

6.Java并发编程—深入剖析Java Executors:探索创建线程的5种神奇方式
Executors快速创建线程池的方法 Java通过Executors 工厂提供了5种创建线程池的方法,具体方法如下 方法名描述newSingleThreadExecutor()创建一个单线程的线程池,该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行,保证任务的顺序性…...
英语阅读挑战
英语阅读真是令人头痛的东西。可怜的子航想利用寒假时间突破英语难题。当他拿到一篇英语阅读时,他很好奇作者最喜欢用那些字母。 输入 一句30词以内的英语句子 输出 统计每个字母出现的次数 样例输入 复制 However,the British dont have a history of exporting th…...
备战蓝桥之思维
平台重叠真的坑 给你一句样例,如果你觉得自己的代码没问题那就试试吧 2 1 1 3 1 0 4 正确答案 0 0 0 0 P1105 平台 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) import java.awt.Checkbox; import java.awt.PageAttributes.OriginType; import java.io.B…...
09 string的实现
注意 实现仿cplus官网的的string类,对部分主要功能实现 实现 头文件 #pragma once #include <iostream> #include <assert.h> #include <string>namespace mystring {class string{friend std::ostream& operator<<(std::ostream&a…...
Git 进行版本控制时,配置 user.name 和 user.email
在使用 Git 进行版本控制时,配置 user.name 和 user.email 是一个非常重要的初始步骤,但不是绝对必须的。这两个配置项定义了当你进行提交(commit)时用于标识提交者的信息。 为什么建议配置 user.name 和 user.email 标识提交者…...
传统开发读写优化与HBase
目录: 一、传统开发数据读写性能优化 1. Mysql 分表、主从复制与读写分离 2. Redis(缓存型数据库)主从复制与读写分离 二、HBase 一、传统开发数据读写性能优化 1、Mysql 分表、主从复制与读写分离 mysql分库分表方案 一种分表方案:设置表A 表B 表A 自增列从1开始…...

【OpenGL实现 03】纹理贴图原理和实现
目录 一、说明二、纹理贴图原理2.1 纹理融合原理2.2 UV坐标原理 三、生成纹理对象3.1 需要在VAO上绑定纹理坐标3.2 纹理传递3.3 纹理buffer生成 四、代码实现:五、着色器4.1 片段4.2 顶点 五、后记 一、说明 本篇叙述在画出图元的时候,如何贴图纹理图片…...

FDU 2021 | 二叉树关键节点的个数
文章目录 1. 题目描述2. 我的尝试 1. 题目描述 给定一颗二叉树,树的每个节点的值为一个正整数。如果从根节点到节点 N 的路径上不存在比节点 N 的值大的节点,那么节点 N 被认为是树上的关键节点。求树上所有的关键节点的个数。请写出程序,并…...

精读《React Conf 2019 - Day2》
1 引言 这是继 精读《React Conf 2019 - Day1》 之后的第二篇,补充了 React Conf 2019 第二天的内容。 2 概述 & 精读 第二天的内容更为精彩,笔者会重点介绍比较干货的部分。 Fast refresh Fast refresh 是更好的 react-hot-loader 替代方案&am…...

dvwa5——File Upload
LOW 在dvwa里建一个testd2.php文件,写入一句话木马,密码password antsword连接 直接上传testd2.php文件,上传成功 MEDIUM 查看源码,发现这一关只能提交jpg和png格式的文件 把testd2.php的后缀改成jpg,上传时用bp抓包…...

Redis:Hash数据类型
🌈 个人主页:Zfox_ 🔥 系列专栏:Redis 🔥 Hash哈希 🐳 ⼏乎所有的主流编程语⾔都提供了哈希(hash)类型,它们的叫法可能是哈希、字典、关联数组、映射。在Redis中&#…...
DeepSeek 赋能智能养老:情感陪伴机器人的温暖革新
目录 一、引言二、智能养老情感陪伴机器人的市场现状与需求2.1 市场现状2.2 老年人情感陪伴需求分析 三、DeepSeek 技术详解3.1 DeepSeek 的技术特点3.2 与其他类似技术的对比优势 四、DeepSeek 在智能养老情感陪伴机器人中的具体应用4.1 自然语言处理与对话交互4.2 情感识别与…...

浏览器工作原理05 [#] 渲染流程(上):HTML、CSS和JavaScript是如何变成页面的
引用 浏览器工作原理与实践 一、提出问题 在上一篇文章中我们介绍了导航相关的流程,那导航被提交后又会怎么样呢?就进入了渲染阶段。这个阶段很重要,了解其相关流程能让你“看透”页面是如何工作的,有了这些知识,你可…...

高频通信与航天电子的材料革命:猎板PCB高端压合基材技术解析
—聚酰亚胺/陶瓷基板在5G与航天场景的产业化应用 一、极端环境材料体系:突破温域与频率极限 聚酰亚胺基板(PI)的航天级稳定性 猎板在卫星通信PCB中采用真空层压工艺处理聚酰亚胺基材(Dk≈10.2)&a…...
赋能大型语言模型与外部世界交互——函数调用的崛起
大型语言模型 (LLM) 近年来在自然语言处理领域取得了革命性的进展,展现出强大的文本理解、生成和对话能力。然而,这些模型在与外部实时数据源和动态系统交互方面存在固有的局限性 1。它们主要依赖于训练阶段学习到的静态知识,难以直接访问和利…...
【Kotlin】注解反射扩展
文章目录 注解用法反射类引用 扩展扩展函数的作用域成员方法优先级总高于扩展函数 被滥用的扩展函数扩展属性静态扩展 标准库中的扩展函数 使用 T.also 函数交换两个变量sNullOrEmpty | isNullOrBlankwith函数repeat函数 调度方式对扩展函数的影响静态与动态调度扩展函数始终静…...

数据通信与计算机网络——数据与信号
主要内容 模拟与数字 周期模拟信号 数字信号 传输减损 数据速率限制 性能 注:数据必须被转换成电磁信号才能进行传输。 一、模拟与数字 数据以及表示数据的信号可以使用模拟或者数字的形式。数据可以是模拟的也可以是数字的,模拟数据是连续的采用…...

AWS App Mesh实战:构建可观测、安全的微服务通信解决方案
摘要:本文详解如何利用AWS App Mesh统一管理微服务间通信,实现精细化流量控制、端到端可观测性与安全通信,提升云原生应用稳定性。 一、什么是AWS App Mesh? AWS App Mesh 是一种服务网格(Service Mesh)解…...

Imprompter: Tricking LLM Agents into Improper Tool Use
原文:Imprompter: Tricking LLM Agents into Improper Tool Use 代码:Reapor-Yurnero/imprompter: Codebase of https://arxiv.org/abs/2410.14923 实机演示:Imprompter 摘要: 新兴发展的Agent可以将LLM与外部资源工具相结合&a…...