SpringCloud(五)MQ消息队列
MQ
- 概念
- 常见消息模型
- helloworld案例
- 实现
- 实现spring AMQP发送消息
- 实现spring AMQP接收消息
- 工作消息队列
- 实现
- 发布订阅模型
- Fanout Exchange
- 实现
- DirectExchange
- 实现
- TopicExchange
- 实现
- DirectExchange 和FanoutExchange的差异
- DirectExchange 和TopicExchange的差异
- 基于@RabbitListener注解声明队列有 哪些常用注
- 消息转换器
- 注意
- 同步调用
- 异步调用
- 安装
- SpringAMQP
- 特征
概念
MQ(MessageQueue):消息队列,事件驱动架构中的Broker

- channel:操作MQ的工具
- exchange:路由消息到队列
- queue:缓存消息
- virtual host: 虚拟主机,是对queue、exchange等资源逻辑分组
常见消息模型


helloworld案例
角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息

实现
实现spring AMQP发送消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在publisher服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor- 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);} }
实现spring AMQP接收消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在consumer服务中编写消费逻辑,监听simple.queue。
- 在consumer服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor- 在publisher服务中新建一个测试类,编写测试方法
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");} }
工作消息队列
作用: 提高消息处理速度,避免队列消息堆积。

实现
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp!";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage1 (String msg) throws InterruptedException{System.out.println("spring 消费者1接收到消息:【"+msg+"】");Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage2 (String msg) throws InterruptedException{System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色Thread.sleep(200);
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
- 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toorlistener:direct:prefetch: 1
发布订阅模型
概念: 与之前模型区别是,允许将同一消息发送给多个消费者。
实现方式: exchange(交换机)
exchange: 负责消息路由,不存储,路由失败则消息丢失
常见exchange类型:
- Fanout:广播
- Direct:路由
- Topic:话题

Fanout Exchange
Fanout Exchange:将接收到的消息路由到每一个跟其绑定的queue
实现
-
在consumer服务中,利用代码声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)
- SringAMQP提供了声明交换机、队列、绑定关系的API。

- 在consumer服务常见一个类,添加@configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding。
@Configuration public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("root.fanout");}//声明第一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);} //第二个同第一个 } - SringAMQP提供了声明交换机、队列、绑定关系的API。
-
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.fanout发送消息
@Testpublic void testSendFanoutExchange(){String exchangeName = "root.fanout";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(exchangeName,message);}
DirectExchange
DirectExchange: 将接收到的消息更具规则路由到指定的Queue,因此称为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue1"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue2"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.direct发送消息
@Testpublic void testSendDirectExchange(){String exchangeName = "root.direct";String message = "hello,red!";rabbitTemplate.convertAndSend(exchangeName,"red",message);}
TopicExchange
TopicExchange: 与DirectExchange类似,区别在于routineKey必须是多个单词的列表,并且以== . ==分割。
Queue与Exchange指定BindingKey时可以使用通配符
# :代指0个或多个单词
*:代指一个单词

