当前位置: 首页 > news >正文

RabbitMQ学习笔记(下):延迟队列,发布确认高级,备份交换机

十、延迟队列

延迟队列

概念:

延迟队列使用场景:

 

流程图:

延迟队列整合Springboot

导入依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

在java/com/atguigu/rabbitmq下创建config创建类SwaggerConfig,写入代码:

@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo() {return new ApiInfoBuilder().title("rabbitmq接口文档").description("本文档描述了rabbitmq微服务接口定义").version("1.0").contact(new Contact("enjoy6288","http://atguigu.com","1551388580@qq.com")).build();}
}

队列TTL代码框架图:

队列TTL(配置类代码):

@Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列的名称public static final String DEAD_LETTER_QUEUE = "QD";//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A的TTL为10s@Bean("queueA")public Queue queueA(){Map<String,Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列B的TTL为40s@Bean("queueB")public Queue queueB(){Map<String,Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//绑定A-X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定B-x@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定D-y@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

队列TTL(生产者):

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}

队列TTL(消费者)

@Slf4j
@Component
public class DeadLetterQueueConsumer {//接收消息@RabbitListener(queues="QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

延迟队列优化

不能为需求增加队列,

写一个通用队列作为延迟队列:

配置类,在上面配置类中加入如下代码:

public static final String QUEUE_C = "QC";
//和死信交换机连接
@Bean("queueC")
public Queue queue(){Map<String,Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
//和普通交换机绑定
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}

在前面生产者的基础上写入如下代码: 

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg->{//发送消息的时候延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}

点击启动类重新启动,在网页端输入:localhost:8080/ttl/sendExpirationMsg/你好1/20000和localhost:8080/ttl/sendExpirationMsg/你好2/2000。

问题:延迟队列是排队的,当队列中有多条消息时,延迟队列的消息会根据前面最长时间发送。

安装延迟队列插件:

在https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases下载rabbitmq_delayed_message_exchange插件,解压放置到RabbitMQ的插件目录。

rabbitmq的插件在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录下,上传插件到目录下,如果上传失败用sudo rz先获得权限。

输入:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启rabbitmq:systemctl restart rabbitmq-server

进入rabbitmq的交换机界面,查看下面是否出现,如果出现代表安装成功:

基于插件的延迟队列:

配置类

@Configuration
public class DelayedQueueConfig {//队列public static final String DELAYED_QUEUEE_NAME="delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME="delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY="delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUEE_NAME);}//声明交换机,基于插件的@Beanpublic CustomExchange delayedExchange(){Map<String,Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");//交换机名称,交换机类型,是否需要持久化,是否需要自动删除,其它参数return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

//发消息,基于插件的消息及延迟时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable  Integer delayTime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg->{//发送消息时延迟时间(毫秒)msg.getMessageProperties().setDelay(delayTime);return msg;});
}

消费者,创建DelayQueueConsumer类:

@Slf4j
@Component
public class DelayQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUEE_NAME)public void receiveDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}}

测试:localhost:8080/ttl/sendDelayMsg/come on baby1/20000。localhost:8080/ttl/sendDelayMsg/come on baby2/2000

十一、发布确认高级

加入交换机或者队列两者有其中一者宕掉,消息都会丢失。

配置类:

生产者及消费者:

回调接口:

交换机确认

回退消息

十二、备份交换机

配置类:

结果分析:

幂性性:

使用场景:

代码实现:

十三、集群

惰性队列:

集群原理:

搭建集群:

镜像队列:

实现高可用负载均衡

十四、Federation

Exchange原理:

Exchange实现:

Queue实现:

Shovel:

相关文章:

RabbitMQ学习笔记(下):延迟队列,发布确认高级,备份交换机

十、延迟队列 延迟队列 概念&#xff1a; 延迟队列使用场景&#xff1a; 流程图&#xff1a; 延迟队列整合Springboot 导入依赖&#xff1a; <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot…...

Python 无废话-基础知识面向对象编程详解

类定义 如何理解万物皆对象&#xff1f; 生活中一些事物&#xff0c;动物&#xff08;可爱的小狗、调皮的小猫&#xff09;、交通工具&#xff08;比亚迪U8汽车、飞机&#xff09;、人&#xff08;学生、教师&#xff09;…… 这些对象都有着独特或共性的属性和方法来描述其…...

凉鞋的 Unity 笔记 106. 第二轮循环场景视图Sprite Renderer

106. 第二轮循环&场景视图&Sprite Renderer 从这一篇开始&#xff0c;我们开始进行第二轮循环。 这次我们至少能够在游戏运行窗口看到一些东西。 首先还是在场景层次窗口进行编辑&#xff0c;先创建一个 Sprite&#xff0c;操作如下: 创建后&#xff0c;会在 Scene …...

Vue中如何进行分布式路由配置与管理

Vue中的分布式路由配置与管理 随着现代Web应用程序的复杂性不断增加&#xff0c;分布式路由配置和管理成为了一个重要的主题。Vue.js作为一种流行的前端框架&#xff0c;提供了多种方法来管理Vue应用程序的路由。本文将深入探讨在Vue中如何进行分布式路由配置与管理&#xff0…...

Solidity 合约漏洞,价值 38BNB 漏洞分析

Solidity 合约漏洞&#xff0c;价值 38BNB 漏洞分析 1. 漏洞简介 https://twitter.com/NumenAlert/status/1626447469361102850 https://twitter.com/bbbb/status/1626392605264351235 2. 相关地址或交易 攻击交易&#xff1a; https://bscscan.com/tx/0x146586f05a451313…...

【C++】:类和对象(2)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…...

【GIT版本控制】--提交更改

