当前位置: 首页 > news >正文

尚硅谷rabbitmq 2024 消息可靠性答疑二 第22节

returnedMessage()只有失败才调用,confirm()成功失败了都会调用,为什么?

在RabbitMQ中,消息的确认和返回机制是为了确保消息的可靠传递和处理。`confirm`和`returnedMessage`方法的调用时机和目的不同,因此它们的行为也有所区别。

### `ConfirmCallback`和`confirm`方法

**目的**:`ConfirmCallback`的主要目的是确认消息是否成功发布到交换机(Exchange)。

- **成功(ack=true)**:当消息成功到达交换机时,会调用`confirm`方法,并且`ack`参数为`true`。
- **失败(ack=false)**:当消息未能到达交换机时,会调用`confirm`方法,并且`ack`参数为`false`。失败的原因可能是交换机不存在或其他路由问题。

```java
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (ack) {
        log.info("Message confirmed successfully, correlationData: {}", correlationData);
    } else {
        log.warn("Message confirmation failed, correlationData: {}, cause: {}", correlationData, cause);
        // 这里可以添加重试逻辑或其他处理
    }
}
```

### `ReturnsCallback`和`returnedMessage`方法

**目的**:`ReturnsCallback`的主要目的是处理那些成功到达交换机但无法路由到队列的消息。

- **调用时机**:只有当消息无法路由到任何队列时,才会调用`returnedMessage`方法。这种情况通常发生在消息的路由键(Routing Key)与任何绑定的队列不匹配时。

```java
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
    log.warn("Message returned: exchange: {}, routingKey: {}, replyCode: {}, replyText: {}, message: {}",
            returnedMessage.getExchange(), returnedMessage.getRoutingKey(),
            returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
            returnedMessage.getMessage());
    // 这里可以添加处理逻辑,例如重发消息或记录日志
}
```

### 为什么`confirm`总是被调用,而`returnedMessage`只有在失败时调用?

1. **消息发布确认(confirm)**:
   - **成功与失败**:`confirm`方法用于确认消息是否成功到达交换机。无论成功还是失败,都会调用这个方法,以便你能够记录日志、重试或进行其他处理。这提供了一个全局的确认机制,确保你知道消息的发布状态。
   - **可靠性**:这是消息发布的第一步确认,确保消息至少到达交换机。

2. **消息返回(returnedMessage)**:
   - **仅失败时调用**:`returnedMessage`方法仅在消息无法路由到任何队列时调用。这是因为只有在这种情况下,消息才会被返回给发送者。成功路由的消息不需要调用这个方法,因为它们已经被正确处理。
   - **路由失败处理**:这是消息发布的第二步确认,确保消息不仅到达交换机,还能正确路由到队列。如果路由失败,你可以在这个方法中处理返回的消息,例如记录日志、重试发送或通知其他系统。

### 总结

- **`confirm`方法**:总是被调用,用于确认消息是否成功到达交换机。无论成功还是失败,都会触发这个回调,以便你能够处理消息发布的结果。
- **`returnedMessage`方法**:只有在消息无法路由到任何队列时才会调用,用于处理那些成功到达交换机但无法路由的消息。

这种设计确保了消息发布的可靠性和可追溯性,提供了两个层次的确认机制,分别处理消息发布和消息路由的不同阶段。
==========
rabbitTemplate.convertAndSend(EXCHANGE DIRECT, ROUTING KEy, obiect: "Message Test confirm~~~").
----convertAndSend  ,convert?是转换了什么呢


在Spring AMQP中,`RabbitTemplate`的`convertAndSend`方法用于将消息转换为适合传输的格式,并发送到指定的交换机和路由键。`convert`部分指的是消息的转换过程,`send`部分指的是消息的发送过程。

### `convertAndSend` 方法的作用

1. **消息转换(Convert)**:
   - **对象到消息的转换**:`convertAndSend`方法会将你传入的对象(例如字符串、Java对象等)转换为`Message`对象。这个转换过程通常涉及将对象序列化为字节数组,并设置消息的头信息(如内容类型、编码等)。
   - **消息转换器(Message Converter)**:Spring AMQP提供了多种消息转换器(如`SimpleMessageConverter`、`Jackson2JsonMessageConverter`、`Jaxb2Marshaller`等),你可以根据需要选择或自定义消息转换器。

2. **消息发送(Send)**:
   - **发送到交换机**:转换后的消息会被发送到指定的交换机(Exchange)。
   - **路由到队列**:交换机会根据指定的路由键(Routing Key)将消息路由到合适的队列。

### 示例代码

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        String exchange = "my.exchange";
        String routingKey = "my.routingKey";
        String message = "Message Test confirm~~~";

        // 使用convertAndSend方法发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}
