RabbitMQ不完整的笔记
同步的不足
1、拓展性差,当要添加功能时,需要在原来的功能代码上做修改,高耦合。
2、性能下降,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行
3、级联失败,由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚(回滚的范围取决于自己的设定)
而通过RabbitMQ就能解决上述问题,因为其是异步调用
安装 安装docker后,使用docker拉取RabbitMQ的镜像,进行部署
相关概念
publisher: 生产者,也就是发送消息的一方
consumer: 消费者,也就是消费消息的一方
queue: 队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
exchange: 交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列
virtual host: 虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
(与MySQL中的不同数据库相似)
SpringAMQP
RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互,而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP
提供的三个功能:
自动声明队列、交换机及其绑定关系基于注解的监听器模式,异步接收消息封装了RabbitTemplate工具,用于发送消息
案例入门
采用的方案结合注解的方式没有使用控制台。方便快速在Spring项目中开发
publisher发送消息到交换机,交换机发送消息到队列,consumer发送接受队列中的消息
导入AMQP依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
WorkQueues模型
让多个消费者绑定到一个队列,共同消费队列中的消息
有什么好处?
消息的处理速度就能提高
最佳实践
**消息发送到队列,模拟大量消息堆积的队列
/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
消息接受
两个消费者接收队列中的消息,接受者1每20ms接收一个,消费者2每200ms接受一个
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}
两个消息接收者都设置了Thead.sleep,模拟任务耗时:
消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
消费者2 sleep了200毫秒,相当于每秒处理5个消息
可是实际测试结果是消费者1和消费者2竟然每人消费了25条消息:
消费者1很快完成了自己的25条消息,消费者2却在缓慢的处理自己的25条消息。
出现这种现象表明队列对于消息的分配并没有考虑到每个消费者的实际能力
优化配置
在yml文件中配置prefetch: 1 (prefetch: 1
表示每个消费者每次只能从队列中预取1个消息,消费完就能拿下一次,不需要等轮询。它可以帮助保证每个消息在被消费者处理时都能得到较为均匀的分配,避免某个消费者处理速度慢而导致其他消费者空闲的情况。)
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
交换机
作用: 在接受生产者消息的同时最重要的是如何处理消息,比如是交给所有队列还是交给某个特定的队列
Fanout: 将消息交给所有绑定到交换机的队列,就像上面的案例,默认是每个队列平均的接受消息
Direct: 基于RoutingKey(路由key)发送给订阅了消息的队列,交换机不再把消息交给每个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routing Key与消息的Routing Key完全一致,这个队列才能接收到消息
总结
Direct交换机与Fanout交换机的差异?
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
Topic交换机
在Direct交换机中它对于队列接收消息的选择性是单一的,只能被单个的Routing Key绑定,而如果在将队列绑定Key时使用通配符绑定 就能将队列同时绑定多个Key
比如:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
接下来正式开始编码使用RabbitMQ,直接使用注解声明队列和交换机
消息发送者
后续前端请求该方法就可以发送消息
/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
消息接受者
声明Direct模式的交换机和队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//。如果指定名称的队列不存在,则会自动创建该队列exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),//声明了direct的交换机key = {"red", "blue"}//声明了key
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
结果就是只有listenDirectQueue1才能接收到消息
一、使用 RabbitMQ 作为中间通信层,实现不同编程语言间的通信,使用消息队列完成定时任务,保证功能可靠性
1.1、虚拟机安装RabbitMQ
首先是在虚拟机中安装配置RabbitMQ,基于docker安装,
docker pull rabbitmq
创建并运行 RabbitMQ 容器
docker run -d -p 15672:15672 -p 5672:5672 \-e RABBITMQ_DEFAULT_VHOST=my_vhost \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin \--hostname myRabbit \--name rabbitmq \rabbitmq
1.2、SpringBoot部署RabbitMQ
添加依赖spring-boot-starter-amqp
修改yaml配置
spring:#rabbitmq 配置rabbitmq:host: 192.168.79.202username: guestpassword: guest#虚拟主机virtual-host: /#端口port: 5672listener:simple:#消费者最小数量concurrency: 10#消费者最大数量max-concurrency: 10#限制消费者,每次只能处理一条消息,处理完才能继续下一条消息prefetch: 1#启动时是否默认启动容器,默认为 trueauto-startup: true#被拒绝时重新进入队列的default-requeue-rejected: truetemplate:retry:#启用消息重试机制,默认为 falseenabled: true#初始重试间隔时间initial-interval: 1000ms#重试最大次数,默认为 3 次max-attempts: 3#重试最大时间间隔,默认 10000msmax-interval: 10000ms#重试的间隔乘数,#配置 2 的话,第一次等 1s,第二次等 2s,第三次等 4smultiplier: 1#在 RabbitMQ 中,initial-interval 和 max-interval 是用于指定消息重试机制的两个参数,#它们的区别如下:#1. initial-interval(初始间隔时间):表示第一次重试的时间间隔,也就是在消息第一次处#理失败后,等待多长时间再尝试重新发送消息。这个参数的默认值是 1 秒。#2.max-interval(最大间隔时间):表示重试过程中的最大时间间隔,也就是每次重试时,#最长等待多长时间再尝试重新发送消息。这个参数的默认值是 10 秒。
1.3、消息发送者
@Autowired
private AmqpTemplate rabbitTemplate;
//这里需要创建AmapTemplate对象,以便调用convertAndSend方法
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "检测到摔倒";// 发送消息rabbitTemplate.convertAndSend(exchangeName, “ " message);
}
这里传入的参数跟使用什么交换机有关系,如果使用funout交换机 就队列交换机和消息,如果使用direct交换机 就需要传入key 交换机名字 消息
在这个例子中生产者使用默认的交换机 所以需要指定队列 而不用指定key(主要是完成语言之间的通信)
1.4、python部署RabbitMQ
python中使用pika操作RabbitMQ
# coding=utf-8
### 消费者import pikauser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
# 这样生产者和消费者就没有必要的先后启动顺序了
channel.queue_declare(queue='hello')# 回调函数
def callback(ch, method, properties, body):print('消费者收到:{}'.format(body))# channel: 包含channel的一切属性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish发送的消息channel.basic_consume(queue='hello', # 接收指定queue的消息auto_ack=True, # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息on_message_callback=callback # 设置收到消息的回调函数)print('Waiting for messages. To exit press CTRL+C')# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
channel.start_consuming()
队列使用的是默认队列
1.5、整体流程
在前端界面点击 发送get请求 前端根据传过来的是1还是2 就来进行不同的活动
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MyController {@GetMapping("/example")public String exampleController(@RequestParam("param1") String param1, @RequestParam("param2") int param2) {// 处理参数并返回响应return ;}}
是1 就进行摔倒检测
后端使用RabbitMq的生产者的方法 发送消息到队列, python端接收到发送到指定队列的消息后开始调用摔倒检测 摔倒检测因为是使用的yolov8所以没有在本机运行,调用阿里云的服务
持续检测阿里云返回的是什么 如果返回的是摔倒了,就进行后续的活动
是2就通知python进行人脸检测安防
实现了python和java的通信之后再来说说如何实现的定时任务的
然后在ServiceImpl填写具体的方法 使用RabbitMQ发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE, "ttl.test", orderInfo)
发送给ttl交换机 这个交换机绑定了ttl队列,同时ttl队列绑定了死信交换机,这个死信交换机绑定了死信队列
有由于这个ttl队列设置了过期时间,所以过期时间到后,消息就会到死信交换机
死信队列监听到之后就会开始处理,这样就能确保定时任务中不会确保定时没有成功的情况
RabbitMQ如何保证消息不丢失
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功(避免了消息在发送交换机或者发送到队列丢失的情况)
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
当内存出问题还有磁盘兜底
通过交换机持久化,队列持久化,消息持久化
消费者确认
RabbitMQ支持消费者确认机制,消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息
最好采用auto的确认模式 即 自动ack 由spring检测istener代码是否出现异常,没有异常就返回ack,抛出异常就返回nack
如果消费者接受消息失败,也可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到以后,如果消息依然失败将消息投递到异常交换机,交由人工处理
RabbitMQ消息的重复消费问题
出现的原因是网络抖动等,消费者处理消息后因为网络问题没能成功发送确认给MQ,导致Spring的重试机制,就重复消费了消息
解决方案:
每条消息设置一个唯一的标识id
幂等方案 ,可以加锁
死信交换机(延迟队列)
延迟队列:进入队列的消息会被延迟消费的队列
场景:超时订单、限时优惠、定时发布
延迟队列=死信交换机+TTL
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
消息所在的队列设置了存活时间
消息本身设置了存活时间
延迟队列插件
可以使用DelayExchange插件
DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
具体怎么使用
如果有100万消息堆积在MQ,如何解决
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题
增加更多消费者,提高消费速度
在消费者内开启线程池加快消息处理速度
扩大队列容积,提高堆积上限
惰性队列的特征如下:
接收到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存
支持数百万条的消息存储
配置的方式添加
注解的方式添加
RabbitMQ高可用机制
在生产环境下,使用集群来保证高可用性
普通集群、镜像集群、仲裁队列
暂时不看
相关文章:

RabbitMQ不完整的笔记
同步的不足 1、拓展性差,当要添加功能时,需要在原来的功能代码上做修改,高耦合。 2、性能下降,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行 3、级联失败,由于我们是基于OpenFeign调用交易…...

微软Edge浏览器深度解析:功能、同步、隐私与安全
微软Edge浏览器是微软公司开发的一款网页浏览器,它基于Chromium内核,提供了快速、安全和兼容性良好的网页浏览体验。以下是关于微软Edge浏览器的详细信息和使用指南: 微软Edge浏览器的主要特点: 1. 基于Chromium内核: 渲染引擎:Chromium内核是基于开源项目Blink的,它…...

网络性能测试工具:iperf3介绍
文章目录 前言一、iperf3 的安装和使用下载和安装参数说明 二、iperf3 测试服务端启动客户端启动服务端输出反向测试客户端服务端 前言 新接触的网络环境如何评估网络带宽和吞吐量呢,有的项目没有对业务流量进行合理规划,服务或者中间件出口带宽经常有被…...

scp:Linux系统本地与远程文件传输命令
scp 是Linux系统中用于在本地主机和远程主机之间进行文件传输的命令。 详细说明: scp 命令用于安全地将文件从一个主机传输到另一个主机,所有传输数据都是加密的。语法: scp [参数] [源文件路径] [目标主机:目标路径] 参数说明:…...

python基础(习题、资料)
免费提取资料: 练习、资料免费提取。持续更新迅雷云盘https://pan.xunlei.com/s/VNz6kH1EXQtK8j-wwwz_c0k8A1?pwdrj2x# 本文为Python的进阶知识合辑,包括列表(List)、元组(Tuple)、字典(Dic…...

shell脚本免交互
shell脚本的编写一方面为了减少我们命令的输入,另一方面也可以进行简单的自动化运行,其中为了实现自动化过程,一个很重要的点就是免交互,本篇文章跟大家简单分享两个常用的免交互的方法。 Here Document Here document 通过内联重…...

WPF学习笔记:给文字添加线性渐变效果
<TextBox Text"XXX信息管理系统" VerticalAlignment"Center" Background"Transparent" HorizontalAlignment"Center" FontSize"35" FontWeight"Normal"> <TextBox.Effect> <…...

Fully Convolutional Networks for Semantic Segmentation--论文笔记
论文笔记 资料 1.代码地址 2.论文地址 https://arxiv.org/abs/1411.4038 3.数据集地址 论文摘要的翻译 卷积网络是强大的视觉模型,可以产生特征层次结构。我们表明,卷积网络本身,经过端到端,像素对像素的训练,在…...

Camworks编程怎么样:深度解析其四大特点、五大应用领域、六大优势与七大挑战
Camworks编程怎么样:深度解析其四大特点、五大应用领域、六大优势与七大挑战 Camworks编程,作为计算机辅助制造(CAM)领域的一款重要软件,近年来在制造业中得到了广泛的应用。那么,Camworks编程究竟怎么样呢…...

【Linux】操作系统之冯诺依曼体系
🎉博主首页: 有趣的中国人 🎉专栏首页: Linux 🎉其它专栏: C初阶 | C进阶 | 初阶数据结构 小伙伴们大家好,本片文章将会讲解 操作系统中 冯诺依曼体系 的相关内容。 如果看到最后您觉得这篇文…...

c++ QT 实现QMediaPlayer播放音频显示音频级别指示器
文章目录 效果图概述代码总结 效果图 概述 QMediaPlayer就不介绍了,就提供了一个用于播放音频和视频的媒体播放器 QAudioProbe 它提供了一个探针,用于监控音频流。当音频流被捕获或播放时,QAudioProbe 可以接收到音频数据。这个类在需要访问…...

失之毫厘差之千里之load和loads
起源 最近在读pandas库的一些文档的时候,顺便也会将文档上的一些demo在编辑器中进行运行测试,其中在读到pandas处理Json数据这一节的时候,我还是像往常一样,将文档提供的demo写一遍,结果在运行的时候,直接…...

element ui在移动端的适配问题
element ui在移动端的适配问题 问题1: 给el-table表头添加背景色,使用以下代码 :header-row-style“{ background: ‘linear-gradient(90deg, #0079FA 0%, #00C7DD 100%)’ }” 在安卓手机上显示正常,在ios手机上显示背景色添加到每一个th中…...

堆排序详细理解
目录 一、前备知识 二、建堆 2.2.1 向上调整算法建堆 2.2.2 向下调整算法建堆 三、排序 3.1 常见问题 3.2 思路 3.3 源码 一、前备知识 详细图解请点击:二叉树的顺序实现-堆-CSDN博客 本文只附上向上/向下调整算法的源码 //交换 void Swap(int* p, int* …...

RK3588+FPGA+AI高性能边缘计算盒子,应用于视频分析、图像视觉等
搭载RK3588(四核 A76四核 A55),CPU主频高达 2.4GHz ,提供1MB L2 Cache 和 3MB L3 ,Cache提供更强的 CPU运算能力,具备6T AI算力,可扩展至38T算力。 产品规格 系统主控CPURK3588,四核…...

07-操作元素(键盘和鼠标事件)
在前面的文章中重点介绍了一些元素的定位方法,定位到元素后,就需要操作元素了。本篇总结了web页面常用的一些操作元素方法,可以统称为行为事件。 一、简单操作 点击按钮(鼠标左键):click()清空输入框&…...

3389,为了保障3389端口的安全,我们可以采取的措施
3389端口,作为远程桌面协议(RDP)的默认端口,广泛应用于Windows操作系统中,以实现远程管理和控制功能。然而,正因为其广泛使用,3389端口也成为许多潜在安全威胁的入口。因此,确保3389…...

Java集合【超详细】2 -- Map、可变参数、Collections类
文章目录 一、Map集合1.1 Map集合概述和特点【理解】1.2 Map集合的基本功能【应用】1.3 Map集合的获取功能【应用】1.4 Map集合的两种遍历方式 二、HashMap集合2.1 HashMap集合概述和特点【理解】2.2 HashMap的组成、构造函数2.3 put、查找方法2.4 HashMap集合应用案例【应用】…...

最佳 Mac 数据恢复:恢复 Mac 上已删除的文件
尝试过许多 Mac 数据恢复工具,但发现没有一款能达到宣传的效果?我们重点介绍最好的 Mac 数据恢复软件 没有 Mac 用户愿意担心数据丢失,但您永远不知道什么时候会发生这种情况。无论是意外删除 Mac 上的重要文件、不小心弄湿了 Mac、感染病毒…...

芋道系统,springboot+vue3+mysql实现地址的存储与显示
1.效果图 2.前端实现: <el-form-item label"地址" prop"entrepriseAddress"><el-cascaderv-model"formData.entrepriseAddress"size"large":options"region"/></el-form-item> //导入组件 im…...

【C++】C++11新特性:列表初始化、声明、新容器、右值引用、万能引用和完美转发
目录 一、列表初始化 1.1 { } 初始化 1.2 std::initializer_list 二、声明 2.1 auto 2.2 decltype 2.3 nullptr 三、新容器 四、右值引用和移动语义 4.1 左值和左值引用 4.2 右值和右值引用 4.3 左值引用与右值引用比较 4.4 右值引用使用场景和意义:移…...

【IB Protocal Serial--WQE】
IB Protocal Serial--WQE 1 Intro1.1 What1.2 IBA WQE 本系列文章介绍RDMA技术的具体实现–InfiniBand Protocal; Introduce the features, capalities,components, and elements of IBA. the principles of operation. 1 Intro 1.1 What 理解IB协议下面这三句话对…...

C++ 混合运算的类型转换
一 混合运算和隐式转换 257 整型2 浮点5 行吗?成吗?中不中? C 中允许相关的数据类型进行混合运算。 相关类型。 尽管在程序中的数据类型不同,但逻辑上进行这种运算是合理的相关类型在混合运算时会自动进行类型转换,再…...

线性时间选择
给定线性序集中n个元素和一个整数k,1≤k≤n,要求找出这n个元素中第k小的元素 #include<iostream> #include<cstdlib> #include<time.h> using namespace std; int a[100]; int Random(int left,int right) {srand(time(NULL));return …...

【对算法期中卷子的解析和反思】
一、程序阅读并回答问题(共30分) #include<cstdio>#include<cstring>#include<iostream>using namespace std;char chess[10][10];int sign[10];int n, k, ans;void dfs(int x, int k) { if (k 0){ans;return; } if (xk-1 >…...

sudo apt update sudo: apt: command not found
CentOS或RHEL(Red Hat Enterprise Linux)系统上,包管理器是yum或dnf,而不是apt。您可以使用yum或dnf来安装软件包。以下是如何在CentOS或RHEL上安装Git的详细步骤: 1. 使用yum安装Git 首先,更新软件包列表&…...

ios:文本框默认的copy、past改成中文复制粘贴
问题 ios 开发,对于输入框的一些默认文案展示,如复制粘贴是英文的,那么如何改为中文的呢 解决 按照路径找到这个文件 ios/项目/Info.plist,增加 <key>CFBundleAllowMixedLocalizations</key> <true/> <…...

Qt moc系统的黑魔法?
Qt的元对象系统(Meta-Object System)是Qt框架的核心功能之一,为C语言增加了一些动态特性,借助元对象系统Qt可以实现以下功能 信号与槽机制(Signals and Slots)运行时类型信息(Run-Time Type In…...

MyBatis开发中常用总结
文章目录 常用MyBatis参数映射单个参数多个参数使用索引【不推荐】Param注解Map传参POJO【推荐】List数组 动态标签\<if>标签\<trim>标签\<where>标签\<set>标签\<foreach>标签 MyBatis查询一对一一对多 常用MyBatis参数映射 单个参数 XML中可…...

Git基本使用教程(学习记录)
参考文章链接: Git教程(超详细,一文秒懂) RUNOOB Git教程 Git学习记录 1Git概述 1.1版本控制软件功能 版本管理:更新或回退到历史上任何版本,数据备份共享代码:团队间共享代码,…...