RabbitMQ讲解与整合
RabbitMq安装
类型概念
租户
RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。
每一个 vhost 都是一个独立的小型 RabbitMQ 服务器,这个 vhost 中会有自己的消息队列、消息交换机以及相应的绑定关系等等,并且拥有自己独立的权限,不同的 vhost 中的队列和交换机不能互相绑定,这样技能保证运行安全又能避免命名冲突
交换机
交换机 | |||
属性 | 意义 | 值 | 意义 |
type | 类型 | direct | 默认的直接交换机 根据交换机下队列绑定的routingKey直接匹配 |
fanout | 扇形交换机 简单来说就是发布订阅 队列直接绑定在交换机下,统一发布消息 | ||
headers | 头部交换机,通过message header头部信息进行比对 可以根据定义全匹配、部分匹配等规则 | ||
topic | 主题交换机 通过绑定routingKey进行模糊匹配 | ||
Durability | 耐用 (持久化) | durable | 持久化,数据存放于硬盘 |
transient | 瞬态,数据存放于内存 | ||
Auto delete | 自动删除 | Yes | 没有绑定队列时自动删除,针对的是曾经有过但后来没有的事物 |
No | 不自动删除 | ||
Internal | 内部使用 | Yes | 该路由绑定的队列不会被用户消费 |
No | 不自动删除 |
队列
队列 | |||
属性 | 意义 | 值 | 意义 |
type | 类型 | Default for virtual host | 租户配置的默认选项,下列三种其一 默认Classic无需设置 |
Classic | 传统的队列类型 数据存储在单个节点上 不具备quorum队列的高可用性和数据保护特性 ps:单机时使用 | ||
Quorum | 高可用性队列 数据会被复制到多个节点 提供更好的数据可靠性和持久性 ps:部署多节点时使用 | ||
Stream | 特殊类型的队列 用于支持事件流处理(event streaming) 具有类似于Kafka的流式处理特性 ps:听说不成熟,暂时用不上 | ||
Durability | 耐用 (持久化) | durable | 持久化,数据存放于硬盘 |
transient | 瞬态,数据存放于内存 |
参数:
显示参数 | 实际参数 | 作用 |
---|---|---|
Auto expire | x-expires | 设置队列的过期时间,单位为毫秒。当队列在指定时间内未被使用,将会被自动删除 |
Message TTL | x-message-ttl | 设置队列中消息的过期时间(Time-To-Live),单位为毫秒。消息在队列中存放的时间超过设定的过期时间后会被自动删除 |
Overflow behaviour | x-overflow | 设置队列溢出行为,可选值为 drop-head(删除最旧的消息)或 reject-publish(拒绝发布新消息) |
Single active consumer | x-single-active-consumer | 配置队列是否只允许单个消费者消费消息。当设置了x-single-active-consumer参数时,表示队列只允许有一个消费者活跃地消费消息,其他消费者将被阻塞,直到当前的消费者停止消费或断开连接 |
Dead letter exchange | x-dead-letter-exchange | 设置队列中的死信消息转发到的交换机名称。当消息成为死信时,将会被转发到指定的交换机 |
Dead letter routing key | x-dead-letter-routing-key | 设置死信消息转发时的路由键。死信消息将通过指定的路由键转发到目标交换机 |
Max length | x-max-length | 设置队列的最大长度,即队列中消息的最大数量。当队列中消息数量达到设定的最大长度后,新消息将无法入队 |
Max length bytes | x-max-length-bytes | 设置队列消息的最大总字节数。当队列中消息的总字节数达到设定的最大值后,新消息将无法入队 |
Leader locator | x-queue-leader-locator | 配置队列的领导者(Leader)定位器,集群中使用 |
SpringBoot整合
引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.6.3</version>
</dependency>
配置数据源
spring: rabbitmq:addresses: xxx.xxx.xx.xx:5672username: adminpassword: xxxxxxvirtual-host: /
配置交换机和队列
@Component
public class RabbitMqConfig {// 定义交换机名称public static final String FANOUT_EXCHANGE = "fanout.test";@Bean(name = FANOUT_EXCHANGE)public FanoutExchange fanoutExchange() {// 交换机类型按需创建,这里用的是Fanout,发布订阅,绑定在该交换机下的队列都会收到消息// 参数2:是否持久化// 参数3:是否自动删除return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 定义队列public static final String FANOUT_QUEUE1 = "queue1";@Bean(name = FANOUT_QUEUE1)public Queue fanoutQueue1() {// 后三个不写也行,这是默认值// 参数2:是否持久化数据到磁盘(防止意外关闭数据丢失)// 参数3:是否具有排他性// 参数4:队列不再使用时是否自动删除return new Queue(FANOUT_QUEUE1, true, false, false);}public static final String FANOUT_QUEUE2 = "queue2";@Bean(name = FANOUT_QUEUE2)public Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true, false, false);}@Beanpublic Binding bindingSimpleQueue1(@Qualifier(FANOUT_QUEUE1) Queue fanoutQueue1,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindingSimpleQueue2(@Qualifier(FANOUT_QUEUE2) Queue fanoutQueue2,@Qualifier(FANOUT_EXCHANGE) FanoutExchange fanoutExchange) {// 将交换机和队列绑定return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
测试发一条消息到队列
@SpringBootTest(classes = TemplateApplication.class)
public class RabbitMQTest {@AutowiredRabbitMessagingTemplate rabbitMessagingTemplate;@Testpublic void testSent(){//指定交换机->指定队列(因为创建的交换机是FanoutExchange,所以绑定该交换机的队列都会收到一条消息)rabbitMessagingTemplate.convertAndSend("fanout.test","发送数据到FanoutExchange");// 如果创建队列不绑定交换机和路由键,那么实际上会有默认的交换机和路由键,均为空,直接将消息发送给队列,队列名则和路由键保持一致,仍然可以成功发送消息。}
}
测试接收队列消息
写个监听类接收消息:
@Component
public class RabbitMqListenter {@RabbitListener(queues = {RabbitMqConfig.FANOUT_QUEUE1,RabbitMqConfig.FANOUT_QUEUE2})public void reciveLogAll(String msg) throws Exception {System.out.println("消费到数据:" + msg);}
}
-------------基础的使用到这里就结束了-------------
拓展事项
rabbitMqPusher
自己封装一个更加方便使用的发送工具,可有可无,其中可以使用RabbitMessagingTemplate和RabbitTemplate,RabbitMessagingTemplate和RabbitTemplate都是Spring AMQP提供的用于与RabbitMQ进行交互的工具类如果只是简单使用,那么RabbitMessagingTemplate就够用了,如果需要更精细的控制,可以选择使用RabbitTemplate。
,但它们在使用方式和功能上有一些不同点:
RabbitMessagingTemplate:
RabbitMessagingTemplate是MessagingTemplate的子类,用于在Spring应用程序中发送和接收消息。
它提供了一种更高级别的抽象,使得在Spring框架中更容易使用消息发送和接收的功能。
可以直接与Spring的消息通道(MessageChannel)集成,方便进行消息的发送和接收。
RabbitTemplate:
RabbitTemplate是Spring AMQP提供的用于与RabbitMQ进行交互的核心类,提供了丰富的方法来发送和接收消息。
它是一个强大而灵活的工具,可以直接与RabbitMQ的交互进行细粒度的控制。
可以设置消息的属性、监听发送确认、接收确认等功能,更加灵活地处理消息发送和接收的细节。
public interface RabbitMqPublish {void send(String quene, String message);void send(String exchange, String routingKey, String message);void send(String quene, String message, Integer expiration);void send(String exchange, String routingKey, String message, Integer expiration);}
package com.template.rabbitmq.producer.impl;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class RabbitMqPublishImpl implements RabbitMqPublish {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息* @param quene 队列名称 或 交换机名称* @param message 消息内容*/public void send(String quene, String message) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> quene:{} ---> message:{}", message, quene);}/*** 直接发送消息到队列* 超过有效期丢弃** @param quene 队列名称* @param message 消息内容* @param expiration 有效期(毫秒)*/public void send(String quene, String message, Integer expiration) {rabbitTemplate.send(quene, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> quene:{} ---> message:{} ---> expiration:{}", quene, message, expiration);}/*** 发送消息* 超过有效期丢弃** @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容* @param expiration 有效期(毫秒)*/public void send(String exchange, String routingKey, String message, Integer expiration) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).setExpiration(String.valueOf(expiration)).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{} ---> expiration:{}", exchange, routingKey, message, expiration);}/*** 发送消息** @param exchange 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void send(String exchange, String routingKey, String message) {rabbitTemplate.send(exchange, routingKey, MessageBuilder.withBody(message.getBytes()).build());log.info("发送消息---> exchange:{} ---> routingKey:{} ---> message:{}", exchange, routingKey, message);}}
在RabbitMQ中,如果队列没有设置过期时间(即没有声明x-message-ttl属性),那么即使在发送消息时设置了消息的过期时间也会失效。消息的过期时间只有在队列设置了过期时间的情况下才会生效。
实测以上列代码的方式直接对消息设置有效期是生效的。
死信队列
和普通队列一样,只不过是对其他队列进行配置,将过期的消息路由到死信队列中。
创建死信交换机和死信路由
// 配置交换机的文件中继续增加配置public static final String DIRECT_GP_DEAD_LETTER_EXCHANGE = "DIRECT_GP_DEAD_LETTER_EXCHANGE";public static final String DIRECT_GP_DEAD_LETTER_QUEUE = "DIRECT_GP_DEAD_LETTER_QUEUE";@Bean(DIRECT_GP_DEAD_LETTER_EXCHANGE)public DirectExchangedirectDeadLetterExchange() {return new DirectExchange(DIRECT_GP_DEAD_LETTER_EXCHANGE, true, false, new HashMap<>());}@Bean(DIRECT_GP_DEAD_LETTER_QUEUE)public Queue directDeadLetterQueue() {return new Queue(DIRECT_GP_DEAD_LETTER_QUEUE, true, false, false, new HashMap<>());}
设置队列消息有效期并绑定死信队列
@Bean(name = DIRECT_QUEUE1)public Queue directQueue1() {HashMap<String, Object> headers = new HashMap<>();// 配置消息有效期,消息发送到队列10秒后如果未被消费者消费,则过期headers.put("x-message-ttl",10000);// 配置超期交换机,消息过期后会发送到此交换机headers.put("x-dead-letter-exchange",DIRECT_GP_DEAD_LETTER_EXCHANGE);// 配置超期routingKey,消息过期后转移消息时指定的routingKeyheaders.put("x-dead-letter-routing-key",DIRECT_GP_DEAD_LETTER_QUEUE);// 如果只配置了有效期,未配置交换机和routingKey,则消息会被直接丢弃return new Queue(DIRECT_QUEUE1, true, false, false,headers);}
配置完成后,尝试向DIRECT_QUEUE1发送一条消息,不启动消费者,10秒后消息会自动转移到死信队列中,可在可视化管理界面进行验证。
延时队列
延时队列场景举例:
预定一个会议室,两个小时后开始,要求提前十分钟通知参会人员进行开会。
如果不使用延时队列,那么就需要不断轮询,查看是否到达需要通知的时间,进行消息通知。
延时队列的实现方式:
死信队列+消息有效期
预定时间到提前十分钟通知中间有110分钟,那么创建一条通知消息,设置有效期110分钟丢入队列,不用消费者去监听,等待消息过期后路由到指定的死信队列,再去消费死信队列中的消息即可。
所以延时队列实际上是一种实现方案,而不是一种特定的队列类型。
相关文章:

RabbitMQ讲解与整合
RabbitMq安装 类型概念 租户 RabbitMQ 中有一个概念叫做多租户,每一个 RabbitMQ 服务器都能创建出许多虚拟的消息服务器,这些虚拟的消息服务器就是我们所说的虚拟主机(virtual host),一般简称为 vhost。 每一个 vhos…...

python 基础知识点(蓝桥杯python科目个人复习计划56)
今日复习内容:做题 例题1:最小的或运算 问题描述:给定整数a,b,求最小的整数x,满足a|x b|x,其中|表示或运算。 输入格式: 第一行包括两个正整数a,b; 输出格式&#…...
【vue】vue中数据双向绑定原理/响应式原理,mvvm,mvc、mvp分别是什么
关于 vue 的原理主要有两个重要内容,分别是 mvvm 数据双向绑定原理,和 响应式原理 MVC(Model-View-Controller): Model(模型):表示应用程序的数据和业务逻辑。View(视图&…...

基于反光柱特征的激光定位算法思路
目录 1. 识别反光柱2. 数据关联2.1 基于几何形状寻找匹配2.2 暴力寻找匹配 3. 位姿估计(最小二乘求解)4. 问题4.1 精度问题4.2 快速旋转时定位较差 1. 识别反光柱 反光柱是特殊材料制成,根据激光雷达对反光材料扫描得到的反射值来提取特征。…...

CSM是什么意思?
CSM(Customer Service Management)是企业客户服务管理的信息化(IT)解决方案架构。本着以客户为中心的管理理念,搭建企业客户服务管理平台,实现企业以客户为中心的管理时代的竞争战略。 CSM的核心是以客户为中心,实现对…...
ES6 面试题
1. const、let 和 var 的区别是什么? 答案: var 声明的变量是函数作用域或全局作用域,而 const 和 let 声明的变量是块级作用域。使用 var 声明的变量可以被重复声明,而 const 和 let 不允许重复声明同一变量。const 声明的变量…...

智能指针(C++)
目录 一、智能指针是什么 二、为什么需要智能指针 三、智能指针的使用和原理 3.1、RALL 3.2 智能指针的原理 3.3、智能指针的分类 3.3.1、auto_ptr 3.3.2、unique_ptr 3.3.3、shared_ptr 3.2.4、weak_ptr 一、智能指针是什么 在c中,动态内存的管理式通过一…...

社区店商业模式探讨:如何创新并持续盈利?
在竞争激烈的商业环境中,社区店要想获得成功并持续盈利,需要不断创新和优化商业模式。 作为一名开鲜奶吧5年的创业者,我将分享一些关于社区店商业模式创新的干货和见解,希望能给想开实体店或创业的朋友们提供有价值的参考。 1、…...

一些可以访问gpt的方式
1、Coze扣子是新一代 AI 大模型智能体开发平台。整合了插件、长短期记忆、工作流、卡片等丰富能力,扣子能帮你低门槛、快速搭建个性化或具备商业价值的智能体,并发布到豆包、飞书等各个平台。https://www.coze.cn/ 2、https://poe.com/ 3、插件阿里…...

springer模板参考文献不显示
Spring期刊模板网站,我的问题是23年12月的版本 https://www.springernature.com/gp/authors/campaigns/latex-author-support/see-where-our-services-will-take-you/18782940 参考文献显示问好,在sn-article.tex文件中,这个sn-mathphys-num…...

【【C语言简单小题学习-1】】
实现九九乘法表 // 输出乘法口诀表 int main() {int i 0;int j 0;for (i 1; i < 9; i){for (j 1; j < i;j)printf("%d*%d%d ", i , j, i*j);printf("\n"); }return 0; }猜数字的游戏设计 #define _CRT_SECURE_NO_WARNINGS 1 #include<stdi…...

mongoDB 优化(1)索引
1、创建复合索引(多字段) db.collection_test1.createIndex({deletedVersion: 1,param: 1,qrYearMonth: 1},{name: "deletedVersion_1_param_1_qrYearMonth_1",background: true} ); 2、新增索引前: 执行查询: mb.r…...

stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程
stable diffusion webUI之赛博菩萨【秋葉】——工具包新手安裝与使用教程 AI浪潮袭来,还是学习学习为妙赛博菩萨【秋葉】简介——(葉ye,四声,同叶)A绘世启动器.exe(sd-webui-aki-v4.6.x)工具包安…...

鸿蒙应用程序包安装和卸载流程
开发者 开发者可以通过调试命令进行应用的安装和卸载,可参考多HAP的调试流程。 图1 应用程序包安装和卸载流程(开发者) 多HAP的开发调试与发布部署流程 多HAP的开发调试与发布部署流程如下图所示。 图1 多HAP的开发调试与发布部署流程 …...

C语言数组全面解析:从初学到精通
数组 1. 前言2. 一维数组的创建和初始化3. 一维数组的使用4. 一维数组在内存中的存储5. 二维数组的创建和初始化6. 二维数组的使用7. 二维数组在内存中的存储8. 数组越界9. 数组作为函数参数10. 综合练习10.1 用函数初始化,逆置,打印整型数组10.2 交换两…...

2024-02-28(Kafka,Oozie,Flink)
1.Kafka的数据存储形式 一个主题由多个分区组成 一个分区由多个segment段组成 一个segment段由多个文件组成(log,index(稀疏索引),timeindex(根据时间做的索引)) 2.读数据的流程 …...

Window下编写的sh文件在Linux/Docker中无法使用
Window下编写的sh文件在Linux/Docker中无法使用 一、sh文件目的1.1 初始状态1.2 目的 二、过程与异常2.1 首先获取标准ubuntu20.04 - 正常2.2 启动ubuntu20.04容器 - 正常2.3 执行windows下写的preInstall文件 - 报错 三、检查和处理3.1 评估异常3.2 处理异常3.3 调整后运行测试…...

第16章-DNS
目录 1. 域名 1.1 产生背景 1.2 概述 1.3 域名的树形层次化结构 2. DNS 2.1 概述 2.2 工作机制 3. DNS查询模式 3.1 递归查询: 3.2 迭代查询: 4. 相关知识点 4.1 集中式DNS 4.2 国内通用DNS 4.3 配置DNS代理 1. 域名 1.1 产生背景 ① IP…...

Leetcoder Day27| 贪心算法part01
语言:Java/Go 理论 贪心的本质是选择每一阶段的局部最优,从而达到全局最优。 什么时候用贪心?可以用局部最优退出全局最优,并且想不到反例到情况 贪心的一般解题步骤 将问题分解为若干个子问题找出适合的贪心策略求解每一个子…...
SpringBoot自动配置中bean的加载控制
🙈作者简介:练习时长两年半的Java up主 🙉个人主页:程序员老茶 🙊 ps:点赞👍是免费的,却可以让写博客的作者开心好久好久😎 📚系列专栏:Java全栈,…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...

04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果
逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

ArcGIS Pro+ArcGIS给你的地图加上北回归线!
今天来看ArcGIS Pro和ArcGIS中如何给制作的中国地图或者其他大范围地图加上北回归线。 我们将在ArcGIS Pro和ArcGIS中一同介绍。 1 ArcGIS Pro中设置北回归线 1、在ArcGIS Pro中初步设置好经纬格网等,设置经线、纬线都以10间隔显示。 2、需要插入背会归线…...