RabbitMQ讲解与整合
RabbitMq安装
类型概念
租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机

| 交换机 | |||
| 属性 | 意义 | 值 | 意义 |
| type | 类型 | direct | 默认的直接交换机 根据交换机下队列绑定的routingKey直接匹配 |
| fanout | 扇形交换机 简单来说就是发布订阅 队列直接绑定在交换机下,统一发布消息 | ||
| headers | 头部交换机,通过message header头部信息进行比对 可以根据定义全匹配、部分匹配等规则 | ||
| topic | 主题交换机 通过绑定routingKey进行模糊匹配 | ||
| Durability | 耐用 (持久化) | durable | 持久化,数据存放于硬盘 |
| transient | 瞬态,数据存放于内存 | ||
| Auto delete | 自动删除 | Yes | 没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物 |
| No | 不自动删除 | ||
| Internal | 内部使用 | Yes | 该路由绑定的队列不会被用户消费 |
| No | 不自动删除 | ||
队列

| 队列 | |||
| 属性 | 意义 | 值 | 意义 |
| type | 类型 | Default for virtual host | 租户配置的默认选项,下列三种其一 默认Classic无需设置 |
| Classic | 传统的队列类型 数据存储在单个节点上 不具备quorum队列的高可用性和数据保护特性 ps:单机时使用 | ||
| Quorum | 高可用性队列 数据会被复制到多个节点 提供更好的数据可靠性和持久性 ps:部署多节点时使用 | ||
| Stream | 特殊类型的队列 用于支持事件流处理(event streaming) 具有类似于Kafka的流式处理特性 ps:听说不成熟,暂时用不上 | ||
| Durability | 耐用 (持久化) | durable | 持久化,数据存放于硬盘 |
| transient | 瞬态,数据存放于内存 | ||
参数:
| 显示参数 | 实际参数 | 作用 |
|---|---|---|
| Auto expire | x-expires | 设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除 |
| Message TTL | x-message-ttl | 设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除 |
| Overflow behaviour | x-overflow | 设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息) |
| Single active consumer | x-single-active-consumer | 配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接 |
| Dead letter exchange | x-dead-letter-exchange | 设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机 |
| Dead letter routing key | x-dead-letter-routing-key | 设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机 |
| Max length | x-max-length | 设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队 |
| Max length bytes | x-max-length-bytes | 设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队 |
| Leader locator | x-queue-leader-locator | 配置队列的领导者(Leader)定位器,集群中使用 |
SpringBoot整合
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version>
</dependency>
配置数据源
spring: rabbitmq:addresses: xxx.xxx.xx.xx:5672username: adminpassword: xxxxxxvirtual-host: /
配置交换机和队列
@Component
public class RabbitMqConfig {// 定义交换机名称public static final String FANOUT_EXCHANGE = "fanout.test";@Bean(name = FANOUT_EXCHANGE)public FanoutExchange fanoutExchange() {// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息// 参数2:是否持久化// 参数3:是否自动删除return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 定义队列public static final String FANOUT_QUEUE1 = "queue1";@Bean(name = FANOUT_QUEUE1)public Queue fanoutQueue1() {// 后三个不写也行,这是默认值// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)// 参数3:是否具有排他性// 参数4:队列不再使用时是否自动删除return new Queue(FANOUT_QUEUE1, true, false, false);}public static final String FANOUT_QUEUE2 = "queue2";@Bean(name = FANOUT_QUEUE2)public Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true, false, false);}@Beanpublic Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
测试发一条消息到队列
@SpringBootTest(classes = TemplateApplication.class)
public class RabbitMQTest {@AutowiredRabbitMessagingTemplate rabbitMessagingTemplate;@Testpublic void testSent(){//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。}
}
测试接收队列消息
写个监听类接收消息:
@Component
public class RabbitMqListenter {@RabbitListener(queues = {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})public void reciveLogAll(String msg) throws Exception {System.out.println("消费到数据:" + msg);}
}
-------------基础的使用到这里就结束了-------------
拓展事项
rabbitMqPusher
自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate。
,但它们在使用方式和功能上有一些不同点:
RabbitMessagingTemplate:
RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。
RabbitTemplate:
RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。
public interface RabbitMqPublish {void send(String quene, String message);void send(String exchange, String routingKey, String message);void send(String quene, String message, Integer expiration);void send(String exchange, String routingKey, String message, Integer expiration);}
package com.template.rabbitmq.producer.impl;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class RabbitMqPublishImpl implements RabbitMqPublish {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* @param quene 队列名称 或 交换机名称* @param message 消息内容*/public void send(String quene, String message) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> quene:{} ---> message:{}", message, quene);}/*** 直接发送消息到队列* 超过有效期丢弃** @param quene 队列名称* @param message 消息内容* @param expiration 有效期(毫秒)*/public void send(String quene, String message, Integer expiration) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);}/*** 发送消息* 超过有效期丢弃** @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容* @param expiration 有效期(毫秒)*/public void send(String exchange, String routingKey, String message, Integer expiration) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);}/*** 发送消息** @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void send(String exchange, String routingKey, String message) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);}}
在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。
实测以上列代码的方式直接对消息设置有效期是生效的。
死信队列
和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由
// 配置交换机的文件中继续增加配置public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE = "DIRECT_GP_DEAD_LETTER_EXCHANGE";public static final String DIRECT_GP_DEAD_LETTER_QUEUE = "DIRECT_GP_DEAD_LETTER_QUEUE";@Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)public DirectExchangedirectDeadLetterExchange() {return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap<>());}@Bean(DIRECT_GP_DEAD_LETTER_QUEUE)public Queue directDeadLetterQueue() {return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap<>());}
设置队列消息有效期并绑定死信队列
@Bean(name = DIRECT_QUEUE1)public Queue directQueue1() {HashMap<String, Object> headers = new HashMap<>();// 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期headers.put("x-message-ttl",10000);// 配置超期交换机,消息过期后会发送到此交换机headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);// 配置超期routingKey,消息过期后转移消息时指定的routingKeyheaders.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);// 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃return new Queue(DIRECT_QUEUE1, true, false, false,headers);}
配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。
延时队列
延时队列场景举例:
预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。
延时队列的实现方式:
死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。
相关文章:
RabbitMQ讲解与整合
RabbitMq安装 类型概念 租户 RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。 每一个 vhos…...
python 基础知识点(蓝桥杯python科目个人复习计划56)
今日复习内容:做题 例题1:最小的或运算 问题描述:给定整数a,b,求最小的整数x,满足a|x b|x,其中|表示或运算。 输入格式: 第一行包括两个正整数a,b; 输出格式&#…...
【vue】vue中数据双向绑定原理/响应式原理,mvvm,mvc、mvp分别是什么
关于 vue 的原理主要有两个重要内容,分别是 mvvm 数据双向绑定原理,和 响应式原理 MVC(Model-View-Controller): Model(模型):表示应用程序的数据和业务逻辑。View(视图&…...
基于反光柱特征的激光定位算法思路
目录 1. 识别反光柱2. 数据关联2.1 基于几何形状寻找匹配2.2 暴力寻找匹配 3. 位姿估计(最小二乘求解)4. 问题4.1 精度问题4.2 快速旋转时定位较差 1. 识别反光柱 反光柱是特殊材料制成,根据激光雷达对反光材料扫描得到的反射值来提取特征。…...
CSM是什么意思?
CSM(Customer Service Management)是企业客户服务管理的信息化(IT)解决方案架构。本着以客户为中心的管理理念,搭建企业客户服务管理平台,实现企业以客户为中心的管理时代的竞争战略。 CSM的核心是以客户为中心,实现对…...
ES6 面试题
1. const、let 和 var 的区别是什么? 答案: var 声明的变量是函数作用域或全局作用域,而 const 和 let 声明的变量是块级作用域。使用 var 声明的变量可以被重复声明,而 const 和 let 不允许重复声明同一变量。const 声明的变量…...
智能指针(C++)
目录 一、智能指针是什么 二、为什么需要智能指针 三、智能指针的使用和原理 3.1、RALL 3.2 智能指针的原理 3.3、智能指针的分类 3.3.1、auto_ptr 3.3.2、unique_ptr 3.3.3、shared_ptr 3.2.4、weak_ptr 一、智能指针是什么 在c中,动态内存的管理式通过一…...
社区店商业模式探讨:如何创新并持续盈利?
在竞争激烈的商业环境中,社区店要想获得成功并持续盈利,需要不断创新和优化商业模式。 作为一名开鲜奶吧5年的创业者,我将分享一些关于社区店商业模式创新的干货和见解,希望能给想开实体店或创业的朋友们提供有价值的参考。 1、…...
一些可以访问gpt的方式
1、Coze扣子是新一代 AI 大模型智能体开发平台。整合了插件、长短期记忆、工作流、卡片等丰富能力,扣子能帮你低门槛、快速搭建个性化或具备商业价值的智能体,并发布到豆包、飞书等各个平台。https://www.coze.cn/ 2、https://poe.com/ 3、插件阿里…...
springer模板参考文献不显示
Spring期刊模板网站,我的问题是23年12月的版本 https://www.springernature.com/gp/authors/campaigns/latex-author-support/see-where-our-services-will-take-you/18782940 参考文献显示问好,在sn-article.tex文件中,这个sn-mathphys-num…...
【【C语言简单小题学习-1】】
实现九九乘法表 // 输出乘法口诀表 int main() {int i 0;int j 0;for (i 1; i < 9; i){for (j 1; j < i;j)printf("%d*%d%d ", i , j, i*j);printf("\n"); }return 0; }猜数字的游戏设计 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdi…...
mongoDB 优化(1)索引
1、创建复合索引(多字段) db.collection_test1.createIndex({deletedVersion: 1,param: 1,qrYearMonth: 1},{name: "deletedVersion_1_param_1_qrYearMonth_1",background: true} ); 2、新增索引前: 执行查询: mb.r…...
stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程
stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程 AI浪潮袭来,还是学习学习为妙赛博菩萨【秋葉】简介——(葉ye,四声,同叶)A绘世启动器.exe(sd-webui-aki-v4.6.x)工具包安…...
鸿蒙应用程序包安装和卸载流程
开发者 开发者可以通过调试命令进行应用的安装和卸载,可参考多HAP的调试流程。 图1 应用程序包安装和卸载流程(开发者) 多HAP的开发调试与发布部署流程 多HAP的开发调试与发布部署流程如下图所示。 图1 多HAP的开发调试与发布部署流程 …...
C语言数组全面解析:从初学到精通
数组 1. 前言2. 一维数组的创建和初始化3. 一维数组的使用4. 一维数组在内存中的存储5. 二维数组的创建和初始化6. 二维数组的使用7. 二维数组在内存中的存储8. 数组越界9. 数组作为函数参数10. 综合练习10.1 用函数初始化,逆置,打印整型数组10.2 交换两…...
2024-02-28(Kafka,Oozie,Flink)
1.Kafka的数据存储形式 一个主题由多个分区组成 一个分区由多个segment段组成 一个segment段由多个文件组成(log,index(稀疏索引),timeindex(根据时间做的索引)) 2.读数据的流程 …...
Window下编写的sh文件在Linux/Docker中无法使用
Window下编写的sh文件在Linux/Docker中无法使用 一、sh文件目的1.1 初始状态1.2 目的 二、过程与异常2.1 首先获取标准ubuntu20.04 - 正常2.2 启动ubuntu20.04容器 - 正常2.3 执行windows下写的preInstall文件 - 报错 三、检查和处理3.1 评估异常3.2 处理异常3.3 调整后运行测试…...
第16章-DNS
目录 1. 域名 1.1 产生背景 1.2 概述 1.3 域名的树形层次化结构 2. DNS 2.1 概述 2.2 工作机制 3. DNS查询模式 3.1 递归查询: 3.2 迭代查询: 4. 相关知识点 4.1 集中式DNS 4.2 国内通用DNS 4.3 配置DNS代理 1. 域名 1.1 产生背景 ① IP…...
Leetcoder Day27| 贪心算法part01
语言:Java/Go 理论 贪心的本质是选择每一阶段的局部最优,从而达到全局最优。 什么时候用贪心?可以用局部最优退出全局最优,并且想不到反例到情况 贪心的一般解题步骤 将问题分解为若干个子问题找出适合的贪心策略求解每一个子…...
SpringBoot自动配置中bean的加载控制
🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务
通过akshare库,获取股票数据,并生成TabPFN这个模型 可以识别、处理的格式,写一个完整的预处理示例,并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务,进行预测并输…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
C++ 设计模式 《小明的奶茶加料风波》
👨🎓 模式名称:装饰器模式(Decorator Pattern) 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...
