当前位置: 首页 > news >正文

RabbitMQ消息可靠性等机制详解(精细版三)

目录

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

7.1.2 Return机制(保证发送可靠)

7.1.3 编写配置文件

7.1.4 开启Confirm和Return

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件

7.2.2 手动ack

7.3 避免消息重复消费

7.3.1 导入依赖

7.3.2 编写配置文件

7.3.3 修改生产者

7.3.4 修改消费者


 官方文档  RabbitMQ Documentation | RabbitMQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章 

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

消息传递可靠性

7.1.2 Return机制(保证发送可靠)

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

在消息发送方项目上加入下面内容:

7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true

7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
​@Autowiredprivate RabbitTemplate rabbitTemplate;
​@PostConstruct  // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
​//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
​@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
​@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件
  • 在消费方application.yml文件添加下面配置, 改为手动应答机制.

spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual

7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
/*** @author 听忆*/
@Component
public class Consumer {
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,*          当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
​}
}

7.3 避免消息重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

  1. 为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

  2. id-0(正在执行业务)

  3. id-1(执行业务成功)

  4. 然后使用ack给RabbitMQ返回消息

  5. 如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

  6. 极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx

       作用:如果为空就set值,并返回1, true

​ 如果存在(不为空)不进行操作,并返回0, false​

7.3.1 导入依赖

生产者和消费者都加入下面依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>

7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379

7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}

7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
​
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:*      如果为空就set值,并返回1, true*      如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
​@Autowiredprivate StringRedisTemplate redisTemplate;
​@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
​//2. 消费消息System.out.println("接收到消息:" + msg);
​//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
​//4.  手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
​}else {
​//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
​}
}

相关文章:

RabbitMQ消息可靠性等机制详解(精细版三)

目录 七 RabbitMQ的其他操作 7.1 消息的可靠性(发送可靠) 7.1.1 confim机制(保证发送可靠) 7.1.2 Return机制(保证发送可靠) 7.1.3 编写配置文件 7.1.4 开启Confirm和Return 7.2 手动Ack(保证接收可靠) 7.2.1 添加配置文件 7.2.2 手动ack 7.3 避免消息重复消费 7.3.…...

88888

49615...

深度学习之激活函数

激活函数的公式根据不同的函数类型而有所不同。以下是一些常见的激活函数及其数学公式&#xff1a; Sigmoid函数&#xff1a; 公式&#xff1a;f(x)特性&#xff1a;输出范围在0到1之间&#xff0c;常用于二分类问题&#xff0c;将输出转换为概率值。但存在梯度消失问题&#…...

OpenStack开源虚拟化平台(一)

目录 一、OpenStack背景介绍&#xff08;一&#xff09;OpenStack是什么&#xff08;二&#xff09;OpenStack的主要服务 二、计算服务Nova&#xff08;一&#xff09;Nova组件介绍&#xff08;二&#xff09;Libvirt简介&#xff08;三&#xff09;Nova中的RabbitMQ解析 OpenS…...

C++ | Leetcode C++题解之第207题课程表

题目&#xff1a; 题解&#xff1a; class Solution { private:vector<vector<int>> edges;vector<int> indeg;public:bool canFinish(int numCourses, vector<vector<int>>& prerequisites) {edges.resize(numCourses);indeg.resize(numCo…...

vue3中的自定义指令

全局自定义指令 假设我们要创建一个全局指令v-highlight&#xff0c;用于高亮显示元素。这个指令将接受一个颜色参数&#xff0c;并有一个可选的修饰符bold来决定是否加粗文本。 首先&#xff0c;在创建Vue应用时定义这个指令&#xff1a;&#xff08;这里可以将指令抽离成单…...

Postman接口测试工具的原理及应用详解(一)

本系列文章简介&#xff1a; 在当今软件开发的世界中&#xff0c;接口测试作为保证软件质量的重要一环&#xff0c;其重要性不言而喻。随着前后端分离开发模式的普及&#xff0c;接口测试已成为连接前后端开发的桥梁&#xff0c;确保前后端之间的数据交互准确无误。在这样的背景…...

C++ initializer_list类型推导

目录 initializer_list C自动类型推断 auto typeid decltype initializer_list<T> C支持统一初始化{ }&#xff0c;出现了一个新的类型initializer_list<T>&#xff0c;一切类型都可以用列表初始化。提供了一种更加灵活、安全和明确的方式来初始化对象。 class…...

造一个交互式3D火山数据可视化

本文由ScriptEcho平台提供技术支持 项目地址&#xff1a;传送门 使用 Plotly.js 创建交互式 3D 火山数据可视化 应用场景 本代码用于将火山数据库中的数据可视化&#xff0c;展示火山的高度、类型和状态。可用于地质学研究、教育和数据探索。 基本功能 该代码使用 Plotly…...

【网络安全】一文带你了解什么是【CSRF攻击】

CSRF&#xff08;Cross-Site Request Forgery&#xff0c;跨站请求伪造&#xff09;是一种网络攻击方式&#xff0c;它利用已认证用户在受信任网站上的身份&#xff0c;诱使用户在不知情的情况下执行恶意操作。具体来说&#xff0c;攻击者通过各种方式&#xff08;如发送恶意链…...

短视频电商源码如何选择

