SpringCloud(五)MQ消息队列
MQ
- 概念
- 常见消息模型
- helloworld案例
- 实现
- 实现spring AMQP发送消息
- 实现spring AMQP接收消息
- 工作消息队列
- 实现
- 发布订阅模型
- Fanout Exchange
- 实现
- DirectExchange
- 实现
- TopicExchange
- 实现
- DirectExchange 和FanoutExchange的差异
- DirectExchange 和TopicExchange的差异
- 基于@RabbitListener注解声明队列有 哪些常用注
- 消息转换器
- 注意
- 同步调用
- 异步调用
- 安装
- SpringAMQP
- 特征
概念
MQ(MessageQueue):消息队列,事件驱动架构中的Broker
- channel:操作MQ的工具
- exchange:路由消息到队列
- queue:缓存消息
- virtual host: 虚拟主机,是对queue、exchange等资源逻辑分组
常见消息模型
helloworld案例
角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
实现
实现spring AMQP发送消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在publisher服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
- 在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue(){String queueName = "simple.queue";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(queueName,message);} }
实现spring AMQP接收消息
- 在父工程引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
-
在consumer服务中编写消费逻辑,监听simple.queue。
- 在consumer服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toor
- 在publisher服务中新建一个测试类,编写测试方法
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");} }
工作消息队列
作用: 提高消息处理速度,避免队列消息堆积。
实现
- 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello,spring amqp!";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
- 在consumer服务中定义两个消息监听者,都监听simple.queue队列
@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage1 (String msg) throws InterruptedException{System.out.println("spring 消费者1接收到消息:【"+msg+"】");Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueueMessage2 (String msg) throws InterruptedException{System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色Thread.sleep(200);
- 消费者1每秒处理50条消息,消费者2每秒处理10条消息
- 修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: rootpassword: toorlistener:direct:prefetch: 1
发布订阅模型
概念: 与之前模型区别是,允许将同一消息发送给多个消费者。
实现方式: exchange(交换机)
exchange: 负责消息路由,不存储,路由失败则消息丢失
常见exchange类型:
- Fanout:广播
- Direct:路由
- Topic:话题
Fanout Exchange
Fanout Exchange:将接收到的消息路由到每一个跟其绑定的queue
实现
-
在consumer服务中,利用代码声明队列(Queue)、交换机(Exchange),并将两者绑定(Binding)
- SringAMQP提供了声明交换机、队列、绑定关系的API。
- 在consumer服务常见一个类,添加@configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding。
@Configuration public class FanoutConfig {//声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("root.fanout");}//声明第一个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);} //第二个同第一个 }
- SringAMQP提供了声明交换机、队列、绑定关系的API。
-
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.fanout发送消息
@Testpublic void testSendFanoutExchange(){String exchangeName = "root.fanout";String message = "hello,spring amqp!";rabbitTemplate.convertAndSend(exchangeName,message);}
DirectExchange
DirectExchange: 将接收到的消息更具规则路由到指定的Queue,因此称为路由模式(routes)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听direct.queue1和direct.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue1"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="direct.queue2"),exchange = @Exchange(name = "root.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void listenDirectQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到direct.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.direct发送消息
@Testpublic void testSendDirectExchange(){String exchangeName = "root.direct";String message = "hello,red!";rabbitTemplate.convertAndSend(exchangeName,"red",message);}
TopicExchange
TopicExchange: 与DirectExchange类似,区别在于routineKey必须是多个单词的列表,并且以== . ==分割。
Queue与Exchange指定BindingKey时可以使用通配符
# :代指0个或多个单词
*:代指一个单词
实现
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中编写两个消费者方法,分别监听topic.queue1和topic.queue2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue1"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue1的消息:【"+msg+"】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name="topic.queue2"),exchange = @Exchange(name = "root.topic",type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message (String msg) throws InterruptedException{System.out.println("spring 消费者接收到topic.queue2的消息:【"+msg+"】");}
- 在publisher中编写测试方法,向root.topic发送消息
@Testpublic void testSendTopicExchange(){String exchangeName = "root.topic";String message = "hello,china.news!";rabbitTemplate.convertAndSend(exchangeName,"china.news",message);}
DirectExchange 和FanoutExchange的差异
- FanoutExchange将消息路由给每一个与之绑定的队列
- DirectExchange根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
DirectExchange 和TopicExchange的差异
- TopicExchange的routineKey必须使用多个单词,以== . ==分割
- TopicExchange可以使用通配符
基于@RabbitListener注解声明队列有 哪些常用注
- @Queue
- @Exchange
消息转换器
设置JSON方式序列化:
发送消息
- 在publisher服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在publisher服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
接收消息
- 在consumer服务引入依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
- 在consumer服务声明MessageConverter
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
- 定义消费者
@RabbitListener(queues = "object.queue")public void listenObjectQueueMessage(Map<String,Object> msg){System.err.println("spring 消费者1接收到消息:【"+msg+"】");//err输出为红色}
注意
MessageConverter默认是JDK序列化
接收方和发送方必须使用相同的MessageConverter
同步调用
优点: 时效性高
问题:
- 耦合度高
- 性能下降
- 资源浪费
- 级联失败
异步调用
实现方式:
- 事件驱动(常用)
优势:
- 服务解耦
- 性能提升,吞吐量提高
- 故障隔离。没有强依赖,不担心级联失败问题
- 流量削锋
缺点:
- 依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂、业务没有明显的流程线,不好追踪管理
安装
docker pull rabbitmq:3-managementdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=toor \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
SpringAMQP
AMQP(Advance Message Queuing Protocol):是用于在应用程序或之间传递业务消息的开放标准,该协议与语言平台无关,更符合微服务中独立性的要求
Spring AMQP: 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分:
- spring-amqp:基础抽象
- spring-rabbit:底层默认实现
特征
- 监听器容器,用于异步处理入站消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列、交换和绑定
相关文章:

SpringCloud(五)MQ消息队列
MQ概念常见消息模型helloworld案例实现实现spring AMQP发送消息实现spring AMQP接收消息工作消息队列实现发布订阅模型Fanout Exchange实现DirectExchange实现TopicExchange实现DirectExchange 和FanoutExchange的差异DirectExchange 和TopicExchange的差异基于RabbitListener注…...
SQL语法基础汇总
三年前的存稿 默认端口号 3306 超级用户名 root 登录 mysql -uroot -p / mysql -uroot -proot 退出 exit / quit 服务器版本 SELECT VERSION(); 当前日期 SELECT NOW(); 当前用户 SELECT USER(); 备份 mysqldump -uroot -p 数据库名称 > 保存的路径 还原 create database1-…...

惠普星14Pro电脑开机不了显示错误代码界面怎么办?
惠普星14Pro电脑开机不了显示错误代码界面怎么办?有用户电脑开机之后,进入了一个错误界面,里面有一些错误代码。重启电脑之后依然是无法进入到桌面中,那么这个情况怎么去进行解决呢?我们可以重装一个新系统,…...

顺序表的构造及功能
定义顺序表是一种随机存储都结构,其特点是表中的元素的逻辑顺序与物理顺序相同。假设线性表L存储起始位置为L(A),sizeof(ElemType)是每个数据元素所占的存储空间的大小,则线性表L所对应的顺序存储如下图。顺序表的优缺点优点:随机…...

cesium: 绘制线段(008)
第008个 点击查看专栏目录 本示例的目的是介绍如何在vue+cesium中绘制线段,左键点击开始绘制,右键点击取消绘制 直接复制下面的 vue+cesium源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共139行)相关API参考:专栏目标示例效果 配置方式 1)…...

HTML、CSS学习笔记4(3D转换、动画)
目录 一、空间转换(3D转换) 1.空间位移 语法: 取值:(正负均可) 透视: 2.空间旋转 3.立体呈现 二、动画(animation) 1.动画的使用 先定义动画 再调用定义好的动画 …...
java的分布式锁
什么是分布式锁 分布式锁是指分布式环境下,系统部署在多个机器中,实现多进程分布式互斥的一种锁。为了保证多个进程能看到锁,锁被存在公共存储(比如 Redis、Memcache、数据库等三方存储中),以实现多个进程并…...

17- TensorFlow实现手写数字识别 (tensorflow系列) (项目十七)
项目要点 模型创建: model Sequential()添加卷积层: model.add(Dense(32, activationrelu, input_dim100)) # 第一层需要 input_dim添加dropout: model.add(Dropout(0.2))添加第二次网络: model.add(Dense(512, activationrelu)) # 除了first, 其他层不要输入shape添加输出…...

Polkadot 基础
Polkadot Polkadot联合并保护了一个不断增长的专业区块链生态系统,称为parachains。Polkadot上的应用程序和服务可以安全地跨链通信,形成真正可互操作的去中心化网络的基础。 真正的互操作性 Polkadot支持跨区块链传输任何类型的数据或资产,…...

spring源码编译
spring源码编译1、安装gradle2、拉取源码3、配置gradle文件来源及镜像仓库4、预编译5、验证6、可能遇到的报错6.1、jdk.jfr不存在6.2、checkstyleMain6.3、org.gradle.api.artifacts.result.ComponentSelectionReason.getDescription()Ljava/lang/String6.4、其他jdk࿱…...
防盗链是什么?带你了解什么是防盗链
目录 什么是防盗链 防盗链的定义 防盗链的产生 防盗链的实现 什么是防盗链 防盗链其实就是采用服务器端编程,通过url过滤技术实现的防止盗链的软件。 比如:photo.abc.com/video.mp4 这个下载地址,如果没有装防盗链,别人就能轻…...

Linux基础命令-fdisk管理磁盘分区表
文章目录 fdisk 命令介绍 命令格式 基本参数 1)常用参数 2)fdisk菜单操作说明 创建一个磁盘分区 1)创建分区 2)创建交换分区 参考实例 1) 显示当前分区的信息 2) 显示每个磁盘的分区信息 命令…...

