当前位置: 首页 > news >正文

通过rabbitmq生成延时消息,并生成rabbitmq镜像

通过rabbitmq生成延时消息队列,并生成rabbitmq镜像

  • 整体描述
    • 1. 使用场景
    • 2. 目前问题
    • 3. 前期准备
  • 具体步骤
    • 1. 拉取镜像
    • 2. 运行镜像
    • 3. 安装插件
    • 4. 代码支持
      • 4.1 config文件
      • 4.2 消费监听
      • 4.2 消息生产
    • 5. 功能测试
  • 镜像操作
    • 1. 镜像制作
    • 2. 镜像导入
  • 总结

整体描述

1. 使用场景

在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。

2. 目前问题

之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。

3. 前期准备

需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…

具体步骤

为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。

1. 拉取镜像

首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:

docker pull rabbitmq:3.8.17-management

注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。

2. 运行镜像

拉取成功之后,使用命令:

docker images

查看镜像是否拉取成功,如下就是成功了:
rabbitmq镜像
之前用的3.6.8的,不支持延时消息队列的插件…
然后运行镜像,创建容器并启动:

docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management

此时用:

docker ps -a

查看容器:
rabbitmq容器
容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
默认账号:guest,密码:guest
rabbitmq登录页面

3. 安装插件

此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行之后看到如下就成功了:
插件启动成功
成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
添加延时交换机
此时,我们的rabbitmq就配置完成了。

4. 代码支持

rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。

4.1 config文件

package com.thcb.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ的配置类** @author thcb* @date 2023-09-05*/
@Configuration
public class RabbitMqConfig {// 交换机private static final String DELAYED_EXCHANGE = "delayed.exchange";// 队列private static final String DELAYED_QUEUE = "delayed.queue";// 路由private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";/*** 队列*/@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}/*** 交换机*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);}/*** 绑定延迟队列和交换机*/@Beanpublic Binding delayQueueBindingDelayExchange() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();}}

4.2 消费监听

package com.thcb.rabbitmq.recevier;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费监听** @author thcb* @date 2023-09-05*/
@Slf4j
@Component
public class DelayQueueReceiver {@RabbitListener(queues = "delayed.queue")public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);}}

4.2 消息生产

这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。

