Spring RabbitMQ那些事(1-交换机配置消息发送订阅实操)
目录
- 一、序言
- 二、配置文件application.yml
- 三、RabbitMQ交换机和队列配置
- 1、定义4个队列
- 2、定义Fanout交换机和队列绑定关系
- 2、定义Direct交换机和队列绑定关系
- 3、定义Topic交换机和队列绑定关系
- 4、定义Header交换机和队列绑定关系
- 四、RabbitMQ消费者配置
- 五、RabbitMQ生产者
- 六、测试用例
- 1、发送到FanoutExchage
- 2、发送到DirectExchage
- 3、发送到TopicExchange
- 4、发动到HeadersExchage
- 七、结语
一、序言
在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。
二、配置文件application.yml
我们先上应用启动配置文件application.yml
,如下:
server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /listener:type: simplesimple:acknowledge-mode: autoconcurrency: 5max-concurrency: 20prefetch: 5
备注:这里我们指定了
RabbitListenerContainerFactory
的类型为SimpleRabbitListenerContainerFactory
,并且指定消息确认模式为自动确认。
三、RabbitMQ交换机和队列配置
Spring官方提供了一套 流式API 来定义队列、交换机和绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。
1、定义4个队列
/*** 定义4个队列*/
@Configuration
protected static class QueueConfig {@Beanpublic Queue queue1() {return QueueBuilder.durable("queue-1").build();}@Beanpublic Queue queue2() {return QueueBuilder.durable("queue-2").build();}@Beanpublic Queue queue3() {return QueueBuilder.durable("queue-3").build();}@Beanpublic Queue queue4() {return QueueBuilder.durable("queue-4").build();}
}
2、定义Fanout交换机和队列绑定关系
/*** 定义Fanout交换机和对应的绑定关系*/
@Configuration
protected static class FanoutExchangeBindingConfig {@Beanpublic FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange("fanout-exchange").build();}/*** 定义多个Fanout交换机和队列的绑定关系* @param fanoutExchange* @param queue1* @param queue2* @param queue3* @param queue4* @return*/@Beanpublic Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);}}
备注:这里我们将4个队列绑定到了名为
fanout-exchange
的交换机上。
2、定义Direct交换机和队列绑定关系
@Configuration
protected static class DirectExchangeBindingConfig {@Beanpublic DirectExchange directExchange() {return ExchangeBuilder.directExchange("direct-exchange").build();}@Beanpublic Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");}
}
备注:这里我们定义了名为
direct-exchange
的交换机并通过路由keyqueue3-route-key
将queue-3
绑定到了该交换机上。
3、定义Topic交换机和队列绑定关系
@Configuration
protected static class TopicExchangeBindingConfig {@Beanpublic TopicExchange topicExchange() {return ExchangeBuilder.topicExchange("topic-exchange").build();}@Beanpublic Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");return new Declarables(queue1Binding, queue2Binding);}
}
这里我们定义了名为topic-exchange
类型的交换机,该类型交换机支持路由key通配符匹配,*
代表一个任意字符,#
代表一个或多个任意字符。
备注:
- 通过路由key
com.order.*
将queue-1
绑定到了该交换机上。- 通过路由key
com.#
将queue-2
也绑定到了该交换机上。
4、定义Header交换机和队列绑定关系
@Configuration
protected static class HeaderExchangeBinding {@Beanpublic HeadersExchange headersExchange() {return ExchangeBuilder.headersExchange("headers-exchange").build();}@Beanpublic Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");}
}
备注:这里我们定义了名为
headers-exchange
类型的交换机,并通过参数function=logging
将queue-4
绑定到了该交换机上。
四、RabbitMQ消费者配置
Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:
@Slf4j
@Component
public class RabbitMqConsumer {@RabbitListener(queues = "queue-1")public void handleMsgFromQueue1(String msg) {log.info("Message received from queue-1, message body: {}", msg);}@RabbitListener(queues = "queue-2")public void handleMsgFromQueue2(String msg) {log.info("Message received from queue-2, message body: {}", msg);}@RabbitListener(queues = "queue-3")public void handleMsgFromQueue3(String msg) {log.info("Message received from queue-3, message body: {}", msg);}@RabbitListener(queues = "queue-4")public void handleMsgFromQueue4(String msg) {log.info("Message received from queue-4, message body: {}", msg);}
}
备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。
五、RabbitMQ生产者
@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqProducer {private final RabbitTemplate rabbitTemplate;public void sendMsgToFanoutExchange(String body) {log.info("开始发送消息到fanout-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);}public void sendMsgToDirectExchange(String body) {log.info("开始发送消息到direct-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("direct-exchange", "queue3-route-key", message);}public void sendMsgToTopicExchange(String routingKey, String body) {log.info("开始发送消息到topic-exchange, 消息体:{}", body);Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();rabbitTemplate.send("topic-exchange", routingKey, message);}public void sendMsgToHeadersExchange(String body) {log.info("开始发送消息到headers-exchange, 消息体:{}", body);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);}}
六、测试用例
这里写了个简单的Controller用来测试,如下:
@RestController
@RequiredArgsConstructor
public class RabbitMsgController {private final RabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/fanout")public ResponseEntity<String> sendMsgToFanoutExchange(String body) {rabbitMqProducer.sendMsgToFanoutExchange(body);return ResponseEntity.ok("广播消息发送成功");}@RequestMapping("/exchange/direct")public ResponseEntity<String> sendMsgToDirectExchange(String body) {rabbitMqProducer.sendMsgToDirectExchange(body);return ResponseEntity.ok("消息发送到Direct交换成功");}@RequestMapping("/exchange/topic")public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);return ResponseEntity.ok("消息发送到Topic交换机成功");}@RequestMapping("/exchange/headers")public ResponseEntity<String> sendMsgToHeadersExchange(String body) {rabbitMqProducer.sendMsgToHeadersExchange(body);return ResponseEntity.ok("消息发送到Headers交换机成功");}}
1、发送到FanoutExchage
直接访问http://localhost:8080/exchange/fanout?body=hello
,可以看到该消息广播到了4个队列上。
2023-11-07 17:41:12.959 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到fanout-exchange, 消息体:hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
2023-11-07 17:41:12.972 INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello
2、发送到DirectExchage
访问http://localhost:8080/exchange/direct?body=hello
,可以看到消息通过路由keyqueue3-route-key
发送到了queue-3
上。
2023-11-07 17:43:26.804 INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到direct-exchange, 消息体:hello
2023-11-07 17:43:26.822 INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-3, message body: hello
3、发送到TopicExchange
访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create
,路由key为 com.order.create
的消息分别发送到了queue-1
和queue-2
上。
2023-11-07 17:44:45.301 INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到topic-exchange, 消息体:hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-1, message body: hello
2023-11-07 17:44:45.312 INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-2, message body: hello
4、发动到HeadersExchage
访问http://localhost:8080/exchange/headers?body=hello
,消息通过头部信息function=logging
发送到了headers-exchange
上。
2023-11-07 17:47:21.736 INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer : 开始发送消息到headers-exchange, 消息体:hello
2023-11-07 17:47:21.749 INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer : Message received from queue-4, message body: hello
七、结语
下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
相关文章:

Spring RabbitMQ那些事(1-交换机配置消息发送订阅实操)
目录 一、序言二、配置文件application.yml三、RabbitMQ交换机和队列配置1、定义4个队列2、定义Fanout交换机和队列绑定关系2、定义Direct交换机和队列绑定关系3、定义Topic交换机和队列绑定关系4、定义Header交换机和队列绑定关系 四、RabbitMQ消费者配置五、RabbitMQ生产者六…...

C++动态库
C动态库 动态库文件(Dynamic Link Library,DLL)是程序在运行时所需要调用的库。静态库文件是程序在编译时所需要调用的库。 1 环境介绍 VS版本:VS2017 编程语言:C 2 功能介绍 使用VS2017项目模板创建C动态库生成…...

【教3妹学编程-算法题】2923. 找到冠军 I
3妹:2哥2哥,你看到新闻了吗?襄阳健桥医院院长 公然“贩卖出生证明”, 真是太胆大包天了吧。 2哥 : 我也看到新闻了,7人被采取刑事强制措施。 就应该好好查查他们, 一查到底! 3妹:真的…...

矢量图形编辑软件Boxy SVG mac中文版软件特点
Boxy SVG mac是一款基于Web的矢量图形编辑器,它提供了一系列强大的工具和功能,可帮助用户创建精美的矢量图形。Boxy SVG是一款好用的软件,并且可以在Windows、Mac和Linux系统上运行。 Boxy SVG mac软件特点 简单易用:Boxy SVG的用…...

神经网络遗传算法函数极值寻优
大家好,我是带我去滑雪! 对于未知的非线性函数,仅仅通过函数的输入和输出数据难以寻找函数极值,这一类问题可以通过神经网络结合遗传算法求解,利用神经网络的非线性拟合能力和遗传算法的非线性寻优能力寻找函数极值。 …...

剑指JUC原理-16.读写锁
👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家📕系列专栏:Spring源码、JUC源码🔥如果感觉博主的文章还不错的话,请👍三连支持&…...

文件改名:避免繁琐操作,利用筛选文件批量重命名技巧优化文件管理
在我们的日常生活和工作中,我们经常需要处理大量的文件,从文档、图片到音频和视频等。在这些情况下,一个高效的文件管理策略至关重要。文件重命名的必要性主要体现在两个方面。首先,对于大量文件,手动进行重命名不仅费…...

【CocoaPods安装环境和流程以及各种情况】
CocoaPods 环境HomebrewRubyrbenvRubyGems 和 Bundler安装Ruby管理Ruby更新Ruby替换Ruby镜像方式1方式2 CocoaPods安装CocoaPodsCocoaPods使用安装的一些问题单元测试引用问题 参考的链接 环境 Homebrew $ brew --config *可以发现打印有下面一行: Homebrew Ruby: …...
canvas与svg区别与实际应用
Canvas和SVG都是HTML5中的绘图技术。但是两者的实现方式和使用场景有所不同。 Canvas是HTML5中的绘图API,它提供了一套基于像素的绘图工具,可以通过JavaScript来实现动态的图形和动画。Canvas提供的绘图功能强大,可以绘制出复杂的图像和动画…...

rasa train nlu详解:1.1-train_nlu()函数
本文使用《使用ResponseSelector实现校园招聘FAQ机器人》中的例子,主要详解介绍train_nlu()函数中变量的具体值。 一.rasa/model_training.py/train_nlu()函数 train_nlu()函数实现,如下所示: def train_nlu(config: Text,nlu_data: Op…...

使用ResponseSelector实现校园招聘FAQ机器人
本文主要介绍使用ResponseSelector实现校园招聘FAQ机器人,回答面试流程和面试结果查询的FAQ问题。FAQ机器人功能分为业务无关的功能和业务相关的功能2类。 一.data/nlu.yml文件 与普通意图相比,ResponseSelector训练数据中的意图采用group/intent格…...

ENVI IDL:如何基于气象站点数据进行反距离权重插值?
01 前言 仅仅练习,大可使用ArcGIS或者已经封装好的python模块进行插值,此处仅仅从底层理解如何从公式和代码理解反距离权重插值的过程,从而更深刻的理解IDL的使用和插值的理解。 02 函数说明 2.1 Read_CSV()函数 官方语法如下:…...

实战Leetcode(四)
Practice makes perfect! 实战一: 这个题由于我们不知道两个链表的长度我们也不知道它是否有相交的节点,所以我们的方法是先求出两个链表的长度,长度长的先走相差的步数,使得两个链表处于同一起点,两个链…...

C语言——个位数为 6 且能被 3 整除但不能被 5 整除的三位自然数共有多少个,分别是哪些?
#define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int i,j0;for(i100;i<1000;i) {if(i%106&&i%30&&i%5!0){printf("%6d",i); j;}}printf("\n一共%d个\n",j);return 0; } %6d起到美化输出格式的作用ÿ…...

基于Docker容器DevOps应用方案
文章目录 基于docker容器DevOps应用方案环境基础配置1.所有主机永久关闭防火墙和selinux2.配置yum源3.docker的安装教程 配置主机名与IP地址解析部署gitlab.server主机1.安装gitlab2.配置gitlab3.破解管理员密码4.验证web页面 部署jenkins.server主机1.部署tomcat2.安装jenkins…...

Apinto 网关进阶教程,使用 API Mock 生成模拟数据
什么是 API Mock ? API Mock 是一种技术,它允许程序员在不依赖后端数据的情况下,模拟 web服务器端 API 的响应。通常使用 API Mock 来测试前端应用程序,而无需等待后端程序构建完成。API Mock 可以模拟任何 HTTP 请求方法&#x…...

笔记:AI量化策略开发流程-基于BigQuant平台(一)
从本文开始,按照AI策略开发的完整流程(共七步),上手在BigQuant平台上快速构建AI策略。本文首先介绍如何使用证券代码模块指定股票范围和数据起止日期。重要的事情说三遍:模块的输入端口有提示需要连线的上游数据类型&a…...

Spring Cloud 微服务入门篇
文章目录 什么是微服务架构 Microservice微服务的发展历史微服务的定义微小的服务微服务 微服务的发展历史1. 微服务架构的发展历史2. 微服务架构的先驱 微服务架构 Microservice 的优缺点1. 微服务 e Microservice 优点2. 微服务 Microservice 缺点微服务不是银弹:…...
使用Go语言搭建区块链基础
引言 随着区块链技术的发展,越来越多的人开始关注并使用这一技术,其中,比特币和以太坊等区块链项目正在成为人们关注的焦点。而Go语言作为一种高效、简洁的编程语言,越来越多的区块链项目也选择使用Go语言来搭建其底层基础。本文…...
手搓MyBatis框架(原理讲解)
你在学完MyBatis框架后会不会觉得很神奇,为什么我改一个配置文件就可以让程序识别和执行不同的sql语句操作数据库? SqlSessionFactoryBuilder,SqlSessionFactory和SqlSession对象到底是怎样执行的? 如果你有这些问题看就完事了 …...

VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...

STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
在 Spring Boot 项目里,MYSQL中json类型字段使用
前言: 因为程序特殊需求导致,需要mysql数据库存储json类型数据,因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...