从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ
前言
最近有个想法想整理一个内容比较完整springboot项目初始化Demo。
SpringBoot集成RabbitMQ
RabbitMQ中的一些角色:
-
publisher:生产者
-
consumer:消费者
-
exchange个:交换机,负责消息路由
-
queue:队列,存储消息
-
virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
junit用于测试。
一、pom引入依赖amqp
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!--rabbitmq-->
二、application-dev.yaml 增加RabbitMQ相关配置
spring:#RabbitMQ服务器配置,地址账号密码,virtualhost等配置rabbitmq:host: 127.0.0.1port: 5672username: murgpassword: 123456virtual-host: murg-host#队列中没有消息,阻塞等待时间template:receive-timeout: 2000
logging:level:org.springframework.security: debug
三、发布/订阅
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。
此模式下根据交换机类型又分为三种。
1.Fanout类型: 广播模式 把消息交给所有绑定到交换机的队列
2.Direct类型: 路由模式 把消息交给符合指定routing key 的队列
3Topic类型: 主题模式 把消息交给符合主题通配符的队列
3.1 Fanout类型
在广播模式下,消息发送流程是这样的:
1) 可以有多个队列
2) 每个队列都要绑定到Exchange(交换机)
3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
4) 交换机把消息发送给绑定过的所有队列
5) 订阅队列的消费者都能拿到消
3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起
创建配置类FanoutConfig
声明一个Fanout类型交换机命名为murg.fanout
声明两个Queue队列分别为fanout.queue1和fanout.queue2
分别将两个队列和交换机绑定,后续用于消费消息。
package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Fanout 广播模式下*声明交换机和队列*/
@Configuration
public class FanoutConfig {/*** 声明交换机和队列 将交换机和队列绑定在一起* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("murg.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
3.1.2创建消息生产服务
创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。
注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
新增测试广播模式下发送消息方法
package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** Rabbitmq消息生产者*/
@Service
@RequiredArgsConstructor
public class MessageProducerService {private final RabbitTemplate rabbitTemplate;/*** 测试广播模式下发送消息* @param msg*/public void testFanoutExchange(String msg) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend("murg.fanout","",msg);}}
3.13创建消息消费服务
创建消息生产服务MessageConsumerService
@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener 注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。
package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** 消息消费者**/
@Service
@RequiredArgsConstructor
public class MessageConsumerService {/*** 下面是Fanout,广播模式的监听* 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2* @param msg*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}}
3.14修改测试类进行测试
修改创建项目时生成的 BootdemoApplicationTests.class
增加以下注解
@ActiveProfiles("dev")
指定运行环境为开发环境
@RunWith(SpringRunner.class)
指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。
@SpringBootTest(classes={BootdemoApplication.class})
指定启动类
package com.murg.bootdemo;import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {@AutowiredMessageProducerService messageProducerService;@Testpublic void contextLoads() {System.out.printf("aaaaaaaaaaaaa");}//测试@Testpublic void testFanoutExchange(){String msg = "遍身罗绮者,不是养蚕人";for (int i=0;i<10;i++){messageProducerService.testFanoutExchange(msg);}}}
3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果

两个队列都可消费。
3.2 Direct类型
3.2.1MessageProducerService生产服务增加方法testDirectExchange
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
/*** 测试Direct模式下发送消息* 在Direct模型下:** 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)** 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。** Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息* murg.Direct* @param msg*/public void testDirectExchange(String msg,String rountingkey) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);}
3.2.2MessageConsumerService消费服务增加方法testDirectExchange
Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息
direct.queue1 消费 rountingkey为“蚕妇”的消息
direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息
/**** 下面是rountingKey 路由key 模式的消费监听* 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。** 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:**/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型key = {"蚕妇"} //指定消费rountingkey))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}
3.2.3测试类增加测试方法
@Testpublic void testDirectExchange() throws InterruptedException {String msg1 = "遍身罗绮者,不是养蚕人";String key1 ="蚕妇";String msg2 = "朱门酒肉臭,路有冻死骨";String key2 ="自京赴奉先县咏怀五百字";for (int i=0;i<10;i++){if (i % 2 == 0){messageProducerService.testDirectExchange(msg2,key2);Thread.sleep(1000);}else {messageProducerService.testDirectExchange(msg1,key1);Thread.sleep(1000);}}}
调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果