(四)K8S 安装 Nginx Ingress Controller
ingress-nginx 是 Kubernetes 的入口控制器,使用NGINX作为反向代理和负载均衡器 版本介绍 版本1:Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础(kubernetes.io),可在GitHub的 kubernet…...
高频面试题
MyISAM和InnoDB是MySQL两种常见的存储引擎,它们之间有以下几点区别: 事务支持:MyISAM不支持事务处理,而InnoDB支持事务处理。 行级锁:MyISAM只支持表级锁,而InnoDB支持行级锁,可以避免并发访问…...
js 字节数组操作,TCP协议组装
js字节数组,进制转换js基础知识数组 Array扩展操作符三个点(...)ArrayBufferslice() 数组复制reduce 对数组中的每个元素执行一个提供的函数,将其结果汇总为单个返回值splice 数组删除,添加,替换js 字节数组转数字以及…...
JavaScript的引入并执行-包含动态引入与静态引入
JavaScript的引入并执行-包含动态引入与静态引入 JavaScript引入方式 html文件需要引入JavaScript代码,才能在页面里使用JavaScript代码。 静态引入 行内式 直接在DOM标签上使用 <!DOCTYPE html> <html lang"en"> <head><meta ch…...

第四阶段01-酷鲨商城项目准备
1. 关于csmall-product项目 这是“酷鲨商城”大项目中的“商品管理”项目,是一个后台管理项目(给管理员,或运营人员使用的项目,并不是普通用户使用的),并且,只会涉及与发布商品可能相关的功能开…...

Uncaught ReferenceError: jQuery is not defined
今天在拉取项目部署到本地的时候遇到了一个问题特此记录一下 (以后闭坑) 我和同事同时拉取了一样的代码,结果同事的页面加载正常而我的页面像被狗啃了一样,知道是js的问题但是不知道问题出在哪里?后来还是同事帮我解决…...

面试阿里测开岗,被面试官针对,当场翻脸,把我的简历还给我,疑似被拉黑...
好家伙,金三银四一到,这奇葩事可真是多,前两天和粉丝聊天,他说前段时间面试阿里的测开岗,最后和面试官干起来了。 我问他为什么,他说没啥,就觉得面试官太装了,就爱问一些虚而不实的…...

2. 驱动开发--驱动开发环境搭建
文章目录前言一、Linux中配置编译环境1.1 linux下安装软件的方法1.2 交叉编译工具链的安装1.2.1 测试是否安装成功1.3 设置环境变量1.3.1 将工具链导出到环境变量1.4 为工具链创建arm-linux-xxx符号链接二、 搭建运行开发环境2.1 tftp网络方式加载内核和设备树文件2.2 nfs网络方…...

LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...

CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
智能AI电话机器人系统的识别能力现状与发展水平
一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...