package com.thcb.rabbitmq.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产controller** @author thcb* @date 2023-09-05*/
@RestController
@RequestMapping("/HelloController")
public class HelloController {private static final Logger log = LoggerFactory.getLogger(HelloController.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/sendXDLMessage1")@ResponseBodypublic String sendXDLMessage1() {int time = 5000;String message = "{\"type\":\"sendXDLMessage1\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage1 success";}@RequestMapping("/sendXDLMessage2")@ResponseBodypublic String sendXDLMessage2() {int time = 30000;String message = "{\"type\":\"sendXDLMessage2\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage2 success";}
}

5. 功能测试

代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
创建的交换机
创建的队列
可以看到交换机的类型是x-delayed-message。
接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
运行结果
结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。

镜像操作

使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
一切都准备就绪,就可以制作镜像了。

1. 镜像制作

将镜像打包成tar文件。

docker commit 【镜像id】 rabbitmq:3.8.17
docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17

2. 镜像导入

制作完镜像进行导入

docker load <rabbitmq-3.8.17.tar
docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17

总结

以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。

相关文章:

通过rabbitmq生成延时消息,并生成rabbitmq镜像

通过rabbitmq生成延时消息队列&#xff0c;并生成rabbitmq镜像 整体描述1. 使用场景2. 目前问题3. 前期准备 具体步骤1. 拉取镜像2. 运行镜像3. 安装插件4. 代码支持4.1 config文件4.2 消费监听4.2 消息生产 5. 功能测试 镜像操作1. 镜像制作2. 镜像导入 总结 整体描述 1. 使用…...

结构型模式-外观模式

隐藏系统的复杂性&#xff0c;并向客户端提供了一个客户端可以访问系统的接口。这种类型的设计模式属于结构型模式&#xff0c;它向现有的系统添加一个接口&#xff0c;来隐藏系统的复杂性。 这种模式涉及到一个单一的类&#xff0c;该类提供了客户端请求的简化方法和对现有系…...

vue三个点…运算符时报错 Syntax Error: Unexpected token

出现以下问题报错&#xff1a; 解决&#xff1a; 在项目根目录新建一个名为.babelrc的文件 {"presets": ["stage-2"] }...

C# wpf 实现桌面放大镜

文章目录 前言一、如何实现&#xff1f;1、制作无边框窗口2、Viewbox放大3、截屏显示&#xff08;1&#xff09;、截屏&#xff08;2&#xff09;、转BitmapSource&#xff08;3&#xff09;、显示 4、定时截屏 二、完整代码三、效果预览总结 前言 做桌面截屏功能时需要放大镜…...

Mybatis中的#{}和${}的区别

#{}和${}他们两都是替换参数的作用&#xff0c;但也还是有很大区别的。 目录 一、${} 二、#{} 三、注意点 一、${} 它是直接替换过来&#xff0c;不添加其它的什么。 比如下面的sql语句 select *from user where id${id} 如果id1&#xff0c;那么他替换过来就还是1&#xff…...

选择(使用)数据库

MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 语法格式: use 数据库名称;大家应该知道,在对数据库进行操作的时候,要制定数据库的操作对象,也就是说操作哪一个数据库 案列:选择testing数据库 …...

GFS分布式文件系统

1、GlusterFS简介 GlusterFS&#xff08;GFS&#xff09;是一个开源的分布式文件系统 由存储服务器、客户端以及NFS/Samba 存储网关&#xff08;可选&#xff0c;根据需要选择使用&#xff09;组成。MFS 传统的分布式文件系统大多通过元服务器来存储元数据&#xff0c;元数据…...

虚函数、纯虚函数、多态

一.虚函数 在基类的函数前加上virtual关键字&#xff0c;在派生类中重写该函数&#xff0c;运行时将会根据所指对象的实际类型来调用相应的函数&#xff0c;如果对象类型是派生类&#xff0c;就调用派生类的函数&#xff0c;如果对象类型是基类&#xff0c;就调用基类的函数。 …...

QGIS学习3 - 安装与管理插件

QGIS安装与管理插件主要是使用了菜单栏安装与管理插件这个菜单。 1、通过压缩文件等添加非官方插件 通过压缩文件添加有可能会提示存在安全问题等&#xff0c;直接点是即可。 之后点击install plugins即可完成。安装后导入插件 但是load失败了应该是安装没有成功。只能通过u…...

LeetCode377. 组合总和 Ⅳ

377. 组合总和 Ⅳ 文章目录 [377. 组合总和 Ⅳ](https://leetcode.cn/problems/combination-sum-iv/)一、题目二、题解方法一&#xff1a;完全背包一维数组动态规划思路代码分析 方法二&#xff1a;动态规划二维数组 一、题目 给你一个由 不同 整数组成的数组 nums &#xff0…...

QT将数据写入文件,日志记录

项目场景&#xff1a; 在QT应用中&#xff0c;有时候需要将错误信息记录在log文件里面&#xff0c;或者需要将数据输出到文件中进行比对查看使用。 创建log文件&#xff0c;如果文件存在则不创建 QDir dir(QCoreApplication::applicationDirPath()"/recv_data");if(…...

vue2与vue3的使用区别与组件通信

1. 脚手架创建项目的区别&#xff1a; vue2: vue init webpack “项目名称”vue3: vue create “项目名称” 或者vue3一般与vite结合使用: npm create vitelatest yarn create vite2. template中结构 vue2: template下只有一个元素节点 <template><div><div…...

亚信科技与中国信通院达成全方位、跨领域战略合作

9月11日&#xff0c;亚信科技&#xff08;中国&#xff09;有限公司「简称&#xff1a;亚信科技」与中国信息通信研究院「简称&#xff1a;中国信通院」在京达成战略合作&#xff0c;双方将在关键技术研发、产业链协同等方面展开全方位、跨领域、跨行业深度合作&#xff0c;共促…...

华为Linux系统开发工程师面试

在Linux系统开发工程师的面试中&#xff0c;你可能会遇到以下一些问题&#xff1a; 在同一个网站中&#xff0c;当客户访问的时候&#xff0c;会出现有的页面访问的速度快而有的慢&#xff0c;系统和服务完全正常、网络带宽正常&#xff0c;你如何诊断这个问题&#xff1f;你以…...

Qt利用QTime实现sleep效果分时调用串口下发报文解决串口下发给下位机后产生的粘包问题

Qt利用QTime实现sleep效果分时调用串口下发报文解决串口下发给下位机后产生的粘包问题 文章目录 Qt利用QTime实现sleep效果分时调用串口下发报文解决串口下发给下位机后产生的粘包问题现象解决方法 现象 当有多包数据需要连续下发给下位机时&#xff0c;比如下载数据等&#x…...

人工智能:神经细胞模型到神经网络模型

人工智能领域中的重要流派之一是&#xff1a;从神经细胞模型&#xff08;Neural Cell Model&#xff09;到神经网络模型&#xff08;Neural Network Model&#xff09;。 一、神经细胞模型 第一个人工神经细胞模型是“MP”模型&#xff0c;它是由麦卡洛克、匹茨合作&#xff0…...

Redisson分布式锁实战

实战来源 此问题基于电商 这周遇见这么一个问题&#xff0c;简略的说一下 由MQ发布了两个消息&#xff0c;一个是订单新增&#xff0c;一个是订单状态变更 由于直接付款之后&#xff0c;这两个消息的发布时间不分先后&#xff0c;可能会造成两种情况&#xff0c;1、订单状态变更…...

JavaScript中循环遍历数组、跳出循环和继续循环

循环遍历数组 上个文章我们简单的介绍for循环&#xff0c;接下来&#xff0c;我们使用for循环去读取数据的数据&#xff0c;之前我们写过这样的一个数组&#xff0c;如下&#xff1a; const ITshareArray ["张三","二愣子","2033-1997","…...

Java——》Synchronized和Lock区别

推荐链接&#xff1a; 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…...

JDK20 + SpringBoot 3.1.0 + JdbcTemplate 使用

JDK20 SpringBoot 3.1.0 JdbcTemplate 使用 一.测试数据库 Postgres二.SpringBoot项目1.Pom 依赖2.配置文件3.启动类4.数据源配置类5.实体对象类包装类6.测试用实体对象1.基类2.扩展类 7.测试类 通过 JdbcTemplate 直接执行 SQL 语句&#xff0c;结合源码动态编译即可方便实现…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

全球首个30米分辨率湿地数据集(2000—2022)

数据简介 今天我们分享的数据是全球30米分辨率湿地数据集&#xff0c;包含8种湿地亚类&#xff0c;该数据以0.5X0.5的瓦片存储&#xff0c;我们整理了所有属于中国的瓦片名称与其对应省份&#xff0c;方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问&#xff08;基础概念问题&#xff09; 1. 请解释Spring框架的核心容器是什么&#xff1f;它在Spring中起到什么作用&#xff1f; Spring框架的核心容器是IoC容器&#…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

Web后端基础(基础知识)

BS架构&#xff1a;Browser/Server&#xff0c;浏览器/服务器架构模式。客户端只需要浏览器&#xff0c;应用程序的逻辑和数据都存储在服务端。 优点&#xff1a;维护方便缺点&#xff1a;体验一般 CS架构&#xff1a;Client/Server&#xff0c;客户端/服务器架构模式。需要单独…...