当前位置: 首页 > news >正文

SpringBoot整合RabbitMQ的快速使用教程

     

目录

一、引入依赖

二、配置rabbitmq的连接信息等

1、生产者配置

2、消费者配置 

三、设置消息转换器

四、生产者代码示例

 1、配置交换机和队列信息

2、生产消息代码

五、消费者代码示例

1、消费层代码

2、业务层代码 


        在分布式系统中,消息队列是一种重要的通信方式,它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统,简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ,实现消息的发送和接收。

 

交换机: 主要负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有:

  •  Fanout Exchange(扇出交换机)

        Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键,只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况,例如,广播系统通知或有多个服务需要消费同一份数据的场景。

  • Direct Exchange(直连交换机)

       Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时,消息才会被路由到相应的队列。适合于精确控制消息投递的场景,如特定的服务或功能模块只关心特定类型的消息。

  • Topic Exchange(主题交换机)

       Topic交换机允许更复杂的匹配规则,通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串,可以包含特殊字符如“#”和“*”来实现模糊匹配。"*"用于匹配一个单词,而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统,如日志处理系统,可以根据日志等级或来源将日志消息分发到不同的队列。

  • Headers Exchange(头交换机)

        Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制,但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景,例如某些高级的路由策略或复杂的事件处理系统。

队列:主要用于存储消息,实现先进先出(FIFO)的特性。

一、引入依赖

这里引入了两个依赖。一个是rabbitmq的依赖,另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
 </dependency>

二、配置rabbitmq的连接信息等

1、生产者配置

  rabbitmq:
    host: 170.40.20.16
    port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /

2、消费者配置 

   rabbitmq:
    host: 170.40.20.16
  port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息

三、设置消息转换器

        默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用Jackson的序列化代替JDk的序列化。

在生产者和消费者的启动类上加上如下代码:  

@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {public static void main( String[] args ) {SpringApplication.run(ConsumerApp.class, args);}//使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化@Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能return jackson2JsonMessageConverter;}
}

四、生产者代码示例

 1、配置交换机和队列信息
