当前位置: 首页 > news >正文

微服务day07

MQ高级

发送者可靠性,MQ的可靠性,消费者可靠性。

发送者可靠性

发送者重连

连接重试的配置文件:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

关闭MQ。

运行测试用例结果,进行了三次重连。

11-12 10:15:41:975  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]

发送者确认

总结如下:

  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 其它情况都会返回NACK,告知投递失败

其中acknack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。

默认两种机制都是关闭状态,需要通过配置文件来开启

开启发送者确认:

在publisher模块的application.yaml中添加配置:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制

  • simple:同步阻塞等待MQ的回执

  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

package com.itheima.publisher.config;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆

  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback

    @Testpublic void testObgect1() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//failure出现异常时的处理逻辑,基本不会触发。log.error("发送失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//判断是否成功发送到服务器if (result.isAck()){log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});//准备Map数据Map map = new HashMap();map.put("name","jack");map.put("age",21);rabbitTemplate.convertAndSend("obgect.queue1",map,cd);//用来接受返回的值Thread.sleep(2000);}

可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。

当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。

而如果连交换机都是错误的,则只会收到nack。

注意

开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致

  • 交换机名称错误:同样是编程错误导致

  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。

MQ的可靠性

数据持久化

默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。

    @Testpublic void testTopic3(){//自定义消息为非持久化Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes()).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为bluerabbitTemplate.convertAndSend(name,"china.goods",build);}

注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

Lazy Queue

MQ可靠性小结

消费者可靠性

消费者确认机制

消费者失败重试策略

相关文档:

哈哈哈

业务幂等性

 修改发送者代码,修改消息转换器:

    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

修改消费者代码来获取消息id:

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(Message message) throws InterruptedException {log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");}

 运行结果: 由于前面的步骤设置了报错,因此重发了3次。

11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985  WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error

只需要修改消费者的处理逻辑部分即可:

package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class Orderlisten {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.topic"),key = "pay.success"))public void listenPaySuccess(Long orderId){//1、根据id获取订单信息Order order = orderService.getById(orderId);//判断订单状态,只有待支付才修改if (order == null || order.getStatus() != 1){//条件错误直接结束return;}orderService.markOrderPaySuccess(orderId);}
}

小结,小小面试题

延迟消息

死信交换机

消息的消费者代码:

创建普通交换机和信息队列,将消息队列的死信绑定死信交换机

dleConfigtasion.classpackage com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class dleConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic DirectExchange ttlExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)return ExchangeBuilder.directExchange("ttl.direct").build();}//创建消息队列@Beanpublic Queue ttltQueue(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列//指定该队列的死信交换机return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");}}

死信交换机和消息队列:

leatinMq.class   @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlt.queue"),exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),key = {"hi"}))public void Directlisten1dlt(String msg){System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());}