实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue1"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue2"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.topic发送消息
@Testpublic void testSendTopicExchange(){String exchangeName = "root.topic";String message = "hello,china.news!";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
DirectExchange 和FanoutExchange的差异
- FanoutExchange将消息路由给每一个与之绑定的队列
- DirectExchange根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
DirectExchange 和TopicExchange的差异
- TopicExchange的routineKey必须使用多个单词,以== . ==分割
- TopicExchange可以使用通配符
基于@RabbitListener注解声明队列有 哪些常用注
- @Queue
- @Exchange
消息转换器
设置JSON方式序列化:
发送消息
- 在publisher服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
接收消息
- 在consumer服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
- 定义消费者
@RabbitListener(queues = "object.queue")public void listenObjectQueueMessage(Map<String,Object> msg){System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色}
注意
MessageConverter默认是JDK序列化
接收方和发送方必须使用相同的MessageConverter
同步调用
优点: 时效性高
问题:
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
异步调用
实现方式:
- 事件驱动(常用)
优势:
- 服务解耦
- 性能提升,吞吐量提高
- 故障隔离。没有强依赖,不担心级联失败问题
- 流量削锋
缺点:
- 依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂、业务没有明显的流程线,不好追踪管理
安装
docker pull rabbitmq:3-managementdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=toor \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
SpringAMQP
AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准,该协议与语言平台无关,更符合微服务中独立性的要求
Spring AMQP: 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:
- spring-amqp:基础抽象
- spring-rabbit:底层默认实现
特征
- 监听器容器,用于异步处理入站消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列、交换和绑定
相关文章:
SpringCloud(五)MQ消息队列
MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...
SQL语法基础汇总
三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...
惠普星14Pro电脑开机不了显示错误代码界面怎么办?
惠普星14Pro电脑开机不了显示错误代码界面怎么办?有用户电脑开机之后,进入了一个错误界面,里面有一些错误代码。重启电脑之后依然是无法进入到桌面中,那么这个情况怎么去进行解决呢?我们可以重装一个新系统,…...
顺序表的构造及功能
定义顺序表是一种随机存储都结构,其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A),sizeof(ElemType)是每个数据元素所占的存储空间的大小,则线性表L所对应的顺序存储如下图。顺序表的优缺点优点:随机…...
cesium: 绘制线段(008)
第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...
HTML、CSS学习笔记4(3D转换、动画)
目录 一、空间转换(3D转换) 1.空间位移 语法: 取值:(正负均可) 透视: 2.空间旋转 3.立体呈现 二、动画(animation) 1.动画的使用 先定义动画 再调用定义好的动画 …...
java的分布式锁
什么是分布式锁 分布式锁是指分布式环境下,系统部署在多个机器中,实现多进程分布式互斥的一种锁。为了保证多个进程能看到锁,锁被存在公共存储(比如 Redis、Memcache、数据库等三方存储中),以实现多个进程并…...
17- TensorFlow实现手写数字识别 (tensorflow系列) (项目十七)
项目要点 模型创建: model Sequential()添加卷积层: model.add(Dense(32, activationrelu, input_dim100)) # 第一层需要 input_dim添加dropout: model.add(Dropout(0.2))添加第二次网络: model.add(Dense(512, activationrelu)) # 除了first, 其他层不要输入shape添加输出…...
Polkadot 基础
Polkadot Polkadot联合并保护了一个不断增长的专业区块链生态系统,称为parachains。Polkadot上的应用程序和服务可以安全地跨链通信,形成真正可互操作的去中心化网络的基础。 真正的互操作性 Polkadot支持跨区块链传输任何类型的数据或资产,…...
spring源码编译
spring源码编译1、安装gradle2、拉取源码3、配置gradle文件来源及镜像仓库4、预编译5、验证6、可能遇到的报错6.1、jdk.jfr不存在6.2、checkstyleMain6.3、org.gradle.api.artifacts.result.ComponentSelectionReason.getDescription()Ljava/lang/String6.4、其他jdk࿱…...
防盗链是什么?带你了解什么是防盗链
目录 什么是防盗链 防盗链的定义 防盗链的产生 防盗链的实现 什么是防盗链 防盗链其实就是采用服务器端编程,通过url过滤技术实现的防止盗链的软件。 比如:photo.abc.com/video.mp4 这个下载地址,如果没有装防盗链,别人就能轻…...
Linux基础命令-fdisk管理磁盘分区表
文章目录 fdisk 命令介绍 命令格式 基本参数 1)常用参数 2)fdisk菜单操作说明 创建一个磁盘分区 1)创建分区 2)创建交换分区 参考实例 1) 显示当前分区的信息 2) 显示每个磁盘的分区信息 命令…...
(四)K8S 安装 Nginx Ingress Controller
ingress-nginx 是 Kubernetes 的入口控制器,使用NGINX作为反向代理和负载均衡器 版本介绍 版本1:Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础(kubernetes.io),可在GitHub的 kubernet…...
高频面试题
MyISAM和InnoDB是MySQL两种常见的存储引擎,它们之间有以下几点区别: 事务支持:MyISAM不支持事务处理,而InnoDB支持事务处理。 行级锁:MyISAM只支持表级锁,而InnoDB支持行级锁,可以避免并发访问…...
js 字节数组操作,TCP协议组装
js字节数组,进制转换js基础知识数组 Array扩展操作符三个点(...)ArrayBufferslice() 数组复制reduce 对数组中的每个元素执行一个提供的函数,将其结果汇总为单个返回值splice 数组删除,添加,替换js 字节数组转数字以及…...
JavaScript的引入并执行-包含动态引入与静态引入
JavaScript的引入并执行-包含动态引入与静态引入 JavaScript引入方式 html文件需要引入JavaScript代码,才能在页面里使用JavaScript代码。 静态引入 行内式 直接在DOM标签上使用 <!DOCTYPE html> <html lang"en"> <head><meta ch…...
第四阶段01-酷鲨商城项目准备
1. 关于csmall-product项目 这是“酷鲨商城”大项目中的“商品管理”项目,是一个后台管理项目(给管理员,或运营人员使用的项目,并不是普通用户使用的),并且,只会涉及与发布商品可能相关的功能开…...
Uncaught ReferenceError: jQuery is not defined
今天在拉取项目部署到本地的时候遇到了一个问题特此记录一下 (以后闭坑) 我和同事同时拉取了一样的代码,结果同事的页面加载正常而我的页面像被狗啃了一样,知道是js的问题但是不知道问题出在哪里?后来还是同事帮我解决…...
面试阿里测开岗,被面试官针对,当场翻脸,把我的简历还给我,疑似被拉黑...
好家伙,金三银四一到,这奇葩事可真是多,前两天和粉丝聊天,他说前段时间面试阿里的测开岗,最后和面试官干起来了。 我问他为什么,他说没啥,就觉得面试官太装了,就爱问一些虚而不实的…...
2. 驱动开发--驱动开发环境搭建
文章目录前言一、Linux中配置编译环境1.1 linux下安装软件的方法1.2 交叉编译工具链的安装1.2.1 测试是否安装成功1.3 设置环境变量1.3.1 将工具链导出到环境变量1.4 为工具链创建arm-linux-xxx符号链接二、 搭建运行开发环境2.1 tftp网络方式加载内核和设备树文件2.2 nfs网络方…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
