当前位置: 首页 > news >正文

RabbitMQ发布确认高级版

1.前言

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
在这里插入图片描述在这里插入图片描述

2.添加配置信息

在application.properties文件中添加如下配置,交换机开启消息确认模式

#NONE 值是禁用发布确认模式,是默认值
#CORRELATED 值是发布消息成功到交换器后会触发回调方法
#SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
# 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,
# 根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,
# 则接下来无法发送消息到 broker;
spring.rabbitmq.publisher-confirm-type=correlated
  • NONE 值是禁用发布确认模式,是默认值
  • CORRELATED 值是发布消息成功到交换器后会触发回调方法
  • SIMPLE 值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate 调用waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false则会关闭 channel,则接下来无法发送消息到 broker;

3. 配置类

package com.hong.springboot.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Description: 发布确认高级版配置类* @Author: hong* @Date: 2024-03-05 20:52* @Version: 1.0**/
@Configuration
public class ConfirmConfig {public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";public static final String CONFIRM_QUEUE_NAME = "confirm.queue";public static final String CONFIRM_ROUTING_KEY = "key1";//声明业务 Exchange@Bean("confirmExchange")public DirectExchange confirmExchange() {return new DirectExchange(CONFIRM_EXCHANGE_NAME);}// 声明确认队列@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();}// 声明确认队列绑定关系@Beanpublic Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);}
}

4.生产者

package com.hong.springboot.rabbitmq.controller;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 发布确认高级版生产者* @Author: hong* @Date: 2024-03-05 20:58* @Version: 1.0**/
@Slf4j
@RequestMapping("/confirm/")
@RestController
public class ConfirmProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;//http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, message);}
}

5.消费者

package com.hong.springboot.rabbitmq.consumer;import com.hong.springboot.rabbitmq.config.ConfirmConfig;
import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @Description: 发布确认高级版消费者* @Author: hong* @Date: 2024-03-05 21:05* @Version: 1.0**/
@Slf4j
@Component
public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveConfirmMessage(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);}
}

正常情况下,发送http://localhost:8080/confirm/sendMsg/Hi,JAVA小生不才
在这里插入图片描述

6.回调接口

package com.hong.springboot.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** @Description: 发布确认高级版消息生产者的回调接口* @Author: hong* @Date: 2024-03-09 21:58* @Version: 1.0**/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交换机不管是否收到消息的一个回调方法* 1.收到消息* correlationData   保存回调消息的id及相关信息* b true   交换机收到消息* s null* 2.未收到消息* correlationData   保存回调消息的id及相关信息* b false   交换机未收到消息* s 失败的原因* @param correlationData  消息相关数据* @param b           交换机是否收到消息* @param s             没收到消息的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id = correlationData != null ? correlationData.getId() : "";if (b) {log.info("交换机已经收到id为:{}的消息", id);} else {log.info("交换机还未收到id为:{}消息,原因:{}", id, s);}}
}

修改ConfirmProducerController中sendMsg方法
交换机改个名字模拟交换机收不到消息

    @GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData = new CorrelationData("1");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123", ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);}

在这里插入图片描述
将routingKey改个名字模拟队列收不到消息

    @GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message) {CorrelationData correlationData1 = new CorrelationData("1");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY);rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY,correlationData1);CorrelationData correlationData2 = new CorrelationData("2");log.info("当前时间:{},发送信息给队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",message+"----"+ConfirmConfig.CONFIRM_ROUTING_KEY+"abc",correlationData2);}

在这里插入图片描述

7.回退消息

从以上模拟场景可以看出,在仅开启生产者确认机制,交换机接收到消息后,会直接给生产者发送确认消息,但若发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃的。因此我们借用mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。

7.1.开启消息回退机制

配置文件中添加如下配置

#开启消息回退机制
spring.rabbitmq.publisher-returns=true

7.2. 添加消息回退回调

    @PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 当消息传递过程中不可达目的地时将消息返回给生产者* 只有不可达目的地时才回调* @param returnedMessage*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(),returnedMessage.getReplyText(), returnedMessage.getRoutingKey(), returnedMessage.getReplyCode());}

在这里插入图片描述

相关文章:

RabbitMQ发布确认高级版

1.前言 在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢&…...

【阿里云系列】-基于云效构建部署Springboot项目到ACK

介绍 为了提高项目迭代的速度加速交付产品给客户,我们通常会选择CICD工具来减少人力投入产生的成本,开源的工具比如有成熟的Jenkins,但是本文讲的是阿里云提高的解决方案云效平台,通过配置流水线的形式实现项目的快速部署到服务器…...

PyTorch搭建LeNet训练集详细实现

一、下载训练集 导包 import torch import torchvision import torch.nn as nn from model import LeNet import torch.optim as optim import torchvision.transforms as transforms import matplotlib.pyplot as plt import numpy as npToTensor()函数: 把图像…...

R语言复现:中国Charls数据库一篇现况调查论文的缺失数据填补方法

编者 在临床研究中,数据缺失是不可避免的,甚至没有缺失,数据的真实性都会受到质疑。 那我们该如何应对缺失的数据?放着不管?还是重新开始?不妨试着对缺失值进行填补,简单又高效。毕竟对于统计师来说&#…...

解决Git:Author identity unknown Please tell me who you are.

报错信息: 意思: 作者身份未知 ***请告诉我你是谁。 解决办法: git config --global user.name "你的名字"git config --global user.email "你的邮箱"...

