微服务day07
MQ高级
发送者可靠性,MQ的可靠性,消费者可靠性。
发送者可靠性
发送者重连

 
连接重试的配置文件:
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数 
关闭MQ。
运行测试用例结果,进行了三次重连。
11-12 10:15:41:975  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:43:997  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672]
11-12 10:15:46:002  INFO 23824 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.21.101:5672] 
发送者确认

总结如下:
-  
当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
 -  
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
 -  
持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
 -  
其它情况都会返回NACK,告知投递失败
 
其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启
开启发送者确认:
在publisher模块的application.yaml中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制 
这里publisher-confirm-type有三种模式可选:
-  
none:关闭confirm机制 -  
simple:同步阻塞等待MQ的回执 -  
correlated:MQ异步回调返回回执 
一般我们推荐使用correlated,回调机制。
定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

package com.itheima.publisher.config;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configuration
@Slf4j
@RequiredArgsConstructor
public class MQconfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
} 
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数

这里的CorrelationData中包含两个核心的东西:
-  
id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆 -  
SettableListenableFuture:回执结果的Future对象 
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

我们新建一个测试,向系统自带的交换机发送消息,并且添加ConfirmCallback
    @Testpublic void testObgect1() throws InterruptedException {CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {//failure出现异常时的处理逻辑,基本不会触发。log.error("发送失败",ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {//判断是否成功发送到服务器if (result.isAck()){log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});//准备Map数据Map map = new HashMap();map.put("name","jack");map.put("age",21);rabbitTemplate.convertAndSend("obgect.queue1",map,cd);//用来接受返回的值Thread.sleep(2000);} 
可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
而如果连交换机都是错误的,则只会收到nack。
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
路由失败:一般是因为RoutingKey错误导致,往往是编程导致
交换机名称错误:同样是编程错误导致
MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
MQ的可靠性

数据持久化

默认情况下springamqp发送的消息就是持久化的,不需要做特殊处理。
    @Testpublic void testTopic3(){//自定义消息为非持久化Message build = MessageBuilder.withBody("这是一个大新闻啊".getBytes()).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为bluerabbitTemplate.convertAndSend(name,"china.goods",build);} 
注意:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
Lazy Queue



MQ可靠性小结

消费者可靠性
消费者确认机制


消费者失败重试策略


相关文档:
哈哈哈
业务幂等性


修改发送者代码,修改消息转换器:
    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;} 
修改消费者代码来获取消息id:
    @RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(Message message) throws InterruptedException {log.info("spring 消费者接收到消息:【" + message.getBody().toString() + "】");log.info("spring 消费者接收到消息id为:【" + message.getMessageProperties().getMessageId().toString() + "】");if (true) {throw new RuntimeException("故意的");}log.info("消息处理完成");} 
运行结果: 由于前面的步骤设置了报错,因此重发了3次。
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:12:977  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:13:978  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息:【[B@6139d571】
11-12 16:20:14:979  INFO 27400 --- [ntContainer#1-1] com.itheima.consumer.mq.leatinMq         : spring 消费者接收到消息id为:【fc7c6a0c-06cb-4a7e-b158-91d1675dd83b】
11-12 16:20:14:985  WARN 27400 --- [ntContainer#1-1] o.s.a.r.retry.RepublishMessageRecoverer  : Republishing failed message to exchange 'error.direct' with routing key error 

只需要修改消费者的处理逻辑部分即可:
package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class Orderlisten {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.topic"),key = "pay.success"))public void listenPaySuccess(Long orderId){//1、根据id获取订单信息Order order = orderService.getById(orderId);//判断订单状态,只有待支付才修改if (order == null || order.getStatus() != 1){//条件错误直接结束return;}orderService.markOrderPaySuccess(orderId);}
}
 
小结,小小面试题

延迟消息

死信交换机


消息的消费者代码:
创建普通交换机和信息队列,将消息队列的死信绑定死信交换机
dleConfigtasion.classpackage com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class dleConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic DirectExchange ttlExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)return ExchangeBuilder.directExchange("ttl.direct").build();}//创建消息队列@Beanpublic Queue ttltQueue(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列//指定该队列的死信交换机return QueueBuilder.durable("ttl.queue").deadLetterExchange("dlt.direct").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1red(Queue ttltQueue, DirectExchange ttlExchange){return BindingBuilder.bind(ttltQueue).to(ttlExchange).with("hi");}}
 
死信交换机和消息队列:
leatinMq.class   @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dlt.queue"),exchange = @Exchange(name = "dlt.direct",type = ExchangeTypes.DIRECT),key = {"hi"}))public void Directlisten1dlt(String msg){System.err.println("消费者接收到队列dlt.direct的消息:"+msg+"_"+ LocalTime.now());} 
消息发送方:需要指定消息的存活时间:
    @Testpublic void testTopic(){//修改路由键为bluerabbitTemplate.convertAndSend("ttl.direct", "hi", "这是一个大新闻啊",new MessagePostProcessor() {//可以对生成的message进行处理@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置过期时间为10秒钟message.getMessageProperties().setExpiration("1000");return message;}});} 
代码结束。
延时插件:延时插件下载地址
插件官方文档:延时插件官方文档
详细操作文档: 黑马文档
取消超时订单
相关文章:
微服务day07
MQ高级 发送者可靠性,MQ的可靠性,消费者可靠性。 发送者可靠性 发送者重连 连接重试的配置文件: spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 10…...
芯原科技嵌入式面试题及参考答案
Linux 相关驱动怎么写? 在 Linux 中编写驱动主要有以下步骤。 首先,需要了解设备的硬件特性。这包括设备的工作原理、寄存器地址和功能、中断号等信息。例如,对于一个简单的 GPIO 设备,要知道其数据寄存器、方向寄存器的位置以及读写操作的规则。 然后是模块的初始化部分。…...
二叉树Golang
二叉树 前言 完全二叉树 最底层节点按顺序从左到右排列。 满二叉树 一颗二叉树只有0度和2度的节点。 二叉搜索树 左子树上的所有节点的值均小于根节点的值。右子树上的所有节点的值均大于根节点的值。 平衡二叉搜索树 左右两个子树的高度差的绝对值不超过1 。 二叉树的存储…...
通过css的哪些方式可以实现隐藏页面上的元素?
1:opacity:0 通过将元素的透明度设置为o,实现隐藏效果,但是依然会占用空间并可以进行交互。 2:visibility:hidden 与透明度度为0的方案类似,会占据空间,但不可以进行交互。 3:Overflow:hi…...
微信小程序 === 使用腾讯地图选点
目录 插件介绍 接入指引 相关参数说明 插件错误处理 效果图 permission 插件的作用 添加插件 引入插件代码包 使用插件 页面 js 接口 插件介绍 腾讯位置服务地图选点插件 可以让用户快速、准确地选择并确认自己的当前位置,并将相关位置信息回传给开发者。…...
Redis高可用-Cluster(集群)
Redis cluster cluster 为无中心,分布式 sharding,高可用技术架构。 在哨兵 sentinel 机制中,可以解决 redis 高可用的问题,即当 master 故障后可以自动将 slave 提升为 master 从而可以保证 redis 服务的正常使用。 但是无法解…...
Spring Boot编程训练系统:数据管理与存储
摘要 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统的实施在技术上已逐步成熟。本文介绍了编程训练系统的开发全过程。通过分析编程训练系统管理的不足,创建了一个计算机管理编程训练系统的方案。文章介绍了编程训练系统的系统分析部分&…...
报告解读 | 创意经济2024:如何在变革中抢占先机?
在科技飞速发展的今天,创意行业正面临前所未有的变化。《Skillshare Trendshare 2024》报告揭示了多项趋势,为创意人士提供了深刻的洞察和实用的建议。本文将为您详细解读这些趋势,助您在创意领域脱颖而出。 1. 人工智能(AI&…...
Flume1.9.0自定义Sink组件将数据发送至Mysql
需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份,但是Flume目前不支持直接向MySQL中写数据,所以需要用到自定义Sink,自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中,使用Flume采集到…...
如何在 Ubuntu 24.04 上安装和配置 Fail2ban ?
确保你的 Ubuntu 24.04 服务器的安全是至关重要的,特别是如果它暴露在互联网上。一个常见的威胁是未经授权的访问尝试,特别是通过 SSH。Fail2ban 是一个强大的工具,可以通过自动阻止可疑活动来帮助保护您的服务器。 在本指南中,我…...
uniapp如何i18n国际化
1、正常情况下项目在代码生成的时候就已经有i18n的相关依赖,如果没有可以自行使用如下命令下载: npm install vue-i18n --save 2、创建相关文件 en文件下: zh文件下: index文件下: 3、在main.js中注册:…...
C++__day1
1、思维导图 2、如果登录失败,提示用户登录失败信息,并且提示错误几次,且重新输入;如果输入错误三次,则退出系统 #include <iostream> using namespace std;int main() {string id , pswd;string user"admi…...
Emacs进阶之插入时间信息(一百六十三)
简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【…...
Java线程池:ThreadPoolExecutor原理解析
一、线程池的基本概念 1.1 线程池的定义 线程池是一组预先创建的线程,这些线程可以重复使用来执行多个任务,避免了频繁创建和销毁线程的开销。线程池的核心思想是通过复用一组工作线程,来处理大量的并发任务,减少系统资源消耗&a…...
二叉树、哈夫曼报文大全
1、泛型链树 #include <iostream> #include<Windows.h> #include<string> #include<stack> #include<queue> using namespace std; void menu() {cout << "**********" << endl;cout << "-1.添加" <&…...
NotePad++中安装XML Tools插件
一、概述 作为开发人员,日常开发中大部的数据是标准的json格式,但是对于一些古老的应用,例如webservice接口,由于其响应结果是xml,那么我们拿到xml格式的数据后,常常会对其进行格式化,以便阅读。…...
聊天服务器(7)数据模块
目录 Mysql数据库代码封装头文件与源文件 Mysql数据库代码封装 业务层代码不要直接写数据库,因为业务层和数据层的代码逻辑也想完全区分开。万一不想存储mysql,想存redis的话,就要改动大量业务代码。解耦合就是改起来很方便。 首先需要安装m…...
VS2022编译32位OpenCV
使用环境 Visual Studio 2022 OpenCV: 4.7.0 cmake: 3.30.2一、使用CMake工具生成vs2022的openCV工程解决方案 打开cmake,选择opencv的源代码目录,创建一个文件夹,作为VS工程文件的生成目录 点击configure构建项目,弹出构建设置…...
WP网站如何增加文章/页面的自定义模板
通过Wordpress我们后台在发布文章或者页面的时候其实可以看到有些主题 他有选择使用的页面模板,可以自定义模板,但是有些主题却没有选择主题这个功能,那这个自定义模板的功能是如何实现的呢?以下分两种情况:Page页面和…...
【Linux网络编程】简单的UDP网络程序
目录 一,socket编程的相关说明 1-1,sockaddr结构体 1-2,Socket API 二,基于Udp协议的简单通信 一,socket编程的相关说明 Socket编程是一种网络通信编程技术,它允许两个或多个程序在网络上相互通信&…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
实战设计模式之模板方法模式
概述 模板方法模式定义了一个操作中的算法骨架,并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下,重新定义算法中的某些步骤。简单来说,就是在一个方法中定义了要执行的步骤顺序或算法框架,但允许子类…...
C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
基于Uniapp的HarmonyOS 5.0体育应用开发攻略
一、技术架构设计 1.混合开发框架选型 (1)使用Uniapp 3.8版本支持ArkTS编译 (2)通过uni-harmony插件调用原生能力 (3)分层架构设计: graph TDA[UI层] -->|Vue语法| B(Uniapp框架)B --&g…...
React与原生事件:核心差异与性能对比解析
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...
Unity基础-Mathf相关
Unity基础-Mathf相关 一、Mathf数学工具 概述 Mathf是Unity中封装好用于数学计算的工具结构体,提供了丰富的数学计算方法,特别适用于游戏开发场景。它是Unity开发中最常用的数学工具之一,能够帮助我们处理各种数学计算和插值运算。 Mathf…...
