SpringCloud学习路线(9)——服务异步通讯RabbitMQ
一、初见MQ
(一)什么是MQ?
MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。
(二)同步调用
1、概念: 同步调用是指,某一服务需要多个服务共同参与,但多个服务之间有一定的执行顺序,当每一个服务都需要等待前面一个服务完成才能继续执行。
2、存在的问题
- 耦合度高: 新需求需要改动原代码
- 性能下降: 调用者需要等待服务提供者相应,如果调用链过长则响应时间等于每次调用的时间之和。
- 资源浪费: 调用链的每个服务在等待响应过程中,不会释放请求资源,高并发场景下会浪费系统资源。
- 级联失败: 若服务提供者出现宕机,所有调用者都会因故障而导致整个服务集群故障。
(三)异步调用
1、实现模式: 异步调用常见实现的就是事件驱动模式。
2、事件驱动的优势
- 服务解耦: 只需要将请求交付给事件管理器进行管理即可完成服务。
- 性能提升: 与客户交互的服务短时间就能完成,并不需要等待后续服务完成。
- 服务弱依赖: 其它服务宕机不影响服务集群的使用
- 流量缓冲: 事件管理器通过任务队列的方式,使得订阅的服务按照自身速度进行执行。
3、事件驱动的缺点
- 高度依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂时,业务没有明显的流程线,不便于跟踪管理
(四)MQ常见框架
RabbitMQ(中小企业) | ActiveMQ | RocketMQ(大型企业) | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | Alibaba | Apache |
开发语言 | Erlang | Java | Java | Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 极高 |
消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠 | 高 | 一般 | 高 | 一般 |
二、使用MQ
(一)RabbitMQ概述
RqbbitMQ是基于Erlang语言开发的开源消息通讯中间件,官方地址:https://rabbitmq.com/
(二)安装MQ
docker pull rabbitmq:3-management
(三)运行RabbitMQ
#配置 MQ的用户名和密码,容器名和主机名,端口,镜像名 ,注意:15672端口是MQ的控制台访问端口,5672是对外暴露的消息通信端口
docker run -e RABBITMQ_DEFAULT_USER=xxx -e RABBITMQ_DEFAULT_PASS=xxxx --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
访问MQ的控制台
(4)RabbitMQ的整体结构
(5)RabbitMQ中的几个概念
- channel: 操作MQ的工具
- exchange: 路由消息到队列
- queue: 缓存消息
- Virtual Host: 虚拟主机,是对queue,exchange等资源进行逻辑分组
(6)常见的MQ模型
- 基本消息队列(BasicQueue): Publisher —1:1— Queue —1:1— Customer
- 工作消息队列(WorkQueue): Publisher —1:1— Queue —1:n— Customer
- 发布/订阅(Publish、Subscribe): 根据交换机类型又有三种模型
- Fanout Exchange: 广播,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Direct Exchange: 路由,Publisher—1:1—Exchange—1:n—Queue—1:1—Customer
- Topic Exchange: 主题,
- RPC
- 发布者确认
第一种:基本消息队列的基本使用
包含三种角色:publisher、queue、consumer
- publisher: 消费发布者,将消息发布到队列queue
- queue: 消息队列,负责接受并缓存消息
- consumer: 订阅队列,处理队列中的消息
收发消息的过程: 获取连接 》 建立通信通道 》 创建消息队列 》 收发消息 》 释放资源
1、publisher和consumer引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId>
</dependency>
2、Publisher创建发送消息通道
@SpringBootTest
class PublisherApplicationTests {@Testvoid testSendMessage() throws IOException, TimeoutException {
// 1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();
// 2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
// 3、建立连接Connection connection = connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel = connection.createChannel();
// 5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
// 6、发送信息String message = "hello,rabbitmq!";channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));System.out.println("发送消息成功:【"+message+"】");
// 7、关闭通道和连接channel.close();connection.close();}
}
2、Consumer创建订阅通道
class ConsumerApplicationTests {public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接ConnectionFactory connectionFactory = new ConnectionFactory();// 2、设置连接参数connectionFactory.setHost("192.168.92.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("root");connectionFactory.setPassword("root");
// 3、建立连接Connection connection = connectionFactory.newConnection();
// 4、建立通信通道ChannelChannel channel = connection.createChannel();
// 5、创建队列String queueName = "simple.queue";channel.queueDeclare(queueName,false,false,false,null);
// 6、订阅消息channel.basicConsume(queueName,true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 7、处理消息String message = new String(body);System.out.println("接收到消息:【"+message+"】");}});System.out.println("等待接收消息....");}
}
第二种:Work Queue 工作队列
与基本队列的区别在于,它能使用多个订阅队列进行高效的处理请求。(因为一个订阅队列的处理速度是有限的)
使用过程与基本队列几乎一致,只是开启了多个订阅队列。
在使用过程中我们会发现,多个订阅队列对任务的分配是平均的,这就是预取机制。
我们需要的是快速处理的订阅队列获取更多的请求,慢速处理的订阅队列获取少量的请求,它如何实现呢?
通过修改配置文件,设置一个 preFetch 值。
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672 #端口virtual-host: / #虚拟主机username: root #用户名password: root #密码listener:simple:prefetch: 1 # 每次取 1 个请求,处理完才能取下一个。
第三种:FanoutQueue 广播消息队列
SpringAMQP提供声明交换机、队列、绑定关系的API
主要使用的是Exchange.FanoutExchange类。
实现思路:
1、在consumer服务,声明队列,交换机,并将两者绑定。
@Configuration
public class FanoutConfig{//交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("com.fanout");}//队列@Beanpublic Queue fanoutQueue1(){return new Queue("com.queue1");}//绑定关系@Beanpublic Binding bindingQueue(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}//...以相同方式声明第2个队列,并完成绑定}
2、在consumer服务,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@Component
public class SpringRabbitListener {@RabbitListener(queues = "com.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {//...处理结果}@RabbitListener(queues = "com.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {//...处理结果}
}
3、在publisher编写测试方法,向交换机发送信息
@Test
public void sendFanoutExchange() {//1、交换机String exchangeName = "com.fanout";//2、消息String message = "Hello Fanout";//3、发送消息rabbitTemplate.covertAndSend(exchangeName, "", message);
}
第四种:路由信息队列
路由模式的流程: 即设置密钥的绑定关系,只有携带相应的密钥才能进入相应的队列
- 每一个 Queue 与 Exchange 设置一个 BindingKey
- 发布者发送消息时,需要指定消息的 RoutingKey
- Exchange根据消息路由到 BindingKey 与 RoutingKey 一致的队列
实现思路:
1、利用 @RabbitListener 声明Exchange、Queue、RoutingKey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","blue"}))
public void listenRoutingQueue1(String msg) throws InterruptedException {//...处理结果
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "com.exchange", type = ExcahngeTypes.DIRECT), key = {"red","green"}))
public void listenRoutingQueue2(String msg) throws InterruptedException {//...处理结果
2、发送消息实现
//指定队列处理
@Test
public void sendRoutingExchange1(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "blue", message);
}//多队列处理
@Test
public void sendRoutingExchange2(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,RoutingMQ";//发送消息rabbitTemplate.covertAndSend(exchangeName, "red", message);
}
第五种:主题信息队列(通配key)
TopicExchange 与 DirectExchange 的区别: routingkey必须是多个单词的列表,并且以,
分割。并且Queue与Exchange指定的BindingKey时可使用通配符:
- **#:**代指 0 / n 个单词
- *: 代指一个单词
实现思路:
1、通过 @RabbitListener 声明Exchange、Queue、RoutingKey
@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue1"), key = {"china.#"}))
public void listenTopicQueue1(String msg) {//处理代码....
}@RabbitListener(bingdings = @QueueBinding(exchange = @Exchange(name = "com.exchange", type = ExchangeTypes.TOPIC), queue = @Queue(name = "com.queue2"), key = {"#.news"}))
public void listenTopicQueue2(String msg) {//处理代码....
}
2、在publisher服务中,向交换机发送消息
@Test
public void sendTopicMessage(){//交换机,消息String exchangeName = "com.exchange";String message = "Hello,Topic";rabbitTemplate.convertAndSend(exchangeName,"china.call",message);
}
四、SpringAMQP
(一)概念
- AMQP: Advanced Message Queuing Protocol 传递消息队列协议,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言及平台无关,更符合为服务中独立性的要求。
- Spring AMQP: Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。其中 spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
(二)实现基础消息队列
1、引入spring-amqp依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、publisher服务中利用RabbitTemplate发送消息到任务队列
- 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
- 编写发送方法
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessage(){String queueName = "simple.queue";String message = "Hello World";rabbitTemplate.convertAndSend(queueName,message);}
3、在consumer服务中编写消费逻辑,绑定simple.queue队列
- 配置mq连接信息
spring:rabbitmq:host: 192.168.92.131 #IPport: 5672virtual-host: /username: rootpassword: root
- 编写发送方法1
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void getMessage(){String queueName = "simple.queue";// receive 表示接收方法,接收到的信息会封装到Message,可以看receive的返回值Message message = rabbitTemplate.receive(queueName);// Message.getBody 是 byte[]System.out.println(new String(message.getBody()));}
- 编写发送方法2
- 创建一个监听类
// 注册成 Bean 对象
@Component
public class SpringRabbitListener {// 监听器注释,queues = 订阅队列,并将返回值注入参数列表中 @RabbitListener(queues = "simple.queue")public void ListenSimpleQueueMessage(String msg){System.out.println("Spring 消费者接收到消息:【" + msg + "】");}
}
(三)消息转换器
为了让我们能够自由识别consumer发送的消息,则需要使用的是消息转换器。
消息转换器如何使用?
Spring对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理,默认实现的是SimpleMessageConverter,基于ObjectObjectOutputStream完成序列化。
我们只需要定义一个 MessageConverter 类型的Bean即可,推荐使用JSON序列化
1、publisher引入依赖
<!-- 接收消息需要使用jackson的转换依赖 -->
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency><!-- 发送消息需要使用jackson的核心依赖 -->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
2、publisher启动类,声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
3、consumer启动类,声明MessageConverter
@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}
4、监听队列消息
@RabbitListener(queues = "object.queue")
public void listenObjectMessage(Object msg) {//处理数据....
}
相关文章:

SpringCloud学习路线(9)——服务异步通讯RabbitMQ
一、初见MQ (一)什么是MQ? MQ(MessageQueue),意思是消息队列,也就是事件驱动架构中的Broker。 (二)同步调用 1、概念: 同步调用是指,某一服务…...

postcss-pxtorem适配插件动态配置rootValue(根据文件路径名称,动态改变vue.config里配置的值)
项目背景:一个项目里有两个分辨率的设计稿(1920和2400),不能拆开来打包 参考: 是参考vant插件:移动端Vant组件库rem适配下大小异常的解决方案:https://github.com/youzan/vant/issues/1181 说明: 因为vue.c…...

代码随想录算法训练营第二十三天 | 额外题目系列
额外题目 1365. 有多少小于当前数字的数字借着本题,学习一下各种排序未看解答自己编写的青春版重点代码随想录的代码我的代码(当天晚上理解后自己编写) 941.有效的山脉数组未看解答自己编写的青春版重点代码随想录的代码我的代码(当天晚上理解后自己编写) 1207. 独一…...
UiAutomator
运行Espresso和UI Automator测试时要使用模拟器。国内手机的ROM大多进行过修改,可能加入很多限制,导致测试无法正常运行。 Espresso只支持一个活动内部交互行为的测试。跨越多个活动、多个应用的场景需要使用UI Automator。使用Espresso和UI Automator的…...
stm32标准库开发常用函数的使用和代码说明
文章目录 GPIO(General Purpose Input/Output)NVIC(Nested Vectored Interrupt Controller)DMA(Direct Memory Access)USART(Universal Synchronous/Asynchronous Receiver/Transmitter…...

有关合泰BA45F5260中断的思考
最近看前辈写的代码,发现这样一段代码: #ifdef SUPPORT_RF_NET_FUNCTION if(UART_INT_is_L()) { TmrInsertTimer(eTmrHdlUartRxDelay,TMR_PERIOD(2000),NULL); break; } #endif 其中UART_INT_is_L&am…...
Numpy-算数函数与数学函数
⛳算数函数 如果参与运算的两个对象都是ndarray,并且形状相同,那么会对位彼此之间进 第 30 页 行( - * /)运算。NumPy 算术函数包含简单的加减乘除: add(),subtract(),multiply() 和divide()。 …...
Nginx在springboot中起到的作用
面试时这样回答: 在Spring Boot项目中使用Nginx可以有以下用途: 1. 反向代理:Nginx可以作为反向代理服务器,将外部请求转发到后端的Spring Boot应用,并可以实现负载均衡、高可用、缓存等功能,提高系统的性…...

12.(开发工具篇vscode+git)vscode 不能识别npm命令
1:vscode 不能识别npm命令 问题描述: 解决方式: (1)右击VSCode图标,选择以管理员身份运行; (2)在终端中执行get-ExecutionPolicy,显示Restrictedÿ…...

如何在MacBook上彻底删除mysql
好久以前安装过,但是现在配置mysql一直出错,索性全部删掉重新配置。 一、停止MySQL服务 首先,请确保 MySQL 服务器已经停止运行,以免影响后续的删除操作。 sudo /usr/local/mysql/support-files/mysql.server stop如果你输入之…...
web攻击面试|网络渗透面试(一)
Web攻击面试大纲 常见Web攻击类型 1.1 SQL注入攻击 1.2 XSS攻击 1.3 CSRF攻击 1.4 命令注入攻击SQL注入攻击 2.1 基本概念 2.2 攻击原理 2.3 防御措施XSS攻击 3.1 基本概念 3.2 攻击原理 3.3 防御措施CSRF攻击 4.1 基本概念 4.2 攻击原理 4.3 防御措施命令注入攻击 5.1 基本概…...
VBA操作WORD(六)另存为不含宏的文档
Sub 另存为不含宏的文档()Application.DisplayAlerts False Application.ScreenUpdating FalseDim oDoc As DocumentSet oDoc Word.ActiveDocumentDim oRng As RangeSet oRng oDoc.ContentDim sPath As String默认存储路径,当前用户桌面,注释掉的是当…...
分享69个Java源码,总有一款适合您
Java源码 分享69个Java源码,总有一款适合您 下面是文件的名字,我放了一些图片,文章里不是所有的图主要是放不下...,大家下载后可以看到。 源码下载链接: https://pan.baidu.com/s/1ZgbJhMNwIyFyqFzHsDdL5w 提取码&a…...
《cool! autodistill帮你标注数据训练yolov8模型》学习笔记
《cool! autodistill帮你标注数据训练yolov8模型》 Summary Autodistill是一个用于自动标注数据训练边缘模型的工具。 Highlights 💡 Autodistill由Robotflow推出,用于训练建立部署计算机视觉模型。💻 通过使用大模型自动标注和训练小模型…...

Rust vs Go:常用语法对比(十)
题图来自 Rust vs. Golang: Which One is Better?[1] 182. Quine program Output the source of the program. 输出程序的源代码 package mainimport "fmt"func main() { fmt.Printf("%s%c%s%c\n", s, 0x60, s, 0x60)}var s package mainimport "fm…...
SliverPersistentHeader组件 实现Flutter吸顶效果
效果: 20230723-212152-73_Trim 代码: import package:flutter/cupertino.dart; import package:flutter/material.dart;class StickHeaderPage extends StatefulWidget {overrideState<StatefulWidget> createState() {// TODO: implement creat…...
Nginx性能优化配置
一、全局优化 # 工作进程数 worker_processes auto; # 建议 CPU核心数|CPU线程数# 最大支持的连接(open-file)数量;最大值受限于 Linux open files (ulimit -n) # 建议公式:worker_rlimit_nofile > worker_processes * worker_connections…...
杭州多校2023“钉耙编程”中国大学生算法设计超级联赛(4)
1003Simple Set Problem 首先将元素的值 x 以及所属集合的编号 y 作为二元组 (x,y) 存入数组,然后按照 x 升序排列, 之后使用双指针扫描数组(尺取法),当区间内出现了所有编号时更新答案的最小值, #includ…...

音视频入门之音频采集、编码、播放
作者:花海blog 今天我们学习音频的采集、编码、生成文件、转码等操作,我们生成三种格式的文件格式,pcm、wav、aac 三种格式,并且我们用 AudioStack 来播放音频,最后我们播放这个音频。 使用 AudioRecord 实现录音生成…...
在 Linux 系统中,如何发起POST/GET请求
在 Linux 系统中,可以使用命令行工具 curl 或者 wget 来发送 POST 请求。这两个工具都是非常常用的命令行工具,可以通过命令行直接发送 HTTP 请求。 1. 使用 curl 发送 POST 请求: curl -X POST -H "Content-Type: application/json&q…...

从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...

LLMs 系列实操科普(1)
写在前面: 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容,原视频时长 ~130 分钟,以实操演示主流的一些 LLMs 的使用,由于涉及到实操,实际上并不适合以文字整理,但还是决定尽量整理一份笔…...

基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...
MinIO Docker 部署:仅开放一个端口
MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...

算术操作符与类型转换:从基础到精通
目录 前言:从基础到实践——探索运算符与类型转换的奥秘 算术操作符超级详解 算术操作符:、-、*、/、% 赋值操作符:和复合赋值 单⽬操作符:、--、、- 前言:从基础到实践——探索运算符与类型转换的奥秘 在先前的文…...

边缘计算网关提升水产养殖尾水处理的远程运维效率
一、项目背景 随着水产养殖行业的快速发展,养殖尾水的处理成为了一个亟待解决的环保问题。传统的尾水处理方式不仅效率低下,而且难以实现精准监控和管理。为了提升尾水处理的效果和效率,同时降低人力成本,某大型水产养殖企业决定…...

Canal环境搭建并实现和ES数据同步
作者:田超凡 日期:2025年6月7日 Canal安装,启动端口11111、8082: 安装canal-deployer服务端: https://github.com/alibaba/canal/releases/1.1.7/canal.deployer-1.1.7.tar.gz cd /opt/homebrew/etc mkdir canal…...