Flink StreamTask启动和执行源码分析

文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程: 初始化:StreamTask的初始化阶段涉及多个…...

【MySQL 系列】MySQL 语句篇_DCL 语句

DCL( Data Control Language,数据控制语言)用于对数据访问权限进行控制,定义数据库、表、字段、用户的访问权限和安全级别。主要关键字包括 GRANT、 REVOKE 等。 文章目录 1、MySQL 中的 DCL 语句1.1、数据控制语言--DCL1.2、MySQ…...

什么是序列化?为什么需要序列化?

1、典型回答 序列化(Serialization)序列化是将对象转换为可存储或传输的形式的过程(例如: 将对象转换为字节流) 反序列化(Deserialization) 是将序列化后的数据(例如: 二进制文件)转换回原始对象的过程。通过反序列化,可以从存储介质 (如磁盘、数据库) 或通过网络…...

Linux本地搭建FastDFS系统

文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…...

docker和docker-compose安装

一、docker安装 1、移除旧版本 依次执行如下命令移除旧版本docker,如未安装过无需执行 yum -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-selinux docker-engine-selinux…...

深入理解Spring的ApplicationContext:案例详解与应用

深入理解Spring的ApplicationContext:案例详解与应用 在Spring框架的丰富生态中,ApplicationContext扮演着至关重要的角色。作为BeanFactory的扩展,ApplicationContext不仅继承了其所有功能,还引入了更多高级特性,使得…...

6.Java并发编程—深入剖析Java Executors:探索创建线程的5种神奇方式

Executors快速创建线程池的方法 Java通过Executors 工厂提供了5种创建线程池的方法,具体方法如下 方法名描述newSingleThreadExecutor()创建一个单线程的线程池,该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行,保证任务的顺序性…...

英语阅读挑战

英语阅读真是令人头痛的东西。可怜的子航想利用寒假时间突破英语难题。当他拿到一篇英语阅读时,他很好奇作者最喜欢用那些字母。 输入 一句30词以内的英语句子 输出 统计每个字母出现的次数 样例输入 复制 However,the British dont have a history of exporting th…...

备战蓝桥之思维

平台重叠真的坑 给你一句样例,如果你觉得自己的代码没问题那就试试吧 2 1 1 3 1 0 4 正确答案 0 0 0 0 P1105 平台 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) import java.awt.Checkbox; import java.awt.PageAttributes.OriginType; import java.io.B…...

09 string的实现

注意 实现仿cplus官网的的string类&#xff0c;对部分主要功能实现 实现 头文件 #pragma once #include <iostream> #include <assert.h> #include <string>namespace mystring {class string{friend std::ostream& operator<<(std::ostream&a…...

Git 进行版本控制时,配置 user.name 和 user.email

在使用 Git 进行版本控制时&#xff0c;配置 user.name 和 user.email 是一个非常重要的初始步骤&#xff0c;但不是绝对必须的。这两个配置项定义了当你进行提交&#xff08;commit&#xff09;时用于标识提交者的信息。 为什么建议配置 user.name 和 user.email 标识提交者…...

传统开发读写优化与HBase

目录: 一、传统开发数据读写性能优化 1. Mysql 分表、主从复制与读写分离 2. Redis(缓存型数据库)主从复制与读写分离 二、HBase 一、传统开发数据读写性能优化 1、Mysql 分表、主从复制与读写分离 mysql分库分表方案 一种分表方案&#xff1a;设置表A 表B 表A 自增列从1开始…...

【OpenGL实现 03】纹理贴图原理和实现

目录 一、说明二、纹理贴图原理2.1 纹理融合原理2.2 UV坐标原理 三、生成纹理对象3.1 需要在VAO上绑定纹理坐标3.2 纹理传递3.3 纹理buffer生成 四、代码实现&#xff1a;五、着色器4.1 片段4.2 顶点 五、后记 一、说明 本篇叙述在画出图元的时候&#xff0c;如何贴图纹理图片…...

FDU 2021 | 二叉树关键节点的个数

文章目录 1. 题目描述2. 我的尝试 1. 题目描述 给定一颗二叉树&#xff0c;树的每个节点的值为一个正整数。如果从根节点到节点 N 的路径上不存在比节点 N 的值大的节点&#xff0c;那么节点 N 被认为是树上的关键节点。求树上所有的关键节点的个数。请写出程序&#xff0c;并…...

精读《React Conf 2019 - Day2》

1 引言 这是继 精读《React Conf 2019 - Day1》 之后的第二篇&#xff0c;补充了 React Conf 2019 第二天的内容。 2 概述 & 精读 第二天的内容更为精彩&#xff0c;笔者会重点介绍比较干货的部分。 Fast refresh Fast refresh 是更好的 react-hot-loader 替代方案&am…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

云原生安全实战:API网关Kong的鉴权与限流详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关&#xff08;API Gateway&#xff09; API网关是微服务架构中的核心组件&#xff0c;负责统一管理所有API的流量入口。它像一座…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

【SpringBoot自动化部署】

SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一&#xff0c;能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时&#xff0c;需要添加Git仓库地址和凭证&#xff0c;设置构建触发器&#xff08;如GitHub…...

go 里面的指针

指针 在 Go 中&#xff0c;指针&#xff08;pointer&#xff09;是一个变量的内存地址&#xff0c;就像 C 语言那样&#xff1a; a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10&#xff0c;通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...