SpringCloud学习路线(9)——服务异步通讯RabbitMQ
一、初见MQ
(一)什么是MQ?
MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。
(二)同步调用
1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。
2、存在的问题
- 耦合度高: 新需求需要改动原代码
- 性能下降: 调用者需要等待服务提供者相应,如果调用链过长则响应时间等于每次调用的时间之和。
- 资源浪费: 调用链的每个服务在等待响应过程中,不会释放请求资源,高并发场景下会浪费系统资源。
- 级联失败: 若服务提供者出现宕机,所有调用者都会因故障而导致整个服务集群故障。
(三)异步调用
1、实现模式: 异步调用常见实现的就是事件驱动模式。
2、事件驱动的优势
- 服务解耦: 只需要将请求交付给事件管理器进行管理即可完成服务。
- 性能提升: 与客户交互的服务短时间就能完成,并不需要等待后续服务完成。
- 服务弱依赖: 其它服务宕机不影响服务集群的使用
- 流量缓冲: 事件管理器通过任务队列的方式,使得订阅的服务按照自身速度进行执行。
3、事件驱动的缺点
- 高度依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂时,业务没有明显的流程线,不便于跟踪管理
(四)MQ常见框架
RabbitMQ(中小企业) | ActiveMQ | RocketMQ(大型企业) | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | Alibaba | Apache |
开发语言 | Erlang | Java | Java | Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 极高 |
消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠 | 高 | 一般 | 高 | 一般 |
二、使用MQ
(一)RabbitMQ概述
RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件,官方地址:https://rabbitmq.com/
(二)安装MQ
docker pull rabbitmq:3-management
(三)运行RabbitMQ
#配置 MQ的用户名和密码,容器名和主机名,端口,镜像名 ,注意:15672端口是MQ的控制台访问端口,5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
访问MQ的控制台
(4)RabbitMQ的整体结构
(5)RabbitMQ中的几个概念
- channel: 操作MQ的工具
- exchange: 路由消息到队列
- queue: 缓存消息
- Virtual Host: 虚拟主机,是对queue,exchange等资源进行逻辑分组
(6)常见的MQ模型
- 基本消息队列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
- 工作消息队列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
- 发布/订阅(Publish、Subscribe): 根据交换机类型又有三种模型
- Fanout Exchange: 广播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Topic Exchange: 主题,
- RPC
- 发布者确认
第一种:基本消息队列的基本使用
包含三种角色:publisher、queue、consumer
- publisher: 消费发布者,将消息发布到队列queue
- queue: 消息队列,负责接受并缓存消息
- consumer: 订阅队列,处理队列中的消息
收发消息的过程: 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源
1、publisher和consumer引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId>
</dependency>
2、Publisher创建发送消息通道
@SpringBootTest
class PublisherApplicationTests {@Testvoid testSendMessage() throws IOException, TimeoutException {
// 1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
// 3、建立连接Connection connection = connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel = connection.createChannel();
// 5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
// 6、发送信息String message = "hello,rabbitmq!";channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("发送消息成功:【"+message+"】");
// 7、关闭通道和连接channel.close();connection.close();}
}
2、Consumer创建订阅通道
class ConsumerApplicationTests {public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();// 2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
// 3、建立连接Connection connection = connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel = connection.createChannel();
// 5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
// 6、订阅消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 7、处理消息String message = new String(body);System.out.println("接收到消息:【"+message+"】");}});System.out.println("等待接收消息....");}
}
第二种:Work Queue 工作队列
与基本队列的区别在于,它能使用多个订阅队列进行高效的处理请求。(因为一个订阅队列的处理速度是有限的)
使用过程与基本队列几乎一致,只是开启了多个订阅队列。
在使用过程中我们会发现,多个订阅队列对任务的分配是平均的,这就是预取机制。
我们需要的是快速处理的订阅队列获取更多的请求,慢速处理的订阅队列获取少量的请求,它如何实现呢?
通过修改配置文件,设置一个 preFetch 值。
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672 #端口virtual-host: / #虚拟主机username: root #用户名password: root #密码listener:simple:prefetch: 1 # 每次取 1 个请求,处理完才能取下一个。
第三种:FanoutQueue 广播消息队列
SpringAMQP提供声明交换机、队列、绑定关系的API
主要使用的是Exchange.FanoutExchange类。
实现思路:
1、在consumer服务,声明队列,交换机,并将两者绑定。
@Configuration
public class FanoutConfig{//交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("com.fanout");}//队列@Beanpublic Queue fanoutQueue1(){return new Queue("com.queue1");}//绑定关系@Beanpublic Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}//...以相同方式声明第2个队列,并完成绑定}
2、在consumer服务,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {@RabbitListener(queues = "com.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {//...处理结果}@RabbitListener(queues = "com.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {//...处理结果}
}
3、在publisher编写测试方法,向交换机发送信息
@Test
public void sendFanoutExchange() {//1、交换机String exchangeName = "com.fanout";//2、消息String message = "Hello Fanout";//3、发送消息rabbitTemplate.covertAndSend(exchangeName, "", message);
}
第四种:路由信息队列
路由模式的流程: 即设置密钥的绑定关系,只有携带相应的密钥才能进入相应的队列
- 每一个 Queue 与 Exchange 设置一个 BindingKey
- 发布者发送消息时,需要指定消息的 RoutingKey
- Exchange根据消息路由到 BindingKey 与 RoutingKey 一致的队列
实现思路:
1、利用 @RabbitListener 声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","blue"}))
public void listenRoutingQueue1(String msg) throws InterruptedException {//...处理结果
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","green"}))
public void listenRoutingQueue2(String msg) throws InterruptedException {//...处理结果
2、发送消息实现
//指定队列处理
@Test
public void sendRoutingExchange1(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "blue", message);
}//多队列处理
@Test
public void sendRoutingExchange2(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "red", message);
}
第五种:主题信息队列(通配key)
TopicExchange 与 DirectExchange 的区别: routingkey必须是多个单词的列表,并且以,
分割。并且Queue与Exchange指定的BindingKey时可使用通配符:
- **#:**代指 0 / n 个单词
- *: 代指一个单词
实现思路:
1、通过 @RabbitListener 声明Exchange、Queue、RoutingKey
@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue1"), key = {"china.#"}))
public void listenTopicQueue1(String msg) {//处理代码....
}@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue2"), key = {"#.news"}))
public void listenTopicQueue2(String msg) {//处理代码....
}
2、在publisher服务中,向交换机发送消息
@Test
public void sendTopicMessage(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,Topic";rabbitTemplate.convertAndSend(exchangeName,"china.call",message);
}
四、SpringAMQP
(一)概念
- AMQP: Advanced Message Queuing Protocol 传递消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关,更符合为服务中独立性的要求。
- Spring AMQP: Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。其中 spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
(二)实现基础消息队列
1、引入spring-amqp依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、publisher服务中利用RabbitTemplate发送消息到任务队列
- 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
- 编写发送方法
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessage(){String queueName = "simple.queue";String message = "Hello World";rabbitTemplate.convertAndSend(queueName,message);}
3、在consumer服务中编写消费逻辑,绑定simple.queue队列
- 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
- 编写发送方法1
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void getMessage(){String queueName = "simple.queue";// receive 表示接收方法,接收到的信息会封装到Message,可以看receive的返回值Message message = rabbitTemplate.receive(queueName);// Message.getBody 是 byte[]System.out.println(new String(message.getBody()));}
- 编写发送方法2
- 创建一个监听类
// 注册成 Bean 对象
@Component
public class SpringRabbitListener {// 监听器注释,queues = 订阅队列,并将返回值注入参数列表中 @RabbitListener(queues = "simple.queue")public void ListenSimpleQueueMessage(String msg){System.out.println("Spring 消费者接收到消息:【" + msg + "】");}
}
(三)消息转换器
为了让我们能够自由识别consumer发送的消息,则需要使用的是消息转换器。
消息转换器如何使用?
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理,默认实现的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。
我们只需要定义一个 MessageConverter 类型的Bean即可,推荐使用JSON序列化
1、publisher引入依赖
<!-- 接收消息需要使用jackson的转换依赖 -->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency><!-- 发送消息需要使用jackson的核心依赖 -->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
2、publisher启动类,声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
3、consumer启动类,声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
4、监听队列消息
@RabbitListener(queues = "object.queue")
public void listenObjectMessage(Object msg) {//处理数据....
}
相关文章:

SpringCloud学习路线(9)——服务异步通讯RabbitMQ
一、初见MQ (一)什么是MQ? MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。 (二)同步调用 1、概念: 同步调用是指,某一服务…...

postcss-pxtorem适配插件动态配置rootValue(根据文件路径名称,动态改变vue.config里配置的值)
项目背景:一个项目里有两个分辨率的设计稿(1920和2400),不能拆开来打包 参考: 是参考vant插件:移动端Vant组件库rem适配下大小异常的解决方案:https://github.com/youzan/vant/issues/1181 说明: 因为vue.c…...

代码随想录算法训练营第二十三天 | 额外题目系列
额外题目 1365. 有多少小于当前数字的数字借着本题,学习一下各种排序未看解答自己编写的青春版重点代码随想录的代码我的代码(当天晚上理解后自己编写) 941.有效的山脉数组未看解答自己编写的青春版重点代码随想录的代码我的代码(当天晚上理解后自己编写) 1207. 独一…...
UiAutomator
运行Espresso和UI Automator测试时要使用模拟器。国内手机的ROM大多进行过修改,可能加入很多限制,导致测试无法正常运行。 Espresso只支持一个活动内部交互行为的测试。跨越多个活动、多个应用的场景需要使用UI Automator。使用Espresso和UI Automator的…...
stm32标准库开发常用函数的使用和代码说明
文章目录 GPIO(General Purpose Input/Output)NVIC(Nested Vectored Interrupt Controller)DMA(Direct Memory Access)USART(Universal Synchronous/Asynchronous Receiver/Transmitter…...

有关合泰BA45F5260中断的思考
最近看前辈写的代码,发现这样一段代码: #ifdef SUPPORT_RF_NET_FUNCTION if(UART_INT_is_L()) { TmrInsertTimer(eTmrHdlUartRxDelay,TMR_PERIOD(2000),NULL); break; } #endif 其中UART_INT_is_L&am…...
Numpy-算数函数与数学函数
⛳算数函数 如果参与运算的两个对象都是ndarray,并且形状相同,那么会对位彼此之间进 第 30 页 行( - * /)运算。NumPy 算术函数包含简单的加减乘除: add(),subtract(),multiply() 和divide()。 …...
Nginx在springboot中起到的作用
面试时这样回答: 在Spring Boot项目中使用Nginx可以有以下用途: 1. 反向代理:Nginx可以作为反向代理服务器,将外部请求转发到后端的Spring Boot应用,并可以实现负载均衡、高可用、缓存等功能,提高系统的性…...

12.(开发工具篇vscode+git)vscode 不能识别npm命令
1:vscode 不能识别npm命令 问题描述: 解决方式: (1)右击VSCode图标,选择以管理员身份运行; (2)在终端中执行get-ExecutionPolicy,显示Restrictedÿ…...

如何在MacBook上彻底删除mysql
好久以前安装过,但是现在配置mysql一直出错,索性全部删掉重新配置。 一、停止MySQL服务 首先,请确保 MySQL 服务器已经停止运行,以免影响后续的删除操作。 sudo /usr/local/mysql/support-files/mysql.server stop如果你输入之…...
web攻击面试|网络渗透面试(一)
Web攻击面试大纲 常见Web攻击类型 1.1 SQL注入攻击 1.2 XSS攻击 1.3 CSRF攻击 1.4 命令注入攻击SQL注入攻击 2.1 基本概念 2.2 攻击原理 2.3 防御措施XSS攻击 3.1 基本概念 3.2 攻击原理 3.3 防御措施CSRF攻击 4.1 基本概念 4.2 攻击原理 4.3 防御措施命令注入攻击 5.1 基本概…...
VBA操作WORD(六)另存为不含宏的文档
Sub 另存为不含宏的文档()Application.DisplayAlerts False Application.ScreenUpdating FalseDim oDoc As DocumentSet oDoc Word.ActiveDocumentDim oRng As RangeSet oRng oDoc.ContentDim sPath As String默认存储路径,当前用户桌面,注释掉的是当…...
分享69个Java源码,总有一款适合您
Java源码 分享69个Java源码,总有一款适合您 下面是文件的名字,我放了一些图片,文章里不是所有的图主要是放不下...,大家下载后可以看到。 源码下载链接: https://pan.baidu.com/s/1ZgbJhMNwIyFyqFzHsDdL5w 提取码&a…...
《cool! autodistill帮你标注数据训练yolov8模型》学习笔记
《cool! autodistill帮你标注数据训练yolov8模型》 Summary Autodistill是一个用于自动标注数据训练边缘模型的工具。 Highlights 💡 Autodistill由Robotflow推出,用于训练建立部署计算机视觉模型。💻 通过使用大模型自动标注和训练小模型…...

Rust vs Go:常用语法对比(十)
题图来自 Rust vs. Golang: Which One is Better?[1] 182. Quine program Output the source of the program. 输出程序的源代码 package mainimport "fmt"func main() { fmt.Printf("%s%c%s%c\n", s, 0x60, s, 0x60)}var s package mainimport "fm…...
SliverPersistentHeader组件 实现Flutter吸顶效果
效果: 20230723-212152-73_Trim 代码: import package:flutter/cupertino.dart; import package:flutter/material.dart;class StickHeaderPage extends StatefulWidget {overrideState<StatefulWidget> createState() {// TODO: implement creat…...
Nginx性能优化配置
一、全局优化 # 工作进程数 worker_processes auto; # 建议 CPU核心数|CPU线程数# 最大支持的连接(open-file)数量;最大值受限于 Linux open files (ulimit -n) # 建议公式:worker_rlimit_nofile > worker_processes * worker_connections…...
杭州多校2023“钉耙编程”中国大学生算法设计超级联赛(4)
1003Simple Set Problem 首先将元素的值 x 以及所属集合的编号 y 作为二元组 (x,y) 存入数组,然后按照 x 升序排列, 之后使用双指针扫描数组(尺取法),当区间内出现了所有编号时更新答案的最小值, #includ…...

音视频入门之音频采集、编码、播放
作者:花海blog 今天我们学习音频的采集、编码、生成文件、转码等操作,我们生成三种格式的文件格式,pcm、wav、aac 三种格式,并且我们用 AudioStack 来播放音频,最后我们播放这个音频。 使用 AudioRecord 实现录音生成…...
在 Linux 系统中,如何发起POST/GET请求
在 Linux 系统中,可以使用命令行工具 curl 或者 wget 来发送 POST 请求。这两个工具都是非常常用的命令行工具,可以通过命令行直接发送 HTTP 请求。 1. 使用 curl 发送 POST 请求: curl -X POST -H "Content-Type: application/json&q…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...

EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...

基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...

【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...