RabbitMQ-SpringAMQP使用介绍
RabbitMQ
- 1. Spring AMQP
- 1.1 引入依赖
- 1.2 消息发送
- 1.3 消息接收
- 1.4 WorkQueue模型
- 1.4.1 实例代码
- 1.4.2 能者多劳
- 1.4.3 总结
- 1.5交换机
- 1.6 Fanout交换机(广播)
- 1.7 Direct交换机(订阅)
- 1.8 Topic交换机(通配符订阅)
- 1.9 声明队列and交换机
- 1.10 基于注解声明
- 1.11 消息转换器
- 1.11.1 配置JSON转换器
- 1.11.2 示例
RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
1. Spring AMQP
RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
官网:https://spring.io/projects/spring-amqp/
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
1.1 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2 消息发送
配置MQ地址,在publisher服务的application.yml中添加配置:
spring:rabbitmq:host: 192.168.1.200 # 虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: feng # 用户名password: 123 # 密码
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
@SpringBootTest
public class SpringAmopTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void SimpleQueue(){//队列String queueName = "simpleQueue";//消息String message = "Hello World";//发送消息rabbitTemplate.convertAndSend(queueName,message);}
}
1.3 消息接收
在consumer端的application.xml配置MQ信息:
spring:rabbitmq:host: 192.168.1.200 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: feng # 用户名password: 123321 # 密码
编写监听器
@Component
@Slf4j
public class SpringRabbitListener {@RabbitListener(queues = "simpleQueue")public void listenSimpleQueue(String message){log.info("Simple Queue消息: "+message);}
}
重启服务,即可实现监听!
1.4 WorkQueue模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

1.4.1 实例代码
publisher段代码:
@Test
public void WorkQueue(){//队列String queueName = "work.queue";//消息String message = "Hello World_";//发送消息50次for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);}}
consumer段消费代码:
//消费者1
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message){System.out.println("消费者1处理任务{:"+message+"}"+ LocalTime.now());
}
//消费者2
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message){System.err.println("消费者2处理任务{:"+message+"}"+ LocalTime.now());
}
work queue模式,消息分配模式默认是平均分配,每个消费者处理的任务数量是平均的
也就是上面案例中,消息队列一共五十条消息,消费者1和消费者2,每人会处理25条。
1.4.2 能者多劳
如果想修改work模式的分配模式,比如想按性能分配,性能好的,多处理任务,那些就需要添加配置:
application.xml:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
这样,50条消息,由消费者1和消费者2 按性能快慢决定执行数量的多少,谁先执行完成当前任务,就可以去执行消息队列中的下一个任务,没有预分配机制;
1.4.3 总结
work queue模型的使用:
- 多个消费者绑定到一个消息队列,可以加快消息处理速度;
- 同一条消息只会被一个消费者处理;
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳;
1.5交换机
交换机exchange,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
1.6 Fanout交换机(广播)
FanoutExchange会将消息路由到每个绑定的队列。

- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
1.7 Direct交换机(订阅)
基于RoutingKey(路由key)发送给订阅了消息的队列。