一、添加文件到暂存区 在GIT中&#xff0c;要提交更改&#xff0c;首先需要将文件添加到暂存区&#xff08;Staging Area&#xff09;。这是一个用于存放将要提交的更改的临时区域。以下是将文件添加到暂存区的步骤&#xff1a; 打开终端或命令提示符&#xff1a;首先&#x…...

解决高分屏DPI缩放PC端百度网盘界面模糊的问题

第一步 更新最新版本 首先&#xff0c;在百度网盘官网下载最新安装包&#xff1a; https://pan.baidu.com/download 进行覆盖安装 第二步 修改兼容性设置 右键百度网盘图标&#xff0c;点击属性&#xff0c;在兼容性选项卡中点击更改所有用户的设置 弹出的选项卡中选择更改高…...

全能视频工具 VideoProc Converter 4K for mac中文

VideoProc 4K提供快速完备的4K影片处理方案&#xff0c;您可以透过这款软体调节输出影片格式和大小。能够有效压缩HD/4K影片体积90%以上&#xff0c;以便更好更快地上传到YouTube&#xff0c;或是通过电子邮件附件发送。业界领先的视讯压缩引擎&#xff0c;让你轻松处理大体积视…...

Vue中实现自定义编辑邮件发送到指定邮箱(纯前端实现)

formspree里面注册账号 注册完成后进入后台新建项目并且新建表单 这一步完成之后你将得到一个地址 最后就是在项目中请求这个地址 关键代码如下&#xff1a; submitForm() {this.fullscreenLoading true;this.$axios({method: "post",url: "https://xxxxxxx…...

计算机专业毕业设计项目推荐11-博客项目(Go+Vue+Mysql)

博客项目&#xff08;GoVueMysql&#xff09; **介绍****系统总体开发情况-功能模块****各部分模块实现** 介绍 本系列(后期可能博主会统一为专栏)博文献给即将毕业的计算机专业同学们,因为博主自身本科和硕士也是科班出生,所以也比较了解计算机专业的毕业设计流程以及模式&am…...

QT实现TCP

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);//实例化一个服务器server new QTcpServer(this);// 此时&#xff0c;服务器已经成功进入监听状态&#xff0c…...

PostgreSQL ash —— pgsentinel插件

一、 插件作用 众所周知&#xff0c;pg是没有像oracle那样的ash视图的&#xff0c;因此要回溯历史问题不太方便。pgsentinel插件会将pg_stat_activity与pg_stat_statements视图内容定期快照&#xff0c;并存入pg_active_session_history和pg_stat_statements_history视图中。 1…...

【刷题笔记10.5】LeetCode:排序链表

LeetCode&#xff1a;排序链表 一、题目描述 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 二、分析 这题咱们默认要求&#xff1a;空间复杂度为O(1)。所以这把咱们用自底向上的方法实现归并排序&#xff0c;则可以达到O(1) 的空间复杂…...

三、【色彩模式与颜色填充】

文章目录 Photoshop常用的几种颜色模式包括&#xff1a;1. RGB模式2. CMYK模式3. 灰度模式4. LAB模式5. 多通道模式 Photoshop颜色填充1.色彩基础2.拾色器认识3.颜色填充最后附上流程图&#xff1a; Photoshop常用的几种颜色模式包括&#xff1a; 1. RGB模式 详细可参考&…...

karmada v1.7.0安装指导

前言 安装心得 经过多种方式操作&#xff0c;发现二进制方法安装太复杂&#xff0c;证书生成及其手工操作太多了&#xff0c;没有安装成功&#xff1b;helm方式的安装&#xff0c;v1.7.0的chart包执行安装会报错&#xff0c;手工修复了报错并修改了镜像地址&#xff0c;还是各…...

OK3568 forlinx系统编译过程及问题汇总

1. 共享文件夹无法加载&#xff1b;通过网上把文件夹加载后&#xff0c;拷贝文件很慢&#xff0c;任务管理器查看发现硬盘读写速率很低。解决办法&#xff1a;重新安装vmware tools。 2. 拷贝Linux源码到虚拟机&#xff0c;解压。 3. 虚拟机基本库安装 forlinxubuntu:~$ sudo…...

JVM篇---第五篇

系列文章目录 文章目录 系列文章目录一、简述Java的对象结构二、如何判断对象可以被回收?三、JVM的永久代中会发生垃圾回收么?一、简述Java的对象结构 Java对象由三个部分组成:对象头、实例数据、对齐填充。 对象头由两部分组成,第一部分存储对象自身的运行时数据:哈希码…...

C/C++ 排序算法总结

1.冒泡排序 https://blog.csdn.net/weixin_49303682/article/details/119365319 1 #include <stdio.h>2 3 #define N 94 5 void print(int a[])6 {7 for(int i 0; i < N; i)8 {9 printf("%d ", a[i]); 10 } 11 printf("…...

机器学习---RBM、KL散度、DBN

1. RBM 1.1 BM BM是由Hinton和Sejnowski提出的一种随机递归神经网络&#xff0c;可以看做是一种随机生成的 Hopfield网络&#xff0c;是能够通过学习数据的固有内在表示解决困难学习问题的最早的人工神经网络之 一&#xff0c;因样本分布遵循玻尔兹曼分布而命名为BM。BM由二…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

4. TypeScript 类型推断与类型组合

一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式&#xff0c;自动确定它们的类型。 这一特性减少了显式类型注解的需要&#xff0c;在保持类型安全的同时简化了代码。通过分析上下文和初始值&#xff0c;TypeSc…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

【LeetCode】算法详解#6 ---除自身以外数组的乘积

1.题目介绍 给定一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O…...