RabbitMQ讲解与整合
RabbitMq安装
类型概念
租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机

| 交换机 | |||
| 属性 | 意义 | 值 | 意义 |
| type | 类型 | direct | 默认的直接交换机 根据交换机下队列绑定的routingKey直接匹配 |
| fanout | 扇形交换机 简单来说就是发布订阅 队列直接绑定在交换机下,统一发布消息 | ||
| headers | 头部交换机,通过message header头部信息进行比对 可以根据定义全匹配、部分匹配等规则 | ||
| topic | 主题交换机 通过绑定routingKey进行模糊匹配 | ||
| Durability | 耐用 (持久化) | durable | 持久化,数据存放于硬盘 |
| transient | 瞬态,数据存放于内存 | ||
| Auto delete | 自动删除 | Yes | 没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物 |
| No | 不自动删除 | ||
| Internal | 内部使用 | Yes | 该路由绑定的队列不会被用户消费 |
| No | 不自动删除 | ||
队列

| 队列 | |||
| 属性 | 意义 | 值 | 意义 |
| type | 类型 | Default for virtual host | 租户配置的默认选项,下列三种其一 默认Classic无需设置 |
| Classic | 传统的队列类型 数据存储在单个节点上 不具备quorum队列的高可用性和数据保护特性 ps:单机时使用 | ||
| Quorum | 高可用性队列 数据会被复制到多个节点 提供更好的数据可靠性和持久性 ps:部署多节点时使用 | ||
| Stream | 特殊类型的队列 用于支持事件流处理(event streaming) 具有类似于Kafka的流式处理特性 ps:听说不成熟,暂时用不上 | ||
| Durability | 耐用 (持久化) | durable | 持久化,数据存放于硬盘 |
| transient | 瞬态,数据存放于内存 | ||
参数:
| 显示参数 | 实际参数 | 作用 |
|---|---|---|
| Auto expire | x-expires | 设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除 |
| Message TTL | x-message-ttl | 设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除 |
| Overflow behaviour | x-overflow | 设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息) |
| Single active consumer | x-single-active-consumer | 配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接 |
| Dead letter exchange | x-dead-letter-exchange | 设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机 |
| Dead letter routing key | x-dead-letter-routing-key | 设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机 |
| Max length | x-max-length | 设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队 |
| Max length bytes | x-max-length-bytes | 设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队 |
| Leader locator | x-queue-leader-locator | 配置队列的领导者(Leader)定位器,集群中使用 |
SpringBoot整合
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version>
</dependency>
配置数据源
spring: rabbitmq:addresses: xxx.xxx.xx.xx:5672username: adminpassword: xxxxxxvirtual-host: /
配置交换机和队列
@Component
public class RabbitMqConfig {// 定义交换机名称public static final String FANOUT_EXCHANGE = "fanout.test";@Bean(name = FANOUT_EXCHANGE)public FanoutExchange fanoutExchange() {// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息// 参数2:是否持久化// 参数3:是否自动删除return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 定义队列public static final String FANOUT_QUEUE1 = "queue1";@Bean(name = FANOUT_QUEUE1)public Queue fanoutQueue1() {// 后三个不写也行,这是默认值// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)// 参数3:是否具有排他性// 参数4:队列不再使用时是否自动删除return new Queue(FANOUT_QUEUE1, true, false, false);}public static final String FANOUT_QUEUE2 = "queue2";@Bean(name = FANOUT_QUEUE2)public Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true, false, false);}@Beanpublic Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
测试发一条消息到队列
@SpringBootTest(classes = TemplateApplication.class)
public class RabbitMQTest {@AutowiredRabbitMessagingTemplate rabbitMessagingTemplate;@Testpublic void testSent(){//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。}
}
测试接收队列消息
写个监听类接收消息:
@Component
public class RabbitMqListenter {@RabbitListener(queues = {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})public void reciveLogAll(String msg) throws Exception {System.out.println("消费到数据:" + msg);}
}
-------------基础的使用到这里就结束了-------------
拓展事项
rabbitMqPusher
自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate。
,但它们在使用方式和功能上有一些不同点:
RabbitMessagingTemplate:
RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。
RabbitTemplate:
RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。
public interface RabbitMqPublish {void send(String quene, String message);void send(String exchange, String routingKey, String message);void send(String quene, String message, Integer expiration);void send(String exchange, String routingKey, String message, Integer expiration);}
package com.template.rabbitmq.producer.impl;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class RabbitMqPublishImpl implements RabbitMqPublish {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* @param quene 队列名称 或 交换机名称* @param message 消息内容*/public void send(String quene, String message) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> quene:{} ---> message:{}", message, quene);}/*** 直接发送消息到队列* 超过有效期丢弃** @param quene 队列名称* @param message 消息内容* @param expiration 有效期(毫秒)*/public void send(String quene, String message, Integer expiration) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);}/*** 发送消息* 超过有效期丢弃** @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容* @param expiration 有效期(毫秒)*/public void send(String exchange, String routingKey, String message, Integer expiration) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);}/*** 发送消息** @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void send(String exchange, String routingKey, String message) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);}}
在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。
实测以上列代码的方式直接对消息设置有效期是生效的。
死信队列
和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由
// 配置交换机的文件中继续增加配置public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE = "DIRECT_GP_DEAD_LETTER_EXCHANGE";public static final String DIRECT_GP_DEAD_LETTER_QUEUE = "DIRECT_GP_DEAD_LETTER_QUEUE";@Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)public DirectExchangedirectDeadLetterExchange() {return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap<>());}@Bean(DIRECT_GP_DEAD_LETTER_QUEUE)public Queue directDeadLetterQueue() {return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap<>());}
设置队列消息有效期并绑定死信队列
@Bean(name = DIRECT_QUEUE1)public Queue directQueue1() {HashMap<String, Object> headers = new HashMap<>();// 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期headers.put("x-message-ttl",10000);// 配置超期交换机,消息过期后会发送到此交换机headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);// 配置超期routingKey,消息过期后转移消息时指定的routingKeyheaders.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);// 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃return new Queue(DIRECT_QUEUE1, true, false, false,headers);}
配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。
延时队列
延时队列场景举例:
预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。
延时队列的实现方式:
死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。
相关文章:
RabbitMQ讲解与整合
RabbitMq安装 类型概念 租户 RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。 每一个 vhos…...
python 基础知识点(蓝桥杯python科目个人复习计划56)
今日复习内容:做题 例题1:最小的或运算 问题描述:给定整数a,b,求最小的整数x,满足a|x b|x,其中|表示或运算。 输入格式: 第一行包括两个正整数a,b; 输出格式&#…...
【vue】vue中数据双向绑定原理/响应式原理,mvvm,mvc、mvp分别是什么
关于 vue 的原理主要有两个重要内容,分别是 mvvm 数据双向绑定原理,和 响应式原理 MVC(Model-View-Controller): Model(模型):表示应用程序的数据和业务逻辑。View(视图&…...
基于反光柱特征的激光定位算法思路
目录 1. 识别反光柱2. 数据关联2.1 基于几何形状寻找匹配2.2 暴力寻找匹配 3. 位姿估计(最小二乘求解)4. 问题4.1 精度问题4.2 快速旋转时定位较差 1. 识别反光柱 反光柱是特殊材料制成,根据激光雷达对反光材料扫描得到的反射值来提取特征。…...
CSM是什么意思?
CSM(Customer Service Management)是企业客户服务管理的信息化(IT)解决方案架构。本着以客户为中心的管理理念,搭建企业客户服务管理平台,实现企业以客户为中心的管理时代的竞争战略。 CSM的核心是以客户为中心,实现对…...
ES6 面试题
1. const、let 和 var 的区别是什么? 答案: var 声明的变量是函数作用域或全局作用域,而 const 和 let 声明的变量是块级作用域。使用 var 声明的变量可以被重复声明,而 const 和 let 不允许重复声明同一变量。const 声明的变量…...
智能指针(C++)
目录 一、智能指针是什么 二、为什么需要智能指针 三、智能指针的使用和原理 3.1、RALL 3.2 智能指针的原理 3.3、智能指针的分类 3.3.1、auto_ptr 3.3.2、unique_ptr 3.3.3、shared_ptr 3.2.4、weak_ptr 一、智能指针是什么 在c中,动态内存的管理式通过一…...
社区店商业模式探讨:如何创新并持续盈利?
在竞争激烈的商业环境中,社区店要想获得成功并持续盈利,需要不断创新和优化商业模式。 作为一名开鲜奶吧5年的创业者,我将分享一些关于社区店商业模式创新的干货和见解,希望能给想开实体店或创业的朋友们提供有价值的参考。 1、…...
一些可以访问gpt的方式
1、Coze扣子是新一代 AI 大模型智能体开发平台。整合了插件、长短期记忆、工作流、卡片等丰富能力,扣子能帮你低门槛、快速搭建个性化或具备商业价值的智能体,并发布到豆包、飞书等各个平台。https://www.coze.cn/ 2、https://poe.com/ 3、插件阿里…...
springer模板参考文献不显示
Spring期刊模板网站,我的问题是23年12月的版本 https://www.springernature.com/gp/authors/campaigns/latex-author-support/see-where-our-services-will-take-you/18782940 参考文献显示问好,在sn-article.tex文件中,这个sn-mathphys-num…...
【【C语言简单小题学习-1】】
实现九九乘法表 // 输出乘法口诀表 int main() {int i 0;int j 0;for (i 1; i < 9; i){for (j 1; j < i;j)printf("%d*%d%d ", i , j, i*j);printf("\n"); }return 0; }猜数字的游戏设计 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdi…...
mongoDB 优化(1)索引
1、创建复合索引(多字段) db.collection_test1.createIndex({deletedVersion: 1,param: 1,qrYearMonth: 1},{name: "deletedVersion_1_param_1_qrYearMonth_1",background: true} ); 2、新增索引前: 执行查询: mb.r…...
stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程
stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程 AI浪潮袭来,还是学习学习为妙赛博菩萨【秋葉】简介——(葉ye,四声,同叶)A绘世启动器.exe(sd-webui-aki-v4.6.x)工具包安…...
鸿蒙应用程序包安装和卸载流程
开发者 开发者可以通过调试命令进行应用的安装和卸载,可参考多HAP的调试流程。 图1 应用程序包安装和卸载流程(开发者) 多HAP的开发调试与发布部署流程 多HAP的开发调试与发布部署流程如下图所示。 图1 多HAP的开发调试与发布部署流程 …...
C语言数组全面解析:从初学到精通
数组 1. 前言2. 一维数组的创建和初始化3. 一维数组的使用4. 一维数组在内存中的存储5. 二维数组的创建和初始化6. 二维数组的使用7. 二维数组在内存中的存储8. 数组越界9. 数组作为函数参数10. 综合练习10.1 用函数初始化,逆置,打印整型数组10.2 交换两…...
2024-02-28(Kafka,Oozie,Flink)
1.Kafka的数据存储形式 一个主题由多个分区组成 一个分区由多个segment段组成 一个segment段由多个文件组成(log,index(稀疏索引),timeindex(根据时间做的索引)) 2.读数据的流程 …...
Window下编写的sh文件在Linux/Docker中无法使用
Window下编写的sh文件在Linux/Docker中无法使用 一、sh文件目的1.1 初始状态1.2 目的 二、过程与异常2.1 首先获取标准ubuntu20.04 - 正常2.2 启动ubuntu20.04容器 - 正常2.3 执行windows下写的preInstall文件 - 报错 三、检查和处理3.1 评估异常3.2 处理异常3.3 调整后运行测试…...
第16章-DNS
目录 1. 域名 1.1 产生背景 1.2 概述 1.3 域名的树形层次化结构 2. DNS 2.1 概述 2.2 工作机制 3. DNS查询模式 3.1 递归查询: 3.2 迭代查询: 4. 相关知识点 4.1 集中式DNS 4.2 国内通用DNS 4.3 配置DNS代理 1. 域名 1.1 产生背景 ① IP…...
Leetcoder Day27| 贪心算法part01
语言:Java/Go 理论 贪心的本质是选择每一阶段的局部最优,从而达到全局最优。 什么时候用贪心?可以用局部最优退出全局最优,并且想不到反例到情况 贪心的一般解题步骤 将问题分解为若干个子问题找出适合的贪心策略求解每一个子…...
SpringBoot自动配置中bean的加载控制
🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,…...
Python 测验
Python 测验 引言 Python 作为一种高级编程语言,因其简洁易读、功能强大等特点,在编程领域备受青睐。为了帮助读者检验自己对 Python 的掌握程度,本文特此推出一份 Python 测验。本测验涵盖了 Python 的基础知识、常用库以及高级特性,旨在帮助读者全面了解 Python 的应用…...
Playwright MCP:重新定义浏览器自动化边界的智能会话桥接方案
Playwright MCP:重新定义浏览器自动化边界的智能会话桥接方案 【免费下载链接】playwright-mcp Playwright MCP server 项目地址: https://gitcode.com/gh_mirrors/pl/playwright-mcp 在当今Web自动化测试领域,开发者们面临着一个普遍困境&#x…...
忍者像素绘卷多模态延伸:文字描述→像素绘卷→微信小程序动效导出
忍者像素绘卷多模态延伸:文字描述→像素绘卷→微信小程序动效导出 1. 创作工具介绍 忍者像素绘卷是一款革命性的图像生成工具,专为复古游戏风格内容创作而设计。基于Z-Image-Turbo深度优化引擎,它将传统像素艺术与现代AI技术完美结合&#…...
零基础转型AI产品经理?这份7阶段学习全攻略,助你少走两年弯路,抢占未来高薪岗位!
在AI浪潮席卷全球的今天,越来越多的人开始意识到:AI产品经理,将是未来最具竞争力的岗位之一。尤其是随着大模型(LLM)技术的爆发,一场“技术产品”的革命正在悄然上演。 很多小伙伴私信我:零基础…...
用 Shield CLI 本地开发调试:从零到上线你的第一个 Skill
当 AI Agent 需要调用外部能力时,Skill 就是它的"技能包"。本文以一个文旅素材搜索 Skill 为例,带你走完本地开发 → 调试 → 发布 → 安装使用的完整流程。核心工具只有一个 —— Shield CLI。 背景:什么是 Skill? Sk…...
深度解码:华为IPD流程管理体系L1-L5最佳实践与数字化转型架构全景(PPT)
在当今高度竞争的商业环境中,企业的核心竞争力不再仅仅取决于单一的技术突破或资本优势,而是取决于其业务流程的成熟度与组织运作的系统性。华为作为全球领先的科技企业,其背后支撑庞大业务帝国高效运转的正是其历经多年打磨的IPD(…...
Linux内核中的中断处理机制详解
Linux内核中的中断处理机制详解 引言 中断处理机制是Linux内核中负责处理硬件中断的核心组件,它确保硬件设备能够及时通知内核发生的事件,从而实现设备与操作系统的高效交互。Linux内核的中断处理机制支持多种中断类型,包括外部中断、内部中断…...
阻抗匹配原理与工程实践全解析
1. 阻抗匹配基础概念解析阻抗匹配是电子工程中最基础也最容易被忽视的技术要点之一。记得我刚入行时,第一次调试射频电路就栽在这个坑里——信号反射导致系统根本没法正常工作。阻抗匹配的本质,就是让信号源阻抗与负载阻抗达到共轭匹配状态,实…...
Windows系统运行Android应用的终极方案:APK Installer完全指南
Windows系统运行Android应用的终极方案:APK Installer完全指南 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer 你是否曾经遇到这样的情况:想在W…...
Go语言中的监控系统:从基础到高级
Go语言中的监控系统:从基础到高级 1. 引言 在生产环境中,监控是保证系统稳定运行的重要手段。通过监控,我们可以了解系统的运行状态、发现潜在问题、及时处理故障。Go语言生态中有丰富的监控工具和库,可以帮助开发者构建完善的监…...
