通过rabbitmq生成延时消息,并生成rabbitmq镜像
通过rabbitmq生成延时消息队列,并生成rabbitmq镜像
- 整体描述
- 1. 使用场景
- 2. 目前问题
- 3. 前期准备
- 具体步骤
- 1. 拉取镜像
- 2. 运行镜像
- 3. 安装插件
- 4. 代码支持
- 4.1 config文件
- 4.2 消费监听
- 4.2 消息生产
- 5. 功能测试
- 镜像操作
- 1. 镜像制作
- 2. 镜像导入
- 总结
整体描述
1. 使用场景
在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。
2. 目前问题
之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。
3. 前期准备
需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…
具体步骤
为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。
1. 拉取镜像
首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:
docker pull rabbitmq:3.8.17-management
注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。
2. 运行镜像
拉取成功之后,使用命令:
docker images
查看镜像是否拉取成功,如下就是成功了:
之前用的3.6.8的,不支持延时消息队列的插件…
然后运行镜像,创建容器并启动:
docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management
此时用:
docker ps -a
查看容器:
容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
默认账号:guest,密码:guest
3. 安装插件
此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
执行之后看到如下就成功了:
成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
此时,我们的rabbitmq就配置完成了。
4. 代码支持
rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。
4.1 config文件
package com.thcb.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ的配置类** @author thcb* @date 2023-09-05*/
@Configuration
public class RabbitMqConfig {// 交换机private static final String DELAYED_EXCHANGE = "delayed.exchange";// 队列private static final String DELAYED_QUEUE = "delayed.queue";// 路由private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";/*** 队列*/@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}/*** 交换机*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);}/*** 绑定延迟队列和交换机*/@Beanpublic Binding delayQueueBindingDelayExchange() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();}}
4.2 消费监听
package com.thcb.rabbitmq.recevier;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费监听** @author thcb* @date 2023-09-05*/
@Slf4j
@Component
public class DelayQueueReceiver {@RabbitListener(queues = "delayed.queue")public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);}}
4.2 消息生产
这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。
package com.thcb.rabbitmq.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产controller** @author thcb* @date 2023-09-05*/
@RestController
@RequestMapping("/HelloController")
public class HelloController {private static final Logger log = LoggerFactory.getLogger(HelloController.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/sendXDLMessage1")@ResponseBodypublic String sendXDLMessage1() {int time = 5000;String message = "{\"type\":\"sendXDLMessage1\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage1 success";}@RequestMapping("/sendXDLMessage2")@ResponseBodypublic String sendXDLMessage2() {int time = 30000;String message = "{\"type\":\"sendXDLMessage2\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage2 success";}
}
5. 功能测试
代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
可以看到交换机的类型是x-delayed-message。
接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。
镜像操作
使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
一切都准备就绪,就可以制作镜像了。
1. 镜像制作
将镜像打包成tar文件。
docker commit 【镜像id】 rabbitmq:3.8.17
docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17
2. 镜像导入
制作完镜像进行导入
docker load <rabbitmq-3.8.17.tar
docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17
总结
以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。
相关文章:

通过rabbitmq生成延时消息,并生成rabbitmq镜像
通过rabbitmq生成延时消息队列,并生成rabbitmq镜像 整体描述1. 使用场景2. 目前问题3. 前期准备 具体步骤1. 拉取镜像2. 运行镜像3. 安装插件4. 代码支持4.1 config文件4.2 消费监听4.2 消息生产 5. 功能测试 镜像操作1. 镜像制作2. 镜像导入 总结 整体描述 1. 使用…...
结构型模式-外观模式
隐藏系统的复杂性,并向客户端提供了一个客户端可以访问系统的接口。这种类型的设计模式属于结构型模式,它向现有的系统添加一个接口,来隐藏系统的复杂性。 这种模式涉及到一个单一的类,该类提供了客户端请求的简化方法和对现有系…...

vue三个点…运算符时报错 Syntax Error: Unexpected token
出现以下问题报错: 解决: 在项目根目录新建一个名为.babelrc的文件 {"presets": ["stage-2"] }...

C# wpf 实现桌面放大镜
文章目录 前言一、如何实现?1、制作无边框窗口2、Viewbox放大3、截屏显示(1)、截屏(2)、转BitmapSource(3)、显示 4、定时截屏 二、完整代码三、效果预览总结 前言 做桌面截屏功能时需要放大镜…...
Mybatis中的#{}和${}的区别
#{}和${}他们两都是替换参数的作用,但也还是有很大区别的。 目录 一、${} 二、#{} 三、注意点 一、${} 它是直接替换过来,不添加其它的什么。 比如下面的sql语句 select *from user where id${id} 如果id1,那么他替换过来就还是1ÿ…...

选择(使用)数据库
MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 语法格式: use 数据库名称;大家应该知道,在对数据库进行操作的时候,要制定数据库的操作对象,也就是说操作哪一个数据库 案列:选择testing数据库 …...

GFS分布式文件系统
1、GlusterFS简介 GlusterFS(GFS)是一个开源的分布式文件系统 由存储服务器、客户端以及NFS/Samba 存储网关(可选,根据需要选择使用)组成。MFS 传统的分布式文件系统大多通过元服务器来存储元数据,元数据…...

虚函数、纯虚函数、多态
一.虚函数 在基类的函数前加上virtual关键字,在派生类中重写该函数,运行时将会根据所指对象的实际类型来调用相应的函数,如果对象类型是派生类,就调用派生类的函数,如果对象类型是基类,就调用基类的函数。 …...

QGIS学习3 - 安装与管理插件
QGIS安装与管理插件主要是使用了菜单栏安装与管理插件这个菜单。 1、通过压缩文件等添加非官方插件 通过压缩文件添加有可能会提示存在安全问题等,直接点是即可。 之后点击install plugins即可完成。安装后导入插件 但是load失败了应该是安装没有成功。只能通过u…...
LeetCode377. 组合总和 Ⅳ
377. 组合总和 Ⅳ 文章目录 [377. 组合总和 Ⅳ](https://leetcode.cn/problems/combination-sum-iv/)一、题目二、题解方法一:完全背包一维数组动态规划思路代码分析 方法二:动态规划二维数组 一、题目 给你一个由 不同 整数组成的数组 nums ࿰…...
QT将数据写入文件,日志记录
项目场景: 在QT应用中,有时候需要将错误信息记录在log文件里面,或者需要将数据输出到文件中进行比对查看使用。 创建log文件,如果文件存在则不创建 QDir dir(QCoreApplication::applicationDirPath()"/recv_data");if(…...
vue2与vue3的使用区别与组件通信
1. 脚手架创建项目的区别: vue2: vue init webpack “项目名称”vue3: vue create “项目名称” 或者vue3一般与vite结合使用: npm create vitelatest yarn create vite2. template中结构 vue2: template下只有一个元素节点 <template><div><div…...

亚信科技与中国信通院达成全方位、跨领域战略合作
9月11日,亚信科技(中国)有限公司「简称:亚信科技」与中国信息通信研究院「简称:中国信通院」在京达成战略合作,双方将在关键技术研发、产业链协同等方面展开全方位、跨领域、跨行业深度合作,共促…...
华为Linux系统开发工程师面试
在Linux系统开发工程师的面试中,你可能会遇到以下一些问题: 在同一个网站中,当客户访问的时候,会出现有的页面访问的速度快而有的慢,系统和服务完全正常、网络带宽正常,你如何诊断这个问题?你以…...
Qt利用QTime实现sleep效果分时调用串口下发报文解决串口下发给下位机后产生的粘包问题
Qt利用QTime实现sleep效果分时调用串口下发报文解决串口下发给下位机后产生的粘包问题 文章目录 Qt利用QTime实现sleep效果分时调用串口下发报文解决串口下发给下位机后产生的粘包问题现象解决方法 现象 当有多包数据需要连续下发给下位机时,比如下载数据等&#x…...

人工智能:神经细胞模型到神经网络模型
人工智能领域中的重要流派之一是:从神经细胞模型(Neural Cell Model)到神经网络模型(Neural Network Model)。 一、神经细胞模型 第一个人工神经细胞模型是“MP”模型,它是由麦卡洛克、匹茨合作࿰…...

Redisson分布式锁实战
实战来源 此问题基于电商 这周遇见这么一个问题,简略的说一下 由MQ发布了两个消息,一个是订单新增,一个是订单状态变更 由于直接付款之后,这两个消息的发布时间不分先后,可能会造成两种情况,1、订单状态变更…...

JavaScript中循环遍历数组、跳出循环和继续循环
循环遍历数组 上个文章我们简单的介绍for循环,接下来,我们使用for循环去读取数据的数据,之前我们写过这样的一个数组,如下: const ITshareArray ["张三","二愣子","2033-1997","…...
Java——》Synchronized和Lock区别
推荐链接: 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…...

JDK20 + SpringBoot 3.1.0 + JdbcTemplate 使用
JDK20 SpringBoot 3.1.0 JdbcTemplate 使用 一.测试数据库 Postgres二.SpringBoot项目1.Pom 依赖2.配置文件3.启动类4.数据源配置类5.实体对象类包装类6.测试用实体对象1.基类2.扩展类 7.测试类 通过 JdbcTemplate 直接执行 SQL 语句,结合源码动态编译即可方便实现…...

【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...

前端开发者常用网站
Can I use网站:一个查询网页技术兼容性的网站 一个查询网页技术兼容性的网站Can I use:Can I use... Support tables for HTML5, CSS3, etc (查询浏览器对HTML5的支持情况) 权威网站:MDN JavaScript权威网站:JavaScript | MDN...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...
起重机起升机构的安全装置有哪些?
起重机起升机构的安全装置是保障吊装作业安全的关键部件,主要用于防止超载、失控、断绳等危险情况。以下是常见的安全装置及其功能和原理: 一、超载保护装置(核心安全装置) 1. 起重量限制器 功能:实时监测起升载荷&a…...
简单介绍C++中 string与wstring
在C中,string和wstring是两种用于处理不同字符编码的字符串类型,分别基于char和wchar_t字符类型。以下是它们的详细说明和对比: 1. 基础定义 string 类型:std::string 字符类型:char(通常为8位)…...

二叉树-144.二叉树的前序遍历-力扣(LeetCode)
一、题目解析 对于递归方法的前序遍历十分简单,但对于一位合格的程序猿而言,需要掌握将递归转化为非递归的能力,毕竟递归调用的时候会调用大量的栈帧,存在栈溢出风险。 二、算法原理 递归调用本质是系统建立栈帧,而非…...