```

### 消息转换器(Message Converter)

Spring AMQP提供了多种内置的消息转换器,你也可以自定义消息转换器。以下是一些常用的消息转换器:

1. **SimpleMessageConverter**:
   - 默认的消息转换器,可以处理字符串、字节数组、序列化对象等。
   - 如果传入的是字符串,会将其转换为字节数组。

2. **Jackson2JsonMessageConverter**:
   - 将Java对象转换为JSON格式的字节数组,适用于JSON序列化和反序列化。
   - 需要引入Jackson库。

3. **Jaxb2Marshaller**:
   - 使用JAXB将Java对象转换为XML格式的字节数组,适用于XML序列化和反序列化。

### 自定义消息转换器

如果内置的消息转换器不能满足你的需求,你可以自定义消息转换器。以下是一个自定义消息转换器的示例:

```java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

@Component
public class MyCustomMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        // 自定义转换逻辑,例如将对象转换为字节数组
        byte[] bytes = serializeObject(object);
        return new Message(bytes, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // 自定义转换逻辑,例如将字节数组转换为对象
        return deserializeObject(message.getBody());
    }

    private byte[] serializeObject(Object object) {
        // 实现序列化逻辑
        return object.toString().getBytes();
    }

    private Object deserializeObject(byte[] bytes) {
        // 实现反序列化逻辑
        return new String(bytes);
    }
}
```

然后在配置类中注册自定义消息转换器:

```java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(RabbitTemplate rabbitTemplate, MyCustomMessageConverter myCustomMessageConverter) {
        rabbitTemplate.setMessageConverter(myCustomMessageConverter);
        return rabbitTemplate;
    }
}
```

### 总结

- **`convertAndSend`方法**:将传入的对象转换为`Message`对象,并发送到指定的交换机和路由键。
- **消息转换器**:负责将对象序列化为字节数组,Spring AMQP提供了多种内置的消息转换器,你也可以自定义消息转换器。
- **自定义消息转换器**:可以根据需要实现`MessageConverter`接口,提供自定义的序列化和反序列化逻辑。

通过理解消息转换和发送的过程,你可以更好地控制消息


================
1. **消息发布确认(confirm)**:
   - **成功与失败**:`confirm`方法用于确认消息是否成功到达交换机。无论成功还是失败,都会调用这个方法,以便你能够记录日志、重试或进行其他处理。这提供了一个全局的确认机制,确保你知道消息的发布状态。
   - **可靠性**:这是消息发布的第一步确认,确保消息至少到达交换机。

2. **消息返回(returnedMessage)**:
   - **仅失败时调用**:`returnedMessage`方法仅在消息无法路由到任何队列时调用。这是因为只有在这种情况下,消息才会被返回给发送者。成功路由的消息不需要调用这个方法,因为它们已经被正确处理。
   - **路由失败处理**:这是消息发布的第二步确认,确保消息不仅到达交换机,还能正确路由到队列。如果路由失败,你可以在这个方法中处理返回的消息,例如记录日志、重试发送或通知其他系统。
-----------这是rabbitmq里的,那kafka里面怎么做的呢?


在Kafka中,消息的确认和处理机制与RabbitMQ有所不同。以下是Kafka中对应的机制:

1. **消息发布确认(Acks)**:
   - **成功与失败**:Kafka的生产者配置中有一个`acks`参数用于控制消息发布确认。常见的值有:
     - `acks=0`:生产者不会等待任何确认。消息发送后立即认为成功。
     - `acks=1`:领导者分区接收到消息后即确认,不等待副本确认。
     - `acks=all`(或`acks=-1`):所有副本都确认接收到消息后才认为成功。这是最可靠的设置,确保消息不会丢失。
   - **可靠性**:`acks=all`提供了最高的可靠性,确保消息被所有副本接收。

2. **消息返回(Callback)**:
   - **成功与失败**:Kafka提供了一个回调机制来处理消息发送的结果。你可以在发送消息时指定一个`Callback`,它会在消息发送成功或失败时被调用。
   - **处理回调**:通过实现`Callback`接口的`onCompletion`方法,你可以处理成功和失败的情况。例如,记录日志、重试发送或通知其他系统。

以下是一个简单的示例代码,展示了如何在Kafka中使用`acks`和`Callback`:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");  // 设置消息确认机制

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            // 消息发送成功
            System.out.println("Message sent successfully to topic " + metadata.topic() + " partition " + metadata.partition() + " offset " + metadata.offset());
        } else {
            // 消息发送失败
            exception.printStackTrace();
            // 处理失败情况,例如重试发送、记录日志等
        }
    }
});

producer.close();
```

