从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ
前言
最近有个想法想整理一个内容比较完整springboot项目初始化Demo。
SpringBoot集成RabbitMQ
RabbitMQ中的一些角色:
-
publisher:生产者
-
consumer:消费者
-
exchange个:交换机,负责消息路由
-
queue:队列,存储消息
-
virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
junit用于测试。
一、pom引入依赖amqp
<!--rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!--rabbitmq-->
二、application-dev.yaml 增加RabbitMQ相关配置
spring:#RabbitMQ服务器配置,地址账号密码,virtualhost等配置rabbitmq:host: 127.0.0.1port: 5672username: murgpassword: 123456virtual-host: murg-host#队列中没有消息,阻塞等待时间template:receive-timeout: 2000
logging:level:org.springframework.security: debug
三、发布/订阅
RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。
此模式下根据交换机类型又分为三种。
1.Fanout类型: 广播模式 把消息交给所有绑定到交换机的队列
2.Direct类型: 路由模式 把消息交给符合指定routing key 的队列
3Topic类型: 主题模式 把消息交给符合主题通配符的队列
3.1 Fanout类型
在广播模式下,消息发送流程是这样的:
1) 可以有多个队列
2) 每个队列都要绑定到Exchange(交换机)
3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
4) 交换机把消息发送给绑定过的所有队列
5) 订阅队列的消费者都能拿到消
3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起
创建配置类FanoutConfig
声明一个Fanout类型交换机命名为murg.fanout
声明两个Queue队列分别为fanout.queue1和fanout.queue2
分别将两个队列和交换机绑定,后续用于消费消息。
package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Fanout 广播模式下*声明交换机和队列*/
@Configuration
public class FanoutConfig {/*** 声明交换机和队列 将交换机和队列绑定在一起* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("murg.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
3.1.2创建消息生产服务
创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。
注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
新增测试广播模式下发送消息方法
package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** Rabbitmq消息生产者*/
@Service
@RequiredArgsConstructor
public class MessageProducerService {private final RabbitTemplate rabbitTemplate;/*** 测试广播模式下发送消息* @param msg*/public void testFanoutExchange(String msg) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend("murg.fanout","",msg);}}
3.13创建消息消费服务
创建消息生产服务MessageConsumerService
@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener
注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。
package com.murg.bootdemo.rabbitmq;import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;/*** 消息消费者**/
@Service
@RequiredArgsConstructor
public class MessageConsumerService {/*** 下面是Fanout,广播模式的监听* 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2* @param msg*/@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}}
3.14修改测试类进行测试
修改创建项目时生成的 BootdemoApplicationTests.class
增加以下注解
@ActiveProfiles("dev")
指定运行环境为开发环境
@RunWith(SpringRunner.class)
指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。
@SpringBootTest(classes={BootdemoApplication.class})
指定启动类
package com.murg.bootdemo;import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {@AutowiredMessageProducerService messageProducerService;@Testpublic void contextLoads() {System.out.printf("aaaaaaaaaaaaa");}//测试@Testpublic void testFanoutExchange(){String msg = "遍身罗绮者,不是养蚕人";for (int i=0;i<10;i++){messageProducerService.testFanoutExchange(msg);}}}
3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果
两个队列都可消费。
3.2 Direct类型
3.2.1MessageProducerService生产服务增加方法testDirectExchange
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
/*** 测试Direct模式下发送消息* 在Direct模型下:** 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)** 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。** Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息* murg.Direct* @param msg*/public void testDirectExchange(String msg,String rountingkey) {// 发送消息//murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了//rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);}
3.2.2MessageConsumerService消费服务增加方法testDirectExchange
Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息
direct.queue1 消费 rountingkey为“蚕妇”的消息
direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息
/**** 下面是rountingKey 路由key 模式的消费监听* 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。** 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:**/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型key = {"蚕妇"} //指定消费rountingkey))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}
3.2.3测试类增加测试方法
@Testpublic void testDirectExchange() throws InterruptedException {String msg1 = "遍身罗绮者,不是养蚕人";String key1 ="蚕妇";String msg2 = "朱门酒肉臭,路有冻死骨";String key2 ="自京赴奉先县咏怀五百字";for (int i=0;i<10;i++){if (i % 2 == 0){messageProducerService.testDirectExchange(msg2,key2);Thread.sleep(1000);}else {messageProducerService.testDirectExchange(msg1,key1);Thread.sleep(1000);}}}
调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果
3.3 Topic类型
3.3.1MessageProducerService生产服务增加方法testTopicExchange
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu #.demo写法也可以
demo.*:只能匹配demo.spu
public void testTopicExchange(String msg, String key) {rabbitTemplate.convertAndSend("murg.topic",key,msg);}
3.3.2MessageConsumerService消费服务增加方法testDirectExchange
@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息
topic.queue1 消费 rountingkey为“罗隐.*”的消息
topic.queue2 消费 rountingkey为“#.贫女”的消息
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型key = {"罗隐.*"} //指定消费rountingkey))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型key = {"#.贫女"}//指定消费rountingkey))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}
3.3.3测试类增加测试方法
@Testpublic void testTopicExchange() throws InterruptedException {String msg = "采得百花成蜜后,为谁辛苦为谁甜";String key ="罗隐.蜂";String msg2 = "苦恨年年压金线,为他人作嫁衣裳";String key2 ="秦韬玉.贫女";for (int i=0;i<10;i++){if (i % 2 == 0){messageProducerService.testTopicExchange(msg,key);Thread.sleep(1000);}else {messageProducerService.testTopicExchange(msg2,key2);Thread.sleep(1000);}}}
调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果
相关文章:

从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ
前言 最近有个想法想整理一个内容比较完整springboot项目初始化Demo。 SpringBoot集成RabbitMQ RabbitMQ中的一些角色: publisher:生产者 consumer:消费者 exchange个:交换机,负责消息路由 queue:队列…...
GNU/Linux - /proc/sys/vm/drop_caches
/proc/sys/vm/drop_caches 是 Linux 中的一个特殊文件,允许用户释放系统内存中的各种缓存。让我们深入了解一下这项功能的细节: The /proc/sys/vm/drop_caches is a special file in Linux that allows users to free up various caches in the systems …...
ubuntu 22.04 如何调整进程启动后能打开的文件数限制
在 Ubuntu 22.04 中,可以通过修改系统配置来调整进程启动后能够打开的文件数软限制。软限制是指操作系统允许单个进程打开的文件描述符的最大数量。以下是调整该限制的方法: 1. 查看当前限制 首先,你可以通过 ulimit 命令查看当前的软限制和…...

linux基础-完结(详讲补充)
linux基础-完结 一、Linux目录介绍 二、基础命令详细讲解 1. ls(列出目录内容) 2. cd(更改目录) 3. clear(清除终端屏幕) 4. pwd(显示你当前所在的目录) 5. vim(文本编辑器) 6. touch(创…...

LoRA:大型语言模型(LLMs)的低秩适应;低秩调整、矩阵的低秩与高秩
目录 LoRA:大型语言模型(LLMs)的低秩适应 一、LoRA的基本原理 二、LoRA的举例说明 三、LoRA的优势 低秩调整、矩阵的低秩与高秩 一、低秩调整(LoRA) 二、矩阵的低秩 三、矩阵的高秩 LoRA:大型语言模型(LLMs)的低秩适应 LoRA(Low-Rank Adaptation of LLMs),…...

游戏引擎学习第四天
视频参考:https://www.bilibili.com/video/BV1aDmqYnEnc/ BitBlt 是 Windows GDI(图形设备接口)中的一个函数,用于在设备上下文(device context, DC)之间复制位图数据。BitBlt 的主要用途是将一个图像区域从一个地方复…...
GIT GUI和 GIT bash区别
Git GUI 和 Git Bash 都是与 Git 版本控制工具相关的用户界面,但它们有不同的功能和用途。下面详细说明它们的区别及各自的作用: Git GUI 作用: Git GUI 是一个图形用户界面(GUI)工具,用于执行 Git 操作。…...

丹摩征文活动|Faster-Rcnn-训练与测试详细教程
本文 丹摩智算平台官方网站的介绍Faster-Rcnn-训练与测试提前准备进行Faster-rcnn 的环境配置数据集的介绍 丹摩智算平台官方网站的介绍 丹摩智算平台(DAMODEL)是专为人工智能(AI)开发者打造的高性能计算服务平台,旨在…...

星期-时间范围选择器 滑动选择时间 最小粒度 vue3
星期-时间范围选择器 功能介绍属性说明事件说明实现代码使用范例 根据业务需要,实现了一个可选择时间范围的周视图。用户可以通过鼠标拖动来选择时间段,并且可以通过快速选择组件来快速选择特定的时间范围。 功能介绍 时间范围选择:用户可以…...
一条SQL查询语句的执行流程(MySQL)
第一步:连接器(负责跟客户端建立连接、获取权限、维持和管理连接) 第二步:查询缓存 之前执行过的查询,MySQL以"Key - Value"的形式存在内存(key为SQL,value为结果集)&…...

linux基础——详细篇
免责声明 学习视频来自B 站up主泷羽sec,如涉及侵权马上删除文章。 笔记的只是方便各位师傅学习知识,以下代码、网站只涉及学习内容,其他的都与本人无关,切莫逾越法律红线,否则后果自负。 linux 基础命令重现 cd(切…...

大数据学习10之Hive高级
1.Hive高级 将大的文件按照某一列属性进行GROUP BY 就是分区,只是默认开窗存储; 分区是按行,如一百行数据,按十位上的数字分区,则有十个分区,每个分区里有十行; 分桶是根据某个字段哈希对桶数取…...

MongoDB笔记01-概念与安装
文章目录 前言一、MongoDB相关概念1.1 业务应用场景具体的应用场景什么时候选择MongoDB 1.2 MongoDB简介1.3 体系结构1.4 数据模型1.5 MongoDB的特点 二、本地单机部署2.1 Windows系统中的安装启动第一步:下载安装包第二步:解压安装启动1.命令行参数方式…...

ollama + fastGPT + m3e 本地部署指南
[TOC](ollama fastgptm3e本地部署) 开启WSL 因为这里使用的win部署,所以要安装wsl,如果是linux系统就没那么麻烦 控制面板->程序->程序和功能 更新wsl wsl --set-default-version 2wsl --update --web-download安装ubuntu wsl --install -d Ubuntudoc…...

【设计模式系列】享元模式(十五)
目录 一、什么是享元模式 二、享元模式的角色 三、享元模式的典型应用场景 四、享元模式在ThreadPoolExecutor中的应用 1. 享元对象(Flyweight)- 工作线程(Worker) 2. 享元工厂(Flyweight Factory)- …...

2024大兴区火锅美食节即将开幕——品味多元火锅,点燃冬季消费热潮
为响应“中国国际精品消费月”活动,由大兴区商务局主办、大兴区餐饮行业协会承办的2024大兴区火锅美食节将于11月15日正式启动,为期一个半月的美食盛宴将在大兴区掀起一场冬日的火锅热潮。此次火锅节作为北京市“食在京城、沸腾火锅”火锅美食节的重要组…...

可视化建模与UML《类图实验报告》
史铁生: 余华和莫言扛着我上火车, 推着走打雪仗, 还带我偷西瓜, 被人发现后他们拔腿就跑, 却忘了我还在西瓜地里。 一、实验目的: 1、熟悉类图的构件事物。 2、熟悉类之间的泛化、依赖、聚合和组合关系…...

VS2022项目配置笔记
文章目录 $(ProjectDir)与 $(SolutionDir) 宏附加包含目录VC目录和C/C的区别 $(ProjectDir)与 $(SolutionDir) 宏 假设有一个解决方案 MySolution,其中包含两个项目 ProjectA 和 ProjectB,目录结构如下: C:\Projects\…...

springboot029基于springboot的网上购物商城系统
🍅点赞收藏关注 → 添加文档最下方联系方式领取本源代码、数据库🍅 本人在Java毕业设计领域有多年的经验,陆续会更新更多优质的Java实战项目希望你能有所收获,少走一些弯路。🍅关注我不迷路🍅 项目视频 基于…...
网站访问在TCP/IP四层模型中的流程
访问一个网站的过程可以通过 TCP/IP 网络模型来描述。TCP/IP 模型通常被分为四层:应用层、传输层、网络层和链路层。以下是从这些层级的角度描述你访问一个网站时所发生的过程: 1. 应用层 (Application Layer) 当你在浏览器中输入一个 URL(…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...

无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...

Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...

vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...

Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...