RabbitMQ-SpringAMQP使用介绍
RabbitMQ
- 1. Spring AMQP
- 1.1 引入依赖
- 1.2 消息发送
- 1.3 消息接收
- 1.4 WorkQueue模型
- 1.4.1 实例代码
- 1.4.2 能者多劳
- 1.4.3 总结
- 1.5交换机
- 1.6 Fanout交换机(广播)
- 1.7 Direct交换机(订阅)
- 1.8 Topic交换机(通配符订阅)
- 1.9 声明队列and交换机
- 1.10 基于注解声明
- 1.11 消息转换器
- 1.11.1 配置JSON转换器
- 1.11.2 示例
RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
1. Spring AMQP
RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
官网:https://spring.io/projects/spring-amqp/
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
1.1 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.2 消息发送
配置MQ地址,在publisher
服务的application.yml
中添加配置:
spring:rabbitmq:host: 192.168.1.200 # 虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: feng # 用户名password: 123 # 密码
然后在publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送:
@SpringBootTest
public class SpringAmopTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void SimpleQueue(){//队列String queueName = "simpleQueue";//消息String message = "Hello World";//发送消息rabbitTemplate.convertAndSend(queueName,message);}
}
1.3 消息接收
在consumer端的application.xml配置MQ信息:
spring:rabbitmq:host: 192.168.1.200 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: feng # 用户名password: 123321 # 密码
编写监听器
@Component
@Slf4j
public class SpringRabbitListener {@RabbitListener(queues = "simpleQueue")public void listenSimpleQueue(String message){log.info("Simple Queue消息: "+message);}
}
重启服务,即可实现监听!
1.4 WorkQueue模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
1.4.1 实例代码
publisher段代码:
@Test
public void WorkQueue(){//队列String queueName = "work.queue";//消息String message = "Hello World_";//发送消息50次for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);}}
consumer段消费代码:
//消费者1
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message){System.out.println("消费者1处理任务{:"+message+"}"+ LocalTime.now());
}
//消费者2
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message){System.err.println("消费者2处理任务{:"+message+"}"+ LocalTime.now());
}
work queue模式,消息分配模式默认是平均分配,每个消费者处理的任务数量是平均的
也就是上面案例中,消息队列一共五十条消息,消费者1和消费者2,每人会处理25条。
1.4.2 能者多劳
如果想修改work模式的分配模式,比如想按性能分配,性能好的,多处理任务,那些就需要添加配置:
application.xml:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
这样,50条消息,由消费者1和消费者2 按性能快慢决定执行数量的多少,谁先执行完成当前任务,就可以去执行消息队列中的下一个任务,没有预分配机制;
1.4.3 总结
work queue模型的使用:
- 多个消费者绑定到一个消息队列,可以加快消息处理速度;
- 同一条消息只会被一个消费者处理;
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳;
1.5交换机
交换机exchange,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
1.6 Fanout交换机(广播)
FanoutExchange会将消息路由到每个绑定的队列。
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
1.7 Direct交换机(订阅)
基于RoutingKey(路由key)发送给订阅了消息的队列。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
Direct交换机与Fanout交换机差异:
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
1.8 Topic交换机(通配符订阅)
Topic与direct相似,都是通过RoutingKey绑定,按照RoutingKey进行路由,只不过Topic类型的Exchange在绑定bandingKey时,可以使用通配符配置:
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
Direct交换机与Topic交换机的差异:
- Topic交换机接收的消息RoutingKey必须是多个单词,以
.
分割- Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词*
:代表1个词
1.9 声明队列and交换机
fanout交换机
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明Fanout交换机* @return Fanout交换机类型*/public FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange", true, false);}/*** 声明队列* @return Queue类型*/public Queue queue() {return new Queue("queue", true, false, false);}/*** 队列绑定到交换机* @param queue* @param fanoutExchange* @return*/public Binding binding(Queue queue ,FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue).to(fanoutExchange);}
}
Direct交换机
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
1.10 基于注解声明
Direct模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
Topic模式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
1.11 消息转换器
Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
1.11.1 配置JSON转换器
在publisher
和consumer
两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
1.11.2 示例
publisher
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void SimpleQueue(){//队列String queueName = "simpleQueue";//消息Map<String,Object> map = new HashMap<>(2);map.put("name", "xiaoming");map.put("age", 18);
// String message = "测试消息!!!";//发送消息rabbitTemplate.convertAndSend(queueName,map);}
consumer
@RabbitListener(queues = "simpleQueue")
public void listenSimpleQueue(Map<String, Object> map) throws InterruptedException {log.info("Simple Queue消息: "+map);System.out.println("sout=============:"+map.toString());
}
w HashMap<>(2);map.put("name", "xiaoming");map.put("age", 18);
// String message = "测试消息!!!";//发送消息rabbitTemplate.convertAndSend(queueName,map);}
consumer
@RabbitListener(queues = "simpleQueue")
public void listenSimpleQueue(Map<String, Object> map) throws InterruptedException {log.info("Simple Queue消息: "+map);System.out.println("sout=============:"+map.toString());
}
相关文章:

RabbitMQ-SpringAMQP使用介绍
RabbitMQ 1. Spring AMQP1.1 引入依赖1.2 消息发送1.3 消息接收1.4 WorkQueue模型1.4.1 实例代码1.4.2 能者多劳1.4.3 总结 1.5交换机1.6 Fanout交换机(广播)1.7 Direct交换机(订阅)1.8 Topic交换机(通配符订阅&#x…...

ASP.NET Core 中服务生命周期详解:Scoped、Transient 和 Singleton 的业务场景分析
前言 在 ASP.NET Core 中,服务的生命周期直接影响应用的性能和行为。通过依赖注入容器 (Dependency Injection, DI),我们可以为服务定义其生命周期:Scoped、Transient 和 Singleton。本文将详细阐述这些生命周期的区别及其在实际业务中的应用…...
c语言----------小知识
1 system函数的使用 #include <stdlib.h> int system(const char *command); 功能:在已经运行的程序中执行另外一个外部程序 参数:外部可执行程序名字 返回值: 成功:0 失败:任意数字示例代码: #inc…...
React Context用法总结
1. 基本概念 1.1 什么是 Context Context 提供了一种在组件树中共享数据的方式,而不必通过 props 显式地逐层传递。它主要用于共享那些对于组件树中许多组件来说是"全局"的数据。 1.2 基本用法 // 1. 创建 Context const ThemeContext React.createC…...

[笔记] 使用 Jenkins 实现 CI/CD :从 GitLab 拉取 Java 项目并部署至 Windows Server
随着软件开发节奏的加快,持续集成(CI)和持续部署(CD)已经成为确保软件质量和加速产品发布的不可或缺的部分。Jenkins作为一款广泛使用的开源自动化服务器,为开发者提供了一个强大的平台来实施这些实践。然而…...

腾讯云AI代码助手编程挑战赛-如意
作品简介 《如意》是一款结合腾讯云AI代码助手生成的、集智能问答、知识学习和生活助手功能于一体的应用,在通过先进的AI技术提升用户的工作效率、学习效果和生活质量。无论是解答疑难问题、提供专业建议,还是帮助规划日程、提升技能,它都能…...

TAS测评倍智题库 | 益丰大药房2025年中高层测评BA商业推理测评真题考什么?
您好!您已被邀请参加360评估。您的评估与反馈将有助于被评估人更深入地了解个人情况,发现个人优势和潜在风险。请您秉持公正、开放的心态进行评估。请尽快完成评估,在此衷心感谢您的配合与支持! 相关事宜: 请您在…...

2025 First LOOK! CnosDB 新版本 2.4.3.1 发布
🔹 版本号:2.4.3.1 🔹 发布日期:2024年11月05日 功能优化 简化编解码器错误定义 #2368 删除不必要的const DEFAULT_* #2378 添加 wal 压缩检查 #2377 移除 page reader #2380 创建配额 #2367 减少内存复制和计算 #2384 构…...

PyMysql 01|(包含超详细项目实战)连接数据库、增删改查、异常捕获
目录 一、数据库操作应用场景 二、安装PyMysql 三、事务的概念 四、数据库的准备 五、PyMysql连接数据库 1、建立连接方法 2、入门案例 六、PyMysql操作数据库 1、数据库查询 1️⃣查询操作流程 2️⃣cursor游标 3️⃣查询常用方法 4️⃣案例 5️⃣异常捕获 …...

Android14上使用libgpiod[gpioinfo gpioget gpioset ...]
环境 $ cat /etc/os-release NAME="Ubuntu" VERSION="20.04.5 LTS (Focal Fossa)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 20.04.5 LTS" VERSION_ID="20.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="…...

网络安全 信息收集入门
1.信息收集定义 信息收集是指收集有关目标应用程序和系统的相关信息。这些信息可以帮助攻击者了解目标系统的架构、技术实现细节、运行环境、网络拓扑结构、安全措施等方面的信息,以便我们在后续的渗透过程更好的进行。 2.收集方式-主动和被动收集 ①收集方式不同…...
修改sshd默认配置,提升安全
对于Linux服务器,特别是暴露在公网的服务器,会经常被人扫描、探测和攻击。包括通过ssh访问登录攻击。对此,对默认的sshd配置进行调整,提升安全。 下面以CentOS 7.9为例说明: 一、常见安全措施 以root用户编辑vim /e…...
Clojure语言的面向对象编程
Clojure语言的面向对象编程 引言 Clojure是一种现代的Lisp方言,它特别强调函数式编程,Immutable数据结构和强大的并发能力。然而,很多人可能会问:Clojure支持面向对象编程吗?虽然Clojure没有像Java或C那样的传统类和…...

spring boot启动源码分析(三)之Environment准备
上一篇《spring-boot启动源码分析(二)之SpringApplicationRunListener》 环境介绍: spring boot版本:2.7.18 主要starter:spring-boot-starter-web 本篇开始讲启动过程中Environment环境准备,Environment是管理所有…...

MySQL复习
基础篇 InnoDB、MyISAM 和 MEMORY 存储引擎的区别? 主要区别: 为什么MySQL选择 InnoDB 作为默认存储引擎? 1.innodb支持事务,myisam、memory不支持。 2.innodb支持行级锁,可以使多个事务同时访问不同的行…...
ASP.NET Core 实现微服务 -- Polly 服务降级熔断
在我们实施微服务之后,服务间的调用变的异常频繁。多个服务之间可能是互相依赖的关系。某个服务出现故障或者是服务间的网络出现故障都会造成服务调用的失败,进而影响到某个业务服务处理失败。某一个服务调用失败轻则造成当前相关业务无法处理࿱…...

服务器漏洞修复解决方案
漏洞1、远程桌面授权服务启用检测【原理扫描】 Windows Remote Desktop Licensing Service is running: Get Server version: 0x60000604 1、解决方案:建议禁用相关服务避免目标被利用 方法一:使用服务管理器 打开“运行”对话框(WinR&am…...

“AI智慧组卷系统:让考试变得更简单、更公平!
大家好,我是一名资深的产品经理,今天咱们就来聊聊教育领域的一款黑科技产品——AI智慧组卷系统。在这个信息技术飞速发展的时代,AI技术已经渗透到了我们生活的方方面面,教育行业也不例外。下面我就用大白话给大家介绍一下这个AI智…...

MT6706BL 同步整流 规格书
MT6706BL 是用于反激式变换器的高性能 65V 同步整流器。MT6706BL兼容各种反激转换器类型。MT6706BL 支持 DCM、CCM 和准谐振模式。MT6706BL 集 成 了 一 个 65V 功 率MOSFET,可以取代肖特基二极管,提高效率。V SW <V TH-ON 时,MT6706BL 内…...

vue el-table 数据变化后,高度渲染问题
场景:el-table设置了height属性,但是切换查询条件后再次点击查询重新获取data时,el-table渲染的高度会有问题,滚动区域变矮了。 解决办法:使用doLayout方法,在表格数据渲染后调用doLayout方法可以重新布局…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist
现象: android studio报错: [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决: 不要动CMakeLists.…...

Ubuntu系统多网卡多相机IP设置方法
目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机,交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息,系统版本:Ubuntu22.04.5 LTS;内核版本…...

Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁
赛门铁克威胁猎手团队最新报告披露,数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据,严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能,但SEMR…...