通过这种方式,Kafka提供了灵活的消息确认和处理机制,确保消息的可靠传递和处理。

相关文章:

尚硅谷rabbitmq 2024 消息可靠性答疑二 第22节

returnedMessage()只有失败才调用&#xff0c;confirm()成功失败了都会调用&#xff0c;为什么&#xff1f; 在RabbitMQ中&#xff0c;消息的确认和返回机制是为了确保消息的可靠传递和处理。confirm和returnedMessage方法的调用时机和目的不同&#xff0c;因此它们的行为也有…...

在 Ubuntu 上安装 Whisper 支撑环境(ffmpeg、PyTorch)的教程(2024亲测可用)

在 Ubuntu 上安装 Whisper 的教程 以下是如何在 Ubuntu 系统上安装 Whisper 以进行视频转录的详细步骤。 步骤 1&#xff1a;更新系统 首先更新你的 Ubuntu 系统&#xff0c;确保安装最新的软件包&#xff1a; sudo apt update && sudo apt upgrade -y步骤 2&#…...

vue+echarts实现雷达图及刻度标注

文章目录 前言代码实现实现效果总结 前言 最近项目有做数据可视化 大屏 不免再次使用些echarts应用 记录下其中echarts雷达图的实现 代码实现 先上代码 <template><div class"container"><div ref"chart" style"width: 500px; heig…...

【进阶OpenCV】 (9)--摄像头操作--->答题卡识别改分项目

文章目录 项目&#xff1a;答题卡识别改分1. 图片预处理2. 描绘轮廓3. 轮廓近似4. 透视变换5. 阈值处理6. 找每一个圆圈轮廓7. 将每一个圆圈轮廓排序8. 找寻所填答案&#xff0c;比对正确答案8.1 思路8.2 图解8.3 代码体现 9. 计算正确率 总结 项目&#xff1a;答题卡识别改分 …...

实时从TDengine数据库采集数据到Kafka Topic

实时从TDengine数据库采集数据到Kafka Topic 一、认识TDengine二、TDengine Kafka Connector三、什么是 Kafka Connect&#xff1f;四、前置条件五、安装 TDengine Connector 插件六、启动 Kafka七、验证 kafka Connect 是否启动成功八、TDengine Source Connector 的使用九、添…...

Linux -- 初识动静态库

目录 为什么要有库&#xff1f; 静态库 什么是静态库&#xff1f; 特点 优点 缺点 动态库 什么是动态库&#xff1f; 优点 缺点 编译器会选择哪个库&#xff1f; 为什么要有库&#xff1f; 库的存在是为了提高软件开发的效率、促进代码复用以及简化维护工作。通过使用…...

vite 打包前请求接口和打包后的不一致

在使用 Vite 进行项目打包时&#xff0c;如果发现打包前请求接口和打包后的行为不一致&#xff0c;这可能是由于多种原因导致的。以下是一些可能的原因和相应的解决方案&#xff1a; 1. 代理配置问题 开发环境&#xff1a;在开发环境中&#xff0c;Vite 通常使用 vite.config…...

fairseq 安装包python

背景&#xff1a; Collecting fairseq Using cached https://pypi.tuna.tsinghua.edu.cn/packages/d7/0f/b7043b451a97eb9b4cfb1b1e23e567b947d9d7bca542403228bd53b435fe/fairseq-0.12.1.tar.gz (9.6 MB) Installing build dependencies ... done Getting requirements…...

使用Mockaroo生成测试数据

使用Mockaroo生成测试数据 最近在学习【Spring Boot & React】Spring Boot和React教程视频的P51.Generating 1000 students一课中&#xff0c;看到了https://www.mockaroo.com/网站可以用来模拟生成测试数据&#xff0c;觉得还不错&#xff0c;特此记录一下。感觉每次看老…...

使用频率最高的 opencv 基础绘图操作 - python 实现

以下是 opencv-python 基本操作绘制示例&#xff0c;绘制&#xff1a; 1&#xff09;圆&#xff0c;2&#xff09;矩形&#xff0c;3&#xff09;线段&#xff0c;4&#xff09;文本。 安装 opencv-python pip install opencv-python 在图上绘制圆的操作&#xff0c;示例如…...

Python 在Excel中添加数据条

在Excel中添加数据条是一种数据可视化技巧&#xff0c;它通过条形图的形式在单元格内直观展示数值的大小&#xff0c;尤其适合比较同一列或行中各个单元格的数值。这种表示方式可以让大量的数字信息一目了然。本文将介绍如何使用Python在Excel中的指定单元格区域添加数据条。 …...

Unity中搜索不到XR Interaction Toolkit包解决方法

