SpringCloud学习路线(9)——服务异步通讯RabbitMQ
一、初见MQ
(一)什么是MQ?
MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。
(二)同步调用
1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。
2、存在的问题
- 耦合度高: 新需求需要改动原代码
- 性能下降: 调用者需要等待服务提供者相应,如果调用链过长则响应时间等于每次调用的时间之和。
- 资源浪费: 调用链的每个服务在等待响应过程中,不会释放请求资源,高并发场景下会浪费系统资源。
- 级联失败: 若服务提供者出现宕机,所有调用者都会因故障而导致整个服务集群故障。
(三)异步调用
1、实现模式: 异步调用常见实现的就是事件驱动模式。
2、事件驱动的优势
- 服务解耦: 只需要将请求交付给事件管理器进行管理即可完成服务。
- 性能提升: 与客户交互的服务短时间就能完成,并不需要等待后续服务完成。
- 服务弱依赖: 其它服务宕机不影响服务集群的使用
- 流量缓冲: 事件管理器通过任务队列的方式,使得订阅的服务按照自身速度进行执行。
3、事件驱动的缺点
- 高度依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂时,业务没有明显的流程线,不便于跟踪管理
(四)MQ常见框架
RabbitMQ(中小企业) | ActiveMQ | RocketMQ(大型企业) | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | Alibaba | Apache |
开发语言 | Erlang | Java | Java | Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 极高 |
消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠 | 高 | 一般 | 高 | 一般 |
二、使用MQ
(一)RabbitMQ概述
RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件,官方地址:https://rabbitmq.com/
(二)安装MQ
docker pull rabbitmq:3-management
(三)运行RabbitMQ
#配置 MQ的用户名和密码,容器名和主机名,端口,镜像名 ,注意:15672端口是MQ的控制台访问端口,5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
访问MQ的控制台
(4)RabbitMQ的整体结构
(5)RabbitMQ中的几个概念
- channel: 操作MQ的工具
- exchange: 路由消息到队列
- queue: 缓存消息
- Virtual Host: 虚拟主机,是对queue,exchange等资源进行逻辑分组
(6)常见的MQ模型
- 基本消息队列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
- 工作消息队列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
- 发布/订阅(Publish、Subscribe): 根据交换机类型又有三种模型
- Fanout Exchange: 广播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Topic Exchange: 主题,
- RPC
- 发布者确认
第一种:基本消息队列的基本使用
包含三种角色:publisher、queue、consumer
- publisher: 消费发布者,将消息发布到队列queue
- queue: 消息队列,负责接受并缓存消息
- consumer: 订阅队列,处理队列中的消息
收发消息的过程: 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源
1、publisher和consumer引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId>
</dependency>
2、Publisher创建发送消息通道
@SpringBootTest
class PublisherApplicationTests {@Testvoid testSendMessage() throws IOException, TimeoutException {
// 1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
// 3、建立连接Connection connection = connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel = connection.createChannel();
// 5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
// 6、发送信息String message = "hello,rabbitmq!";channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("发送消息成功:【"+message+"】");
// 7、关闭通道和连接channel.close();connection.close();}
}
2、Consumer创建订阅通道
class ConsumerApplicationTests {public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();// 2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
// 3、建立连接Connection connection = connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel = connection.createChannel();
// 5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
// 6、订阅消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 7、处理消息String message = new String(body);System.out.println("接收到消息:【"+message+"】");}});System.out.println("等待接收消息....");}
}
第二种:Work Queue 工作队列
与基本队列的区别在于,它能使用多个订阅队列进行高效的处理请求。(因为一个订阅队列的处理速度是有限的)
使用过程与基本队列几乎一致,只是开启了多个订阅队列。
在使用过程中我们会发现,多个订阅队列对任务的分配是平均的,这就是预取机制。
我们需要的是快速处理的订阅队列获取更多的请求,慢速处理的订阅队列获取少量的请求,它如何实现呢?
通过修改配置文件,设置一个 preFetch 值。
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672 #端口virtual-host: / #虚拟主机username: root #用户名password: root #密码listener:simple:prefetch: 1 # 每次取 1 个请求,处理完才能取下一个。
第三种:FanoutQueue 广播消息队列
SpringAMQP提供声明交换机、队列、绑定关系的API
主要使用的是Exchange.FanoutExchange类。
实现思路:
1、在consumer服务,声明队列,交换机,并将两者绑定。
@Configuration
public class FanoutConfig{//交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("com.fanout");}//队列@Beanpublic Queue fanoutQueue1(){return new Queue("com.queue1");}//绑定关系@Beanpublic Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}//...以相同方式声明第2个队列,并完成绑定}
2、在consumer服务,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {@RabbitListener(queues = "com.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {//...处理结果}@RabbitListener(queues = "com.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {//...处理结果}
}
3、在publisher编写测试方法,向交换机发送信息
@Test
public void sendFanoutExchange() {//1、交换机String exchangeName = "com.fanout";//2、消息String message = "Hello Fanout";//3、发送消息rabbitTemplate.covertAndSend(exchangeName, "", message);
}
第四种:路由信息队列
路由模式的流程: 即设置密钥的绑定关系,只有携带相应的密钥才能进入相应的队列
- 每一个 Queue 与 Exchange 设置一个 BindingKey
- 发布者发送消息时,需要指定消息的 RoutingKey
- Exchange根据消息路由到 BindingKey 与 RoutingKey 一致的队列
实现思路:
1、利用 @RabbitListener 声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","blue"}))
public void listenRoutingQueue1(String msg) throws InterruptedException {//...处理结果
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","green"}))
public void listenRoutingQueue2(String msg) throws InterruptedException {//...处理结果
2、发送消息实现
//指定队列处理
@Test
public void sendRoutingExchange1(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "blue", message);
}//多队列处理
@Test
public void sendRoutingExchange2(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "red", message);
}
第五种:主题信息队列(通配key)
TopicExchange 与 DirectExchange 的区别: routingkey必须是多个单词的列表,并且以,
分割。并且Queue与Exchange指定的BindingKey时可使用通配符:
- **#:**代指 0 / n 个单词
- *: 代指一个单词
实现思路:
1、通过 @RabbitListener 声明Exchange、Queue、RoutingKey
@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue1"), key = {"china.#"}))
public void listenTopicQueue1(String msg) {//处理代码....
}@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue2"), key = {"#.news"}))
public void listenTopicQueue2(String msg) {//处理代码....
}
2、在publisher服务中,向交换机发送消息
@Test
public void sendTopicMessage(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,Topic";rabbitTemplate.convertAndSend(exchangeName,"china.call",message);
}
四、SpringAMQP
(一)概念
- AMQP: Advanced Message Queuing Protocol 传递消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关,更符合为服务中独立性的要求。
- Spring AMQP: Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。其中 spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
(二)实现基础消息队列
1、引入spring-amqp依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、publisher服务中利用RabbitTemplate发送消息到任务队列
- 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
- 编写发送方法
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessage(){String queueName = "simple.queue";String message = "Hello World";rabbitTemplate.convertAndSend(queueName,message);}
3、在consumer服务中编写消费逻辑,绑定simple.queue队列
- 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
- 编写发送方法1
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void getMessage(){String queueName = "simple.queue";// receive 表示接收方法,接收到的信息会封装到Message,可以看receive的返回值Message message = rabbitTemplate.receive(queueName);// Message.getBody 是 byte[]System.out.println(new String(message.getBody()));}
- 编写发送方法2
- 创建一个监听类
// 注册成 Bean 对象
@Component
public class SpringRabbitListener {// 监听器注释,queues = 订阅队列,并将返回值注入参数列表中 @RabbitListener(queues = "simple.queue")public void ListenSimpleQueueMessage(String msg){System.out.println("Spring 消费者接收到消息:【" + msg + "】");}
}
(三)消息转换器
为了让我们能够自由识别consumer发送的消息,则需要使用的是消息转换器。
消息转换器如何使用?
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理,默认实现的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。
我们只需要定义一个 MessageConverter 类型的Bean即可,推荐使用JSON序列化
1、publisher引入依赖
<!-- 接收消息需要使用jackson的转换依赖 -->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency><!-- 发送消息需要使用jackson的核心依赖 -->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
2、publisher启动类,声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
3、consumer启动类,声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
4、监听队列消息
@RabbitListener(queues = "object.queue")
public void listenObjectMessage(Object msg) {//处理数据....
}
相关文章:

SpringCloud学习路线(9)——服务异步通讯RabbitMQ
一、初见MQ (一)什么是MQ? MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。 (二)同步调用 1、概念: 同步调用是指,某一服务…...

postcss-pxtorem适配插件动态配置rootValue(根据文件路径名称,动态改变vue.config里配置的值)
项目背景:一个项目里有两个分辨率的设计稿(1920和2400),不能拆开来打包 参考: 是参考vant插件:移动端Vant组件库rem适配下大小异常的解决方案:https://github.com/youzan/vant/issues/1181 说明: 因为vue.c…...

代码随想录算法训练营第二十三天 | 额外题目系列
额外题目 1365. 有多少小于当前数字的数字借着本题,学习一下各种排序未看解答自己编写的青春版重点代码随想录的代码我的代码(当天晚上理解后自己编写) 941.有效的山脉数组未看解答自己编写的青春版重点代码随想录的代码我的代码(当天晚上理解后自己编写) 1207. 独一…...
UiAutomator
运行Espresso和UI Automator测试时要使用模拟器。国内手机的ROM大多进行过修改,可能加入很多限制,导致测试无法正常运行。 Espresso只支持一个活动内部交互行为的测试。跨越多个活动、多个应用的场景需要使用UI Automator。使用Espresso和UI Automator的…...
stm32标准库开发常用函数的使用和代码说明
文章目录 GPIO(General Purpose Input/Output)NVIC(Nested Vectored Interrupt Controller)DMA(Direct Memory Access)USART(Universal Synchronous/Asynchronous Receiver/Transmitter…...

有关合泰BA45F5260中断的思考
最近看前辈写的代码,发现这样一段代码: #ifdef SUPPORT_RF_NET_FUNCTION if(UART_INT_is_L()) { TmrInsertTimer(eTmrHdlUartRxDelay,TMR_PERIOD(2000),NULL); break; } #endif 其中UART_INT_is_L&am…...
Numpy-算数函数与数学函数
⛳算数函数 如果参与运算的两个对象都是ndarray,并且形状相同,那么会对位彼此之间进 第 30 页 行( - * /)运算。NumPy 算术函数包含简单的加减乘除: add(),subtract(),multiply() 和divide()。 …...
Nginx在springboot中起到的作用
面试时这样回答: 在Spring Boot项目中使用Nginx可以有以下用途: 1. 反向代理:Nginx可以作为反向代理服务器,将外部请求转发到后端的Spring Boot应用,并可以实现负载均衡、高可用、缓存等功能,提高系统的性…...

12.(开发工具篇vscode+git)vscode 不能识别npm命令
1:vscode 不能识别npm命令 问题描述: 解决方式: (1)右击VSCode图标,选择以管理员身份运行; (2)在终端中执行get-ExecutionPolicy,显示Restrictedÿ…...

如何在MacBook上彻底删除mysql
好久以前安装过,但是现在配置mysql一直出错,索性全部删掉重新配置。 一、停止MySQL服务 首先,请确保 MySQL 服务器已经停止运行,以免影响后续的删除操作。 sudo /usr/local/mysql/support-files/mysql.server stop如果你输入之…...
web攻击面试|网络渗透面试(一)
Web攻击面试大纲 常见Web攻击类型 1.1 SQL注入攻击 1.2 XSS攻击 1.3 CSRF攻击 1.4 命令注入攻击SQL注入攻击 2.1 基本概念 2.2 攻击原理 2.3 防御措施XSS攻击 3.1 基本概念 3.2 攻击原理 3.3 防御措施CSRF攻击 4.1 基本概念 4.2 攻击原理 4.3 防御措施命令注入攻击 5.1 基本概…...
VBA操作WORD(六)另存为不含宏的文档
Sub 另存为不含宏的文档()Application.DisplayAlerts False Application.ScreenUpdating FalseDim oDoc As DocumentSet oDoc Word.ActiveDocumentDim oRng As RangeSet oRng oDoc.ContentDim sPath As String默认存储路径,当前用户桌面,注释掉的是当…...
分享69个Java源码,总有一款适合您
Java源码 分享69个Java源码,总有一款适合您 下面是文件的名字,我放了一些图片,文章里不是所有的图主要是放不下...,大家下载后可以看到。 源码下载链接: https://pan.baidu.com/s/1ZgbJhMNwIyFyqFzHsDdL5w 提取码&a…...
《cool! autodistill帮你标注数据训练yolov8模型》学习笔记
《cool! autodistill帮你标注数据训练yolov8模型》 Summary Autodistill是一个用于自动标注数据训练边缘模型的工具。 Highlights 💡 Autodistill由Robotflow推出,用于训练建立部署计算机视觉模型。💻 通过使用大模型自动标注和训练小模型…...

Rust vs Go:常用语法对比(十)
题图来自 Rust vs. Golang: Which One is Better?[1] 182. Quine program Output the source of the program. 输出程序的源代码 package mainimport "fmt"func main() { fmt.Printf("%s%c%s%c\n", s, 0x60, s, 0x60)}var s package mainimport "fm…...
SliverPersistentHeader组件 实现Flutter吸顶效果
效果: 20230723-212152-73_Trim 代码: import package:flutter/cupertino.dart; import package:flutter/material.dart;class StickHeaderPage extends StatefulWidget {overrideState<StatefulWidget> createState() {// TODO: implement creat…...
Nginx性能优化配置
一、全局优化 # 工作进程数 worker_processes auto; # 建议 CPU核心数|CPU线程数# 最大支持的连接(open-file)数量;最大值受限于 Linux open files (ulimit -n) # 建议公式:worker_rlimit_nofile > worker_processes * worker_connections…...
杭州多校2023“钉耙编程”中国大学生算法设计超级联赛(4)
1003Simple Set Problem 首先将元素的值 x 以及所属集合的编号 y 作为二元组 (x,y) 存入数组,然后按照 x 升序排列, 之后使用双指针扫描数组(尺取法),当区间内出现了所有编号时更新答案的最小值, #includ…...

音视频入门之音频采集、编码、播放
作者:花海blog 今天我们学习音频的采集、编码、生成文件、转码等操作,我们生成三种格式的文件格式,pcm、wav、aac 三种格式,并且我们用 AudioStack 来播放音频,最后我们播放这个音频。 使用 AudioRecord 实现录音生成…...
在 Linux 系统中,如何发起POST/GET请求
在 Linux 系统中,可以使用命令行工具 curl 或者 wget 来发送 POST 请求。这两个工具都是非常常用的命令行工具,可以通过命令行直接发送 HTTP 请求。 1. 使用 curl 发送 POST 请求: curl -X POST -H "Content-Type: application/json&q…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...

宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...

解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
libfmt: 现代C++的格式化工具库介绍与酷炫功能
libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库,提供了高效、安全的文本格式化功能,是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全:…...
LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用
中达瑞和自2005年成立以来,一直在光谱成像领域深度钻研和发展,始终致力于研发高性能、高可靠性的光谱成像相机,为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...

如何把工业通信协议转换成http websocket
1.现状 工业通信协议多数工作在边缘设备上,比如:PLC、IOT盒子等。上层业务系统需要根据不同的工业协议做对应开发,当设备上用的是modbus从站时,采集设备数据需要开发modbus主站;当设备上用的是西门子PN协议时…...
C++ 类基础:封装、继承、多态与多线程模板实现
前言 C 是一门强大的面向对象编程语言,而类(Class)作为其核心特性之一,是理解和使用 C 的关键。本文将深入探讨 C 类的基本特性,包括封装、继承和多态,同时讨论类中的权限控制,并展示如何使用类…...