消息发送方:需要指定消息的存活时间:

    @Testpublic void testTopic(){//修改路由键为bluerabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",new MessagePostProcessor() {//可以对生成的message进行处理@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间为10秒钟message.getMessageProperties().setExpiration("1000");return message;}});}

代码结束。

延时插件:延时插件下载地址

插件官方文档:延时插件官方文档

详细操作文档: 黑马文档

取消超时订单

相关文章:

微服务day07

MQ高级 发送者可靠性&#xff0c;MQ的可靠性&#xff0c;消费者可靠性。 发送者可靠性 发送者重连 连接重试的配置文件&#xff1a; spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 10…...

芯原科技嵌入式面试题及参考答案

Linux 相关驱动怎么写? 在 Linux 中编写驱动主要有以下步骤。 首先,需要了解设备的硬件特性。这包括设备的工作原理、寄存器地址和功能、中断号等信息。例如,对于一个简单的 GPIO 设备,要知道其数据寄存器、方向寄存器的位置以及读写操作的规则。 然后是模块的初始化部分。…...

二叉树Golang

二叉树 前言 完全二叉树 最底层节点按顺序从左到右排列。 满二叉树 一颗二叉树只有0度和2度的节点。 二叉搜索树 左子树上的所有节点的值均小于根节点的值。右子树上的所有节点的值均大于根节点的值。 平衡二叉搜索树 左右两个子树的高度差的绝对值不超过1 。 二叉树的存储…...

通过css的哪些方式可以实现隐藏页面上的元素?

1&#xff1a;opacity:0 通过将元素的透明度设置为o&#xff0c;实现隐藏效果&#xff0c;但是依然会占用空间并可以进行交互。 2&#xff1a;visibility:hidden 与透明度度为0的方案类似&#xff0c;会占据空间&#xff0c;但不可以进行交互。 3&#xff1a;Overflow:hi…...

微信小程序 === 使用腾讯地图选点

目录 插件介绍 接入指引 相关参数说明 插件错误处理 效果图 permission 插件的作用 添加插件 引入插件代码包 使用插件 页面 js 接口 插件介绍 腾讯位置服务地图选点插件 可以让用户快速、准确地选择并确认自己的当前位置&#xff0c;并将相关位置信息回传给开发者。…...

Redis高可用-Cluster(集群)

Redis cluster cluster 为无中心&#xff0c;分布式 sharding&#xff0c;高可用技术架构。 在哨兵 sentinel 机制中&#xff0c;可以解决 redis 高可用的问题&#xff0c;即当 master 故障后可以自动将 slave 提升为 master 从而可以保证 redis 服务的正常使用。 但是无法解…...

Spring Boot编程训练系统:数据管理与存储

摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了编程训练系统的开发全过程。通过分析编程训练系统管理的不足&#xff0c;创建了一个计算机管理编程训练系统的方案。文章介绍了编程训练系统的系统分析部分&…...

报告解读 | 创意经济2024:如何在变革中抢占先机?

在科技飞速发展的今天&#xff0c;创意行业正面临前所未有的变化。《Skillshare Trendshare 2024》报告揭示了多项趋势&#xff0c;为创意人士提供了深刻的洞察和实用的建议。本文将为您详细解读这些趋势&#xff0c;助您在创意领域脱颖而出。 1. 人工智能&#xff08;AI&…...

Flume1.9.0自定义Sink组件将数据发送至Mysql

需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份&#xff0c;但是Flume目前不支持直接向MySQL中写数据&#xff0c;所以需要用到自定义Sink&#xff0c;自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中&#xff0c;使用Flume采集到…...

如何在 Ubuntu 24.04 上安装和配置 Fail2ban ?

确保你的 Ubuntu 24.04 服务器的安全是至关重要的&#xff0c;特别是如果它暴露在互联网上。一个常见的威胁是未经授权的访问尝试&#xff0c;特别是通过 SSH。Fail2ban 是一个强大的工具&#xff0c;可以通过自动阻止可疑活动来帮助保护您的服务器。 在本指南中&#xff0c;我…...

uniapp如何i18n国际化

1、正常情况下项目在代码生成的时候就已经有i18n的相关依赖&#xff0c;如果没有可以自行使用如下命令下载&#xff1a; npm install vue-i18n --save 2、创建相关文件 en文件下&#xff1a; zh文件下&#xff1a; index文件下&#xff1a; 3、在main.js中注册&#xff1a…...

C++__day1

1、思维导图 2、如果登录失败&#xff0c;提示用户登录失败信息&#xff0c;并且提示错误几次&#xff0c;且重新输入&#xff1b;如果输入错误三次&#xff0c;则退出系统 #include <iostream> using namespace std;int main() {string id , pswd;string user"admi…...

Emacs进阶之插入时间信息(一百六十三)

简介&#xff1a; CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布&#xff1a;《Android系统多媒体进阶实战》&#x1f680; 优质专栏&#xff1a; Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 优质专栏&#xff1a; 多媒体系统工程师系列【…...

Java线程池:ThreadPoolExecutor原理解析

一、线程池的基本概念 1.1 线程池的定义 线程池是一组预先创建的线程&#xff0c;这些线程可以重复使用来执行多个任务&#xff0c;避免了频繁创建和销毁线程的开销。线程池的核心思想是通过复用一组工作线程&#xff0c;来处理大量的并发任务&#xff0c;减少系统资源消耗&a…...

二叉树、哈夫曼报文大全

1、泛型链树 #include <iostream> #include<Windows.h> #include<string> #include<stack> #include<queue> using namespace std; void menu() {cout << "**********" << endl;cout << "-1.添加" <&…...

NotePad++中安装XML Tools插件

一、概述 作为开发人员&#xff0c;日常开发中大部的数据是标准的json格式&#xff0c;但是对于一些古老的应用&#xff0c;例如webservice接口&#xff0c;由于其响应结果是xml&#xff0c;那么我们拿到xml格式的数据后&#xff0c;常常会对其进行格式化&#xff0c;以便阅读。…...

聊天服务器(7)数据模块

目录 Mysql数据库代码封装头文件与源文件 Mysql数据库代码封装 业务层代码不要直接写数据库&#xff0c;因为业务层和数据层的代码逻辑也想完全区分开。万一不想存储mysql&#xff0c;想存redis的话&#xff0c;就要改动大量业务代码。解耦合就是改起来很方便。 首先需要安装m…...

VS2022编译32位OpenCV

使用环境 Visual Studio 2022 OpenCV: 4.7.0 cmake: 3.30.2一、使用CMake工具生成vs2022的openCV工程解决方案 打开cmake&#xff0c;选择opencv的源代码目录&#xff0c;创建一个文件夹&#xff0c;作为VS工程文件的生成目录 点击configure构建项目&#xff0c;弹出构建设置…...

WP网站如何增加文章/页面的自定义模板

通过Wordpress我们后台在发布文章或者页面的时候其实可以看到有些主题 他有选择使用的页面模板&#xff0c;可以自定义模板&#xff0c;但是有些主题却没有选择主题这个功能&#xff0c;那这个自定义模板的功能是如何实现的呢&#xff1f;以下分两种情况&#xff1a;Page页面和…...

【Linux网络编程】简单的UDP网络程序

目录 一&#xff0c;socket编程的相关说明 1-1&#xff0c;sockaddr结构体 1-2&#xff0c;Socket API 二&#xff0c;基于Udp协议的简单通信 一&#xff0c;socket编程的相关说明 Socket编程是一种网络通信编程技术&#xff0c;它允许两个或多个程序在网络上相互通信&…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

java 实现excel文件转pdf | 无水印 | 无限制

文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

破解路内监管盲区:免布线低位视频桩重塑停车管理新标准

城市路内停车管理常因行道树遮挡、高位设备盲区等问题&#xff0c;导致车牌识别率低、逃费率高&#xff0c;传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法&#xff0c;正成为破局关键。该设备安装于车位侧方0.5-0.7米高度&#xff0c;直接规避树枝遮…...

云安全与网络安全:核心区别与协同作用解析

在数字化转型的浪潮中&#xff0c;云安全与网络安全作为信息安全的两大支柱&#xff0c;常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异&#xff0c;并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全&#xff1a;聚焦于保…...

【把数组变成一棵树】有序数组秒变平衡BST,原来可以这么优雅!

【把数组变成一棵树】有序数组秒变平衡BST,原来可以这么优雅! 🌱 前言:一棵树的浪漫,从数组开始说起 程序员的世界里,数组是最常见的基本结构之一,几乎每种语言、每种算法都少不了它。可你有没有想过,一组看似“线性排列”的有序数组,竟然可以**“长”成一棵平衡的二…...