问题&#xff1a; 针对Unity版本2020.3在中PackageManager可能搜素不到XR Interaction Toolkit包 在Package Manager中未显示XR Interaction Toolkit包 解决方法&#xff1a; Package manager左上角&#xff0c;点加号&#xff0c;选择 Add package from git URL..&#xff0c;…...

【前端】JQ验证每个单选按钮是否已经选择

验证每个单选题是否都已经选择&#xff0c;其中每个input中不带name值&#xff0c;直接遍历input[type"radio"]验证 <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <meta name"viewpor…...

【无人机设计与控制】滑模控制、反步控制、传统PID四旋翼无人机轨迹跟踪控制仿真

摘要 本文基于滑模控制、反步控制和传统PID控制&#xff0c;设计了针对四旋翼无人机的轨迹跟踪控制系统。通过对比这三种控制策略在四旋翼无人机轨迹跟踪中的表现&#xff0c;分析了各自的优缺点和适用场景。仿真结果表明&#xff0c;滑模控制具有更强的鲁棒性&#xff0c;反步…...

MongoDB 介绍

一、MongoDB 介绍 MongoDB 是一个开源的、面向文档的数据库管理系统。它采用了灵活的数据模型&#xff0c;以类似 JSON 的文档形式存储数据&#xff0c;具有高可扩展性、高性能和丰富的功能。 主要特点包括&#xff1a; 灵活的数据模型&#xff1a;文档型数据库允许存储不同…...

计算机网络:物理层 —— 物理层概述

文章目录 物理层功能物理层接口特性常见特性 相关概念 物理层&#xff08;Physical Layer&#xff09;是OSI&#xff08;Open Systems Interconnection&#xff09;模型的第一层&#xff0c;负责提供原始比特流传输的服务。它定义了硬件接口的电气、机械、功能和过程特性&#…...

HTTP的工作原理

HTTP&#xff08;Hypertext Transfer Protocol&#xff09;是一种用于在计算机网络上传输超文本数据的应用层协议。它是构成万维网的基础之一&#xff0c;被广泛用于万维网上的数据通信。&#xff08;超文本(Hypertext)是用超链接的方法&#xff0c;将各种不同空间的文字信息组…...

缓存数据减轻服务器压力

问题:不是所有的数据都需要请求后端的 不是所有的数据都需要请求后端的,有些数据是重复的、可以复用的解决方案:缓存 实现思路:每一个分类为一个key,一个可以下面可以有很多菜品 前端是按照分类查询的,所以我们需要通过分类来缓存缓存代码 /*** 根据分类id查询菜品** @pa…...

【自动驾驶】控制算法(十二)横纵向综合控制 | 从理论到实战全面解析

写在前面&#xff1a; &#x1f31f; 欢迎光临 清流君 的博客小天地&#xff0c;这里是我分享技术与心得的温馨角落。&#x1f4dd; 个人主页&#xff1a;清流君_CSDN博客&#xff0c;期待与您一同探索 移动机器人 领域的无限可能。 &#x1f50d; 本文系 清流君 原创之作&…...

Python基础之List列表用法

1、创建列表 names ["张三","李四","王五","Mary"] 2、列表分片 names[1]&#xff1a;获取数组的第2个元素。 names[1:3]&#xff1a;获取数组的第2、第3个元素。包含左侧&#xff0c;不包含右侧。 names[:3]等同于names[0:3]&…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

MinIO Docker 部署:仅开放一个端口

MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...

go 里面的指针

指针 在 Go 中&#xff0c;指针&#xff08;pointer&#xff09;是一个变量的内存地址&#xff0c;就像 C 语言那样&#xff1a; a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10&#xff0c;通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)

题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...

Python训练营-Day26-函数专题1:函数定义与参数

题目1&#xff1a;计算圆的面积 任务&#xff1a; 编写一个名为 calculate_circle_area 的函数&#xff0c;该函数接收圆的半径 radius 作为参数&#xff0c;并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求&#xff1a;函数接收一个位置参数 radi…...

Monorepo架构: Nx Cloud 扩展能力与缓存加速

借助 Nx Cloud 实现项目协同与加速构建 1 &#xff09; 缓存工作原理分析 在了解了本地缓存和远程缓存之后&#xff0c;我们来探究缓存是如何工作的。以计算文件的哈希串为例&#xff0c;若后续运行任务时文件哈希串未变&#xff0c;系统会直接使用对应的输出和制品文件。 2 …...

ThreadLocal 源码

ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物&#xff0c;因为每个访问一个线程局部变量的线程&#xff08;通过其 get 或 set 方法&#xff09;都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段&#xff0c;这些类希望将…...