在数字时代的浪潮下&#xff0c;短视频电商以其直观、生动、互动性强的特点&#xff0c;迅速崛起成为电商行业的一股新势力。对于有志于进军短视频电商领域的创业者来说&#xff0c;选择一款合适的短视频电商源码至关重要。本文将从多个角度探讨如何选择短视频电商源码&#xf…...

444444

356前期...

初识LangChain的快速入门指南

个人名片 &#x1f393;作者简介&#xff1a;java领域优质创作者 &#x1f310;个人主页&#xff1a;码农阿豪 &#x1f4de;工作室&#xff1a;新空间代码工作室&#xff08;提供各种软件服务&#xff09; &#x1f48c;个人邮箱&#xff1a;[2435024119qq.com] &#x1f4f1…...

OpenBayes 教程上新 | CVPR 获奖项目,BioCLlP 快速识别生物种类,再也不会弄混小浣熊和小熊猫了!

市面上有很多植物识别的 App&#xff0c;通过对植物的叶片、花朵、果实等特征进行准确的识别&#xff0c;从而确定植物的种类、名称。但动物识别的 App 却十分有限&#xff0c;这使我们很难区分一些外形相似的动物&#xff0c;例如小浣熊和小熊猫。 左侧为小浣熊&#xff0c;右…...

24 年程序员各岗位薪资待遇汇总(最新)

大家好&#xff0c;我是程序员鱼皮。今天分享 24 年 6 月最新的程序员各岗位薪资待遇汇总。 数据是从哪儿来的呢&#xff1f;其实很简单&#xff0c;BOSS 直聘上有一个免费的薪酬查询工具&#xff0c;只要认证成为招聘者就能直接看&#xff0c;便于招聘者了解市场&#xff0c;…...

Android SurfaceFlinger——系统动画服务启动(十四)

在了解了 SurfaceFlinger、HWC、OpenGL ES 和 EGL 等相关概念和基础信息后,我们通过系统动画的调用流程引入更多的内容。 一、解析init.rc 开机就启动进程,肯定就要从 rc 文件开始。负责开机动画的进程是 bootanimation。 1、bootanim.rc 源码位置:/frameworks/base/cmds…...

VaRest插件常用节点以及Http请求数据

1.解析json &#xff08;1&#xff09;Construct Json Object&#xff1a;构建json对象 &#xff08;2&#xff09;Decode Json&#xff1a;解析json 将string转换为json &#xff08;3&#xff09;Encode json&#xff1a;将json转换为string &#xff08;4&#xff09;Get S…...

【Linux】线程id与互斥(线程三)

上一期我们进行了线程控制的了解与相关操作&#xff0c;但是仍旧有一些问题没有解决 本章第一阶段就是解决tid的问题&#xff0c;第二阶段是进行模拟一个简易线程库&#xff08;为了加深对于C库封装linux原生线程的理解&#xff09;&#xff0c;第三阶段就是互斥。 目录 线程id…...

JavaEE—什么是服务器?以及Tomcat安装到如何集成到IDEA中?

目录 ▐ 前言 ▐ JavaEE是指什么? ▐ 什么是服务器&#xff1f; ▐ Tomcat安装教程 * 修改服务端口号 ▐ 将Tomcat集成到IDEA中 ▐ 测试 ▐ 结语 ▐ 前言 至此&#xff0c;这半年来我已经完成了JavaSE&#xff0c;Mysql数据库&#xff0c;以及Web前端知识的学习了&am…...

主流分布式消息中间件RabbitMQ、RocketMQ

分布式消息中间件在现代分布式系统中起着至关重要的作用。以下是一些主流的分布式消息中间件&#xff1a; 1. Apache Kafka - 特点&#xff1a;高吞吐量、低延迟、持久化、水平可扩展、分布式日志系统。 - 使用场景&#xff1a;日志收集与处理、实时流处理、事件驱动架构、大数…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比

在机器学习的回归分析中&#xff0c;损失函数的选择对模型性能具有决定性影响。均方误差&#xff08;MSE&#xff09;作为经典的损失函数&#xff0c;在处理干净数据时表现优异&#xff0c;但在面对包含异常值的噪声数据时&#xff0c;其对大误差的二次惩罚机制往往导致模型参数…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

MySQL 8.0 事务全面讲解

以下是一个结合两次回答的 MySQL 8.0 事务全面讲解&#xff0c;涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容&#xff0c;并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念&#xff08;ACID&#xff09; 事务是…...

GraphQL 实战篇:Apollo Client 配置与缓存

GraphQL 实战篇&#xff1a;Apollo Client 配置与缓存 上一篇&#xff1a;GraphQL 入门篇&#xff1a;基础查询语法 依旧和上一篇的笔记一样&#xff0c;主实操&#xff0c;没啥过多的细节讲解&#xff0c;代码具体在&#xff1a; https://github.com/GoldenaArcher/graphql…...

ui框架-文件列表展示

ui框架-文件列表展示 介绍 UI框架的文件列表展示组件&#xff0c;可以展示文件夹&#xff0c;支持列表展示和图标展示模式。组件提供了丰富的功能和可配置选项&#xff0c;适用于文件管理、文件上传等场景。 功能特性 支持列表模式和网格模式的切换展示支持文件和文件夹的层…...

Vue3中的computer和watch

computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...