3.3 Topic类型
3.3.1MessageProducerService生产服务增加方法testTopicExchange
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu #.demo写法也可以
demo.*:只能匹配demo.spu
public void testTopicExchange(String msg, String key) {rabbitTemplate.convertAndSend("murg.topic",key,msg);}
3.3.2MessageConsumerService消费服务增加方法testDirectExchange
@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息
topic.queue1 消费 rountingkey为“罗隐.*”的消息
topic.queue2 消费 rountingkey为“#.贫女”的消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型key = {"罗隐.*"} //指定消费rountingkey))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型key = {"#.贫女"}//指定消费rountingkey))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}
3.3.3测试类增加测试方法
@Testpublic void testTopicExchange() throws InterruptedException {String msg = "采得百花成蜜后,为谁辛苦为谁甜";String key ="罗隐.蜂";String msg2 = "苦恨年年压金线,为他人作嫁衣裳";String key2 ="秦韬玉.贫女";for (int i=0;i<10;i++){if (i % 2 == 0){messageProducerService.testTopicExchange(msg,key);Thread.sleep(1000);}else {messageProducerService.testTopicExchange(msg2,key2);Thread.sleep(1000);}}}
调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果

相关文章:
从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ
前言 最近有个想法想整理一个内容比较完整springboot项目初始化Demo。 SpringBoot集成RabbitMQ RabbitMQ中的一些角色: publisher:生产者 consumer:消费者 exchange个:交换机,负责消息路由 queue:队列…...
GNU/Linux - /proc/sys/vm/drop_caches
/proc/sys/vm/drop_caches 是 Linux 中的一个特殊文件,允许用户释放系统内存中的各种缓存。让我们深入了解一下这项功能的细节: The /proc/sys/vm/drop_caches is a special file in Linux that allows users to free up various caches in the systems …...
ubuntu 22.04 如何调整进程启动后能打开的文件数限制
在 Ubuntu 22.04 中,可以通过修改系统配置来调整进程启动后能够打开的文件数软限制。软限制是指操作系统允许单个进程打开的文件描述符的最大数量。以下是调整该限制的方法: 1. 查看当前限制 首先,你可以通过 ulimit 命令查看当前的软限制和…...
linux基础-完结(详讲补充)
linux基础-完结 一、Linux目录介绍 二、基础命令详细讲解 1. ls(列出目录内容) 2. cd(更改目录) 3. clear(清除终端屏幕) 4. pwd(显示你当前所在的目录) 5. vim(文本编辑器) 6. touch(创…...
LoRA:大型语言模型(LLMs)的低秩适应;低秩调整、矩阵的低秩与高秩
目录 LoRA:大型语言模型(LLMs)的低秩适应 一、LoRA的基本原理 二、LoRA的举例说明 三、LoRA的优势 低秩调整、矩阵的低秩与高秩 一、低秩调整(LoRA) 二、矩阵的低秩 三、矩阵的高秩 LoRA:大型语言模型(LLMs)的低秩适应 LoRA(Low-Rank Adaptation of LLMs),…...
游戏引擎学习第四天
视频参考:https://www.bilibili.com/video/BV1aDmqYnEnc/ BitBlt 是 Windows GDI(图形设备接口)中的一个函数,用于在设备上下文(device context, DC)之间复制位图数据。BitBlt 的主要用途是将一个图像区域从一个地方复…...
GIT GUI和 GIT bash区别
Git GUI 和 Git Bash 都是与 Git 版本控制工具相关的用户界面,但它们有不同的功能和用途。下面详细说明它们的区别及各自的作用: Git GUI 作用: Git GUI 是一个图形用户界面(GUI)工具,用于执行 Git 操作。…...
丹摩征文活动|Faster-Rcnn-训练与测试详细教程
本文 丹摩智算平台官方网站的介绍Faster-Rcnn-训练与测试提前准备进行Faster-rcnn 的环境配置数据集的介绍 丹摩智算平台官方网站的介绍 丹摩智算平台(DAMODEL)是专为人工智能(AI)开发者打造的高性能计算服务平台,旨在…...
星期-时间范围选择器 滑动选择时间 最小粒度 vue3
星期-时间范围选择器 功能介绍属性说明事件说明实现代码使用范例 根据业务需要,实现了一个可选择时间范围的周视图。用户可以通过鼠标拖动来选择时间段,并且可以通过快速选择组件来快速选择特定的时间范围。 功能介绍 时间范围选择:用户可以…...
一条SQL查询语句的执行流程(MySQL)
第一步:连接器(负责跟客户端建立连接、获取权限、维持和管理连接) 第二步:查询缓存 之前执行过的查询,MySQL以"Key - Value"的形式存在内存(key为SQL,value为结果集)&…...
linux基础——详细篇
免责声明 学习视频来自B 站up主泷羽sec,如涉及侵权马上删除文章。 笔记的只是方便各位师傅学习知识,以下代码、网站只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负。 linux 基础命令重现 cd(切…...
大数据学习10之Hive高级
1.Hive高级 将大的文件按照某一列属性进行GROUP BY 就是分区,只是默认开窗存储; 分区是按行,如一百行数据,按十位上的数字分区,则有十个分区,每个分区里有十行; 分桶是根据某个字段哈希对桶数取…...
MongoDB笔记01-概念与安装
文章目录 前言一、MongoDB相关概念1.1 业务应用场景具体的应用场景什么时候选择MongoDB 1.2 MongoDB简介1.3 体系结构1.4 数据模型1.5 MongoDB的特点 二、本地单机部署2.1 Windows系统中的安装启动第一步:下载安装包第二步:解压安装启动1.命令行参数方式…...
ollama + fastGPT + m3e 本地部署指南
[TOC](ollama fastgptm3e本地部署) 开启WSL 因为这里使用的win部署,所以要安装wsl,如果是linux系统就没那么麻烦 控制面板->程序->程序和功能 更新wsl wsl --set-default-version 2wsl --update --web-download安装ubuntu wsl --install -d Ubuntudoc…...
【设计模式系列】享元模式(十五)
目录 一、什么是享元模式 二、享元模式的角色 三、享元模式的典型应用场景 四、享元模式在ThreadPoolExecutor中的应用 1. 享元对象(Flyweight)- 工作线程(Worker) 2. 享元工厂(Flyweight Factory)- …...
2024大兴区火锅美食节即将开幕——品味多元火锅,点燃冬季消费热潮
为响应“中国国际精品消费月”活动,由大兴区商务局主办、大兴区餐饮行业协会承办的2024大兴区火锅美食节将于11月15日正式启动,为期一个半月的美食盛宴将在大兴区掀起一场冬日的火锅热潮。此次火锅节作为北京市“食在京城、沸腾火锅”火锅美食节的重要组…...
可视化建模与UML《类图实验报告》
史铁生: 余华和莫言扛着我上火车, 推着走打雪仗, 还带我偷西瓜, 被人发现后他们拔腿就跑, 却忘了我还在西瓜地里。 一、实验目的: 1、熟悉类图的构件事物。 2、熟悉类之间的泛化、依赖、聚合和组合关系…...
VS2022项目配置笔记
文章目录 $(ProjectDir)与 $(SolutionDir) 宏附加包含目录VC目录和C/C的区别 $(ProjectDir)与 $(SolutionDir) 宏 假设有一个解决方案 MySolution,其中包含两个项目 ProjectA 和 ProjectB,目录结构如下: C:\Projects\…...
springboot029基于springboot的网上购物商城系统
🍅点赞收藏关注 → 添加文档最下方联系方式领取本源代码、数据库🍅 本人在Java毕业设计领域有多年的经验,陆续会更新更多优质的Java实战项目希望你能有所收获,少走一些弯路。🍅关注我不迷路🍅 项目视频 基于…...
网站访问在TCP/IP四层模型中的流程
访问一个网站的过程可以通过 TCP/IP 网络模型来描述。TCP/IP 模型通常被分为四层:应用层、传输层、网络层和链路层。以下是从这些层级的角度描述你访问一个网站时所发生的过程: 1. 应用层 (Application Layer) 当你在浏览器中输入一个 URL(…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
AGain DB和倍数增益的关系
我在设置一款索尼CMOS芯片时,Again增益0db变化为6DB,画面的变化只有2倍DN的增益,比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析: 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