在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息
Direct交换机与Fanout交换机差异:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
1.8 Topic交换机(通配符订阅)
Topic与direct相似,都是通过RoutingKey绑定,按照RoutingKey进行路由,只不过Topic类型的Exchange在绑定bandingKey时,可以使用通配符配置:
通配符规则:
#:匹配一个或多个词*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.spu.insert或者item.spuitem.*:只能匹配item.spu
Direct交换机与Topic交换机的差异:
- Topic交换机接收的消息RoutingKey必须是多个单词,以
.分割- Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词*:代表1个词
1.9 声明队列and交换机
fanout交换机
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明Fanout交换机* @return Fanout交换机类型*/public FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange", true, false);}/*** 声明队列* @return Queue类型*/public Queue queue() {return new Queue("queue", true, false, false);}/*** 队列绑定到交换机* @param queue* @param fanoutExchange* @return*/public Binding binding(Queue queue ,FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue).to(fanoutExchange);}
}
Direct交换机
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
1.10 基于注解声明
Direct模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic模式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
1.11 消息转换器
Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
1.11.1 配置JSON转换器
在publisher和consumer两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
配置消息转换器,在publisher和consumer两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
1.11.2 示例
publisher
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void SimpleQueue(){//队列String queueName = "simpleQueue";//消息Map<String,Object> map = new HashMap<>(2);map.put("name", "xiaoming");map.put("age", 18);
// String message = "测试消息!!!";//发送消息rabbitTemplate.convertAndSend(queueName,map);}
consumer
@RabbitListener(queues = "simpleQueue")
public void listenSimpleQueue(Map<String, Object> map) throws InterruptedException {log.info("Simple Queue消息: "+map);System.out.println("sout=============:"+map.toString());
}
w HashMap<>(2);map.put("name", "xiaoming");map.put("age", 18);
// String message = "测试消息!!!";//发送消息rabbitTemplate.convertAndSend(queueName,map);}
consumer
@RabbitListener(queues = "simpleQueue")
public void listenSimpleQueue(Map<String, Object> map) throws InterruptedException {log.info("Simple Queue消息: "+map);System.out.println("sout=============:"+map.toString());
}
相关文章:
RabbitMQ-SpringAMQP使用介绍
RabbitMQ 1. Spring AMQP1.1 引入依赖1.2 消息发送1.3 消息接收1.4 WorkQueue模型1.4.1 实例代码1.4.2 能者多劳1.4.3 总结 1.5交换机1.6 Fanout交换机(广播)1.7 Direct交换机(订阅)1.8 Topic交换机(通配符订阅&#x…...
ASP.NET Core 中服务生命周期详解:Scoped、Transient 和 Singleton 的业务场景分析
前言 在 ASP.NET Core 中,服务的生命周期直接影响应用的性能和行为。通过依赖注入容器 (Dependency Injection, DI),我们可以为服务定义其生命周期:Scoped、Transient 和 Singleton。本文将详细阐述这些生命周期的区别及其在实际业务中的应用…...
c语言----------小知识
1 system函数的使用 #include <stdlib.h> int system(const char *command); 功能:在已经运行的程序中执行另外一个外部程序 参数:外部可执行程序名字 返回值: 成功:0 失败:任意数字示例代码: #inc…...
React Context用法总结
1. 基本概念 1.1 什么是 Context Context 提供了一种在组件树中共享数据的方式,而不必通过 props 显式地逐层传递。它主要用于共享那些对于组件树中许多组件来说是"全局"的数据。 1.2 基本用法 // 1. 创建 Context const ThemeContext React.createC…...
[笔记] 使用 Jenkins 实现 CI/CD :从 GitLab 拉取 Java 项目并部署至 Windows Server
随着软件开发节奏的加快,持续集成(CI)和持续部署(CD)已经成为确保软件质量和加速产品发布的不可或缺的部分。Jenkins作为一款广泛使用的开源自动化服务器,为开发者提供了一个强大的平台来实施这些实践。然而…...
腾讯云AI代码助手编程挑战赛-如意
作品简介 《如意》是一款结合腾讯云AI代码助手生成的、集智能问答、知识学习和生活助手功能于一体的应用,在通过先进的AI技术提升用户的工作效率、学习效果和生活质量。无论是解答疑难问题、提供专业建议,还是帮助规划日程、提升技能,它都能…...
TAS测评倍智题库 | 益丰大药房2025年中高层测评BA商业推理测评真题考什么?
您好!您已被邀请参加360评估。您的评估与反馈将有助于被评估人更深入地了解个人情况,发现个人优势和潜在风险。请您秉持公正、开放的心态进行评估。请尽快完成评估,在此衷心感谢您的配合与支持! 相关事宜: 请您在…...
2025 First LOOK! CnosDB 新版本 2.4.3.1 发布
🔹 版本号:2.4.3.1 🔹 发布日期:2024年11月05日 功能优化 简化编解码器错误定义 #2368 删除不必要的const DEFAULT_* #2378 添加 wal 压缩检查 #2377 移除 page reader #2380 创建配额 #2367 减少内存复制和计算 #2384 构…...
PyMysql 01|(包含超详细项目实战)连接数据库、增删改查、异常捕获
目录 一、数据库操作应用场景 二、安装PyMysql 三、事务的概念 四、数据库的准备 五、PyMysql连接数据库 1、建立连接方法 2、入门案例 六、PyMysql操作数据库 1、数据库查询 1️⃣查询操作流程 2️⃣cursor游标 3️⃣查询常用方法 4️⃣案例 5️⃣异常捕获 …...
Android14上使用libgpiod[gpioinfo gpioget gpioset ...]
环境 $ cat /etc/os-release NAME="Ubuntu" VERSION="20.04.5 LTS (Focal Fossa)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 20.04.5 LTS" VERSION_ID="20.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="…...
网络安全 信息收集入门
1.信息收集定义 信息收集是指收集有关目标应用程序和系统的相关信息。这些信息可以帮助攻击者了解目标系统的架构、技术实现细节、运行环境、网络拓扑结构、安全措施等方面的信息,以便我们在后续的渗透过程更好的进行。 2.收集方式-主动和被动收集 ①收集方式不同…...
修改sshd默认配置,提升安全
对于Linux服务器,特别是暴露在公网的服务器,会经常被人扫描、探测和攻击。包括通过ssh访问登录攻击。对此,对默认的sshd配置进行调整,提升安全。 下面以CentOS 7.9为例说明: 一、常见安全措施 以root用户编辑vim /e…...
Clojure语言的面向对象编程
Clojure语言的面向对象编程 引言 Clojure是一种现代的Lisp方言,它特别强调函数式编程,Immutable数据结构和强大的并发能力。然而,很多人可能会问:Clojure支持面向对象编程吗?虽然Clojure没有像Java或C那样的传统类和…...
spring boot启动源码分析(三)之Environment准备
上一篇《spring-boot启动源码分析(二)之SpringApplicationRunListener》 环境介绍: spring boot版本:2.7.18 主要starter:spring-boot-starter-web 本篇开始讲启动过程中Environment环境准备,Environment是管理所有…...
MySQL复习
基础篇 InnoDB、MyISAM 和 MEMORY 存储引擎的区别? 主要区别: 为什么MySQL选择 InnoDB 作为默认存储引擎? 1.innodb支持事务,myisam、memory不支持。 2.innodb支持行级锁,可以使多个事务同时访问不同的行…...
ASP.NET Core 实现微服务 -- Polly 服务降级熔断
在我们实施微服务之后,服务间的调用变的异常频繁。多个服务之间可能是互相依赖的关系。某个服务出现故障或者是服务间的网络出现故障都会造成服务调用的失败,进而影响到某个业务服务处理失败。某一个服务调用失败轻则造成当前相关业务无法处理࿱…...
服务器漏洞修复解决方案
漏洞1、远程桌面授权服务启用检测【原理扫描】 Windows Remote Desktop Licensing Service is running: Get Server version: 0x60000604 1、解决方案:建议禁用相关服务避免目标被利用 方法一:使用服务管理器 打开“运行”对话框(WinR&am…...
“AI智慧组卷系统:让考试变得更简单、更公平!
大家好,我是一名资深的产品经理,今天咱们就来聊聊教育领域的一款黑科技产品——AI智慧组卷系统。在这个信息技术飞速发展的时代,AI技术已经渗透到了我们生活的方方面面,教育行业也不例外。下面我就用大白话给大家介绍一下这个AI智…...
MT6706BL 同步整流 规格书
MT6706BL 是用于反激式变换器的高性能 65V 同步整流器。MT6706BL兼容各种反激转换器类型。MT6706BL 支持 DCM、CCM 和准谐振模式。MT6706BL 集 成 了 一 个 65V 功 率MOSFET,可以取代肖特基二极管,提高效率。V SW <V TH-ON 时,MT6706BL 内…...
vue el-table 数据变化后,高度渲染问题
场景:el-table设置了height属性,但是切换查询条件后再次点击查询重新获取data时,el-table渲染的高度会有问题,滚动区域变矮了。 解决办法:使用doLayout方法,在表格数据渲染后调用doLayout方法可以重新布局…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
3-11单元格区域边界定位(End属性)学习笔记
返回一个Range 对象,只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意:它移动的位置必须是相连的有内容的单元格…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
c++第七天 继承与派生2
这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分:派生类构造函数与析构函数 当创建一个派生类对象时,基类成员是如何初始化的? 1.当派生类对象创建的时候,基类成员的初始化顺序 …...
永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...
从零开始了解数据采集(二十八)——制造业数字孪生
近年来,我国的工业领域正经历一场前所未有的数字化变革,从“双碳目标”到工业互联网平台的推广,国家政策和市场需求共同推动了制造业的升级。在这场变革中,数字孪生技术成为备受关注的关键工具,它不仅让企业“看见”设…...
机器学习的数学基础:线性模型
线性模型 线性模型的基本形式为: f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法,得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...
Java后端检查空条件查询
通过抛出运行异常:throw new RuntimeException("请输入查询条件!");BranchWarehouseServiceImpl.java // 查询试剂交易(入库/出库)记录Overridepublic List<BranchWarehouseTransactions> queryForReagent(Branch…...