@Configuration
public class RabbitMqConfig {private static String EXCHANGE_NAME="amq.topic";private static String QUEUE_NAME="alarm.data.topic.queue";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";/*** 声明交换机*/@Beanpublic TopicExchange exchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。return new TopicExchange(EXCHANGE_NAME,true,false);}/*** 声明告警队列* @return*/@Bean("alarmQueue")public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}/*** 声明确认告警队列* @return*/@Bean("confirmAlarmQueue")public Queue confirmAlarmQueue(){return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);}/*** 声明告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");}/*** 声明确认告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");}
2、生产消息代码
    @Autowiredprivate RabbitTemplate rabbitTemplate;private static String EXCHANGE_NAME="amq.topic";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";@Testvoid producerAlarmMsg() {String msg = "发送一条告警消息";rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);System.out.println("msg = " + msg);}@Testvoid producerConfirmAlarmMsg() {String msg = "发送一条确认告警消息";rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);System.out.println("msg = " + msg);}

五、消费者代码示例

1、消费层代码
@Component
public class AlarmConsumer {@Autowiredprivate IAlarmService alarmService;@RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")public void getAlarmInfo(String data){alarmService.dealAlarmData(data);}@RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")public void getConfirmAlarmInfo(String data){alarmService.dealConfirmAlarmData(data);}
}
2、业务层代码 
@Service
public class IAlarmServiceImpl implements IAlarmService {@Overridepublic void dealAlarmData(String data) {EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);dceEquipAlarmDto.setCreateTime(new Date());dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);//查询出需要新增或者更新的数据Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();//开启事务,保证新增、更新、删除的原子性TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);List<DceEquipAlarmDto> list=new ArrayList<>();list.add(dceEquipAlarmDto);try {//新增if (!flag) {dceEquipAlarmMapper.insertBatch(list);}//更新if (flag) {dceEquipAlarmMapper.updateBatch(list);}//提交事务transactionManager.commit(transaction);} catch (Exception e) {//回滚transactionManager.rollback(transaction);log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);}}@Overridepublic void dealConfirmAlarmData(String data) {EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));alarmResp.setConfirmTime(confirmTime);dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());}}

注:以上代码为对接告警信息和对接告警确认消息的示例。

相关文章:

SpringBoot整合RabbitMQ的快速使用教程

目录 一、引入依赖 二、配置rabbitmq的连接信息等 1、生产者配置 2、消费者配置 三、设置消息转换器 四、生产者代码示例 1、配置交换机和队列信息 2、生产消息代码 五、消费者代码示例 1、消费层代码 2、业务层代码 在分布式系统中&#xff0c;消息队列是一种重要…...

pytorch比较操作

文章目录 常用的比较操作1.torch.allclose()2.torch.argsort()3.torch.eq()4.torch.equal()5.torch.greater_equal()6.torch.gt()7.torch.isclose()8.torch.isfinite()9.torch.isif()10.torch.isposinf()11.torch.isneginf()12.torch.isnan()13.torch.kthvalue()14.torch.less_…...

2024年4月—马克思主义基本原理概论真题及答案解析(上海自考)

目录 1.选择题 2.简答题 3.论述题 1.选择题 2.简答题...

「Element-UI表头添加带Icon的提示信息」

一、封装全局组件 &#x1f353; 注意&#xff1a;可以直接复制该文件 <!-- // 写一个PromptMessage的组件&#xff0c;并全局注册 --> <template><div class"tooltip"><el-tooltip effect"dark" placement"right">&l…...

单细胞 10X 和seurat对象学习

单细胞seurat数据的基础知识 rm(list ls()) library(Seurat) #注意这个报错 #Warning: Feature names cannot have underscores (_), replacing with dashes (-) folderslist.files(./,pattern[123]$) folders scList lapply(folders,function(folder){ CreateSeuratObject(…...

Flutter 中的 Flex 小部件:全面指南

Flutter 中的 Flex 小部件&#xff1a;全面指南 Flutter 的布局系统非常灵活&#xff0c;允许开发者以声明式的方式构建复杂的用户界面。Flex 是 Flutter 中用于创建灵活布局的核心小部件之一&#xff0c;它提供了水平和垂直的线性布局能力。本文将详细介绍 Flex 小部件的使用…...

统计每个活动的用户访问量,且每个用户仅统计一次

场景&#xff1a;统计每个活动的用户访问量&#xff0c;且每个用户仅统计一次。 首先活动表是已经存在了的&#xff0c;一般情况下&#xff0c;我们都会在创建一个用户访问表&#xff0c;其中唯一主键是用户ID活动ID作为唯一主键 create table user_visist_activity_record(i…...

基于SpringBoot的本科生考研率统计系统

基于SpringBoot的本科生考研率统计系统 一、开发技术二、功能模块三、代码结构四、数据库设计五、运行截图六、源码获取 一、开发技术 技术&#xff1a;SpringBoot、MyBatis-Plus、Redis、MySQL、Thymeleaf、Html、Vue、Element-ui。 框架&#xff1a;基于开源框架easy-admin开…...

JMeter正则表达式提取器和JSON提取器基础用法,小白必会!

最近在利用JMeter做接口自动化测试&#xff0c;正则表达式提取器和JSON提取器用的还挺多&#xff0c;想着分享下&#xff0c;希望对大家的接口自动化测试项目有所启发。 在 JMeter 中&#xff0c;正则表达式和 JSON 提取器都是用于从响应数据中提取所需内容&#xff0c;但它们…...

5-26作业

网络聊天室 服务器&#xff1a; 1 #include <myhead.h>2 int main(int argc, const char *argv[])3 {4 if(argc!3)5 {6 printf("请输入IP和端口号\n");7 return -1;8 }9 int sfd socket(AF_INET, SOCK_DGRAM, 0);10 if(…...

2024.05.28学习记录

1. 小林coding 计网复习 2.代码随想录刷题. 图论.和复习数组.链表 3.rosebush完成select组件...

撤销最近一次的提交,使用git revert 和 git reset的区别

文章目录 工作区 暂存区 本地仓库 远程仓库需求&#xff1a;已推送到远程仓库&#xff0c;想要撤销操作git revert &#xff08;添加新的提交来“反做”之前的更改&#xff0c;云端会残留上次的提交记录&#xff09;git reset&#xff08;相当于覆盖上次的提交&#xff09;1.--…...

MySQL详细安装、配置过程,多图,详解

本文适合centos7环境下安装mysql&#xff0c;在安装和卸载过程中&#xff0c;都在root用户下完成。文章目录 清理环境获取mysql官方yum源安装mysql yum源安装mysql服务安装报错解决办法验证是否安装完成启动mysql服务登录服务方法一&#xff1a;方法二&#xff1a;方法三&#…...

音视频学习规划

文章目录 概述闲聊点 小结 概述 最近在学习音视频&#xff0c;觉得还是要先写个提纲&#xff0c;给自己制定下学习路线及目标。先写下我的个人流程及思路。 ffmpeg的命令ffmpeg api播放器流媒体RTMP&#xff0c;HLS 闲聊点 先说下学习命令行吧&#xff0c;学习命令行是为了…...

代码随想录算法训练营第21天|● 530.二叉搜索树的最小绝对差 ● 501.二叉搜索树中的众数 ● 236. 二叉树的最近公共祖先

二叉搜索树的最小绝对差 题目连接 https://leetcode.cn/problems/minimum-absolute-difference-in-bst/ 思路&#xff1a; 利用二叉搜索树的中序遍历的特性&#xff0c;将二叉树转成有序数组&#xff0c;进而求任意两个数的最小绝对差。 代码 /*** Definition for a bina…...

K8S中Prometheus+Grafana监控

1.介绍 phometheus:当前一套非常流行的开源监控和报警系统。 运行原理&#xff1a;通过HTTP协议周期性抓取被监控组件的状态。输出被监控组件信息的HTTP接口称为exporter。 常用组件大部分都有exporter可以直接使用&#xff0c;比如haproxy,nginx&#xff0c;Mysql,Linux系统信…...

题解:CF1968F(Equal XOR Segments)

题解&#xff1a;CF1968F&#xff08;Equal XOR Segments&#xff09; 题目翻译&#xff1a;定义一个序列是好&#xff0c;当且仅当可以将其分成大于 1 1 1 份&#xff0c;使得每个部分的异或和相等。现在给定一个长度为 n n n 的序列 a a a&#xff0c;以及 q q q 次查询…...

Python操作MySQL实战

文章导读 本文用于巩固Pymysql操作MySQL与MySQL操作的知识点&#xff0c;实现一个简易的音乐播放器&#xff0c;拟实现的功能包括&#xff1a;用户登录&#xff0c;窗口显示&#xff0c;加载本地音乐&#xff0c;加入和删除播放列表&#xff0c;播放音乐。 点击此处获取参考源…...

【Linux系统】进程间通信

本篇博客整理了进程间通信的方式管道、 system V IPC的原理&#xff0c;结合大量的系统调用接口&#xff0c;和代码示例&#xff0c;旨在让读者透过进程间通信去体会操作系统的设计思想和管理手段。 目录 一、进程间通信 二、管道 1.匿名管道 1.1-通信原理 1.2-系统调用 …...

北大国际医院腹膜后纤维化课题组 多学科协作开辟治疗新径

腹膜后纤维化(Retroperitoneal Fibrosis,简称RPF)是一种罕见的自身免疫性疾病,其核心特征是纤维组织的异常增生与硬化。这种疾病主要影响肾脏下方的腹主动脉和髂动脉区域,增生的纤维组织会逐渐压迫周围的输尿管和下腔静脉,从而导致一系列并发症,包括主动脉瘤、肾功能衰竭等,甚至…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)

宇树机器人多姿态起立控制强化学习框架论文解析 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架&#xff08;一&#xff09; 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

Rapidio门铃消息FIFO溢出机制

关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系&#xff0c;以下是深入解析&#xff1a; 门铃FIFO溢出的本质 在RapidIO系统中&#xff0c;门铃消息FIFO是硬件控制器内部的缓冲区&#xff0c;用于临时存储接收到的门铃消息&#xff08;Doorbell Message&#xff09;。…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中&#xff0c;我们经常会遇到这样的场景&#xff1a;一个对象的状态变化需要自动通知其他对象&#xff0c;比如&#xff1a; 电商平台中&#xff0c;商品库存变化时需要通知所有订阅该商品的用户&#xff1b;新闻网站中&#xff0…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...