当前位置: 首页 > news >正文

基于Rocket MQ扩展的无限延迟消息队列

基于Rocket MQ扩展的无限延迟消息队列

背景:

  • Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.

实现原理:

  • 简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.

  • 源码如下:

  • /** Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.*/
    package com.example.xxx.utils;import com.vevor.bmp.crm.common.constants.MQConstants;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;import javax.annotation.Resource;
    import java.io.Serializable;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :基于Rocket MQ的任意延迟时长工具* @program :user-growth* @date :Created in 2023/5/22 3:35 下午* @since :1.8.0*/
    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,// 消息消费顺序consumeMode = ConsumeMode.CONCURRENTLY,// 最大消息重复消费次数maxReconsumeTimes = 3)
    public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {/*** Rocket MQ客户端*/@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** MQ默认延迟等级*/private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};@SneakyThrows@Overridepublic void onMessage(DelayTable<Object> message) {Date endTime = message.getEndTime();int delayLevel = getDelayLevel(endTime);// 继续延迟if (delayLevel != 0) {int currentDelayCount = message.getCurrentDelayCount();currentDelayCount++;message.setCurrentDelayCount(currentDelayCount);message.setCurrentDelayLevel(delayLevel);message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);this.sendDelayMessage(message);return;}// 执行业务log.info("delay message end! start to process business...");Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();if (messageHandler != null) {DelayMessageHandler delayMessageHandler = messageHandler.newInstance();delayMessageHandler.handle();}}/*** 延迟消息体** @param <E> 消息类型*/@Datapublic static class DelayTable<E> implements Serializable {private static final long serialVersionUID = 2405172041950251807L;/*** 延迟消息体*/private E content;/*** 消息延迟结束时间*/private Date endTime;/*** 总延迟毫秒数*/private long totalDelayTime;/*** 总延迟时间单位*/private TimeUnit totalDelayTimeUnit;/*** 当前延迟次数*/private int currentDelayCount;/*** 当前延迟等级*/private int currentDelayLevel;/*** 当前延迟毫秒数*/private long currentDelayMillis;/*** 延迟处理逻辑*/private Class<? extends DelayMessageHandler> messageHandler;}/*** 发送延迟消息** @param message 消息体* @param delay 延迟时长* @param timeUnit 延迟时间单位* @param handler 延迟时间到了之后,需要处理的逻辑* @param <E> 延迟消息类型*/public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {// 把延迟时间转换成时间戳(毫秒)long totalDelayMills = timeUnit.toMillis(delay);// 根据延迟时间计算结束时间Calendar instance = Calendar.getInstance();instance.add(Calendar.MILLISECOND, (int)totalDelayMills);Date endTime = instance.getTime();// 根据延迟时间匹配延迟等级(delay level)int delayLevel = getDelayLevel(endTime);long delayMillis = TIME_DELAY_LEVEL[delayLevel];// 发送消息DelayTable<E> delayTable = new DelayTable<>();// 全局数据delayTable.setContent(message);delayTable.setMessageHandler(handler);delayTable.setEndTime(endTime);delayTable.setTotalDelayTime(delay);delayTable.setTotalDelayTimeUnit(timeUnit);// 当前延迟等级数据delayTable.setCurrentDelayCount(1);delayTable.setCurrentDelayLevel(delayLevel);delayTable.setCurrentDelayMillis(delayMillis);this.sendDelayMessage(delayTable);}/*** 计算延迟等级** @param targetTime 延迟截止时间* @return Rocket MQ延迟消息等级*/private static int getDelayLevel(Date targetTime) {long currentTime = System.currentTimeMillis();long delayMillis = targetTime.getTime() - currentTime;if (delayMillis <= 0) {// 不延迟,即延迟等级为 0return 0;}// 判断处于哪个延迟等级// 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1hfor (int i = 1; i <= 18; i++) {long delayLevelTime = TIME_DELAY_LEVEL[i];if (delayMillis < delayLevelTime) {return i - 1;} else if (delayMillis == delayLevelTime) {return i;}}// 最大延迟等级为 18return 18;}/*** 发送延迟消息** @param delayTable 延迟对象,可以循环使用*/@SneakyThrowsprivate <E> void sendDelayMessage(DelayTable<E> delayTable) {// 消息序列化Message<DelayTable<E>> message = MessageBuilder.withPayload(delayTable).build();// 设置\发送延迟消息int delayLevel = delayTable.getCurrentDelayLevel();rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, 3000, delayLevel);log.debug("delay count: {}, delay level: {}, time: {} milliseconds",delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);}/*** 延迟回调接口** 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法*/public interface DelayMessageHandler extends Serializable {long serialVersionUID = 2405172041950251807L;/*** 回调函数*/void handle();}}
    

测试代码:

  • /** Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.*/
    package com.vevor.bmp.crm.io.controller;import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
    import com.vevor.common.pojo.vo.ResponseResult;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :延迟队列测试* @program :user-growth* @date :Created in 2023/5/22 4:54 下午* @since :1.8.0*/
    @Slf4j
    @RestController
    public class DelayQueueController {@Resourceprivate RocketMQDelayQueueUtils rocketMQDelayQueueUtils;@GetMapping("/mq/delay")@SneakyThrowspublic ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {// 获取延时队列rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);return ResponseResult.success();}/*** @version :* @description :* @program :user-growth* @date :Created in 2023/5/23 2:11 下午* @since :*/@Datapublic static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {/*** 回调函数*/@Overridepublic void handle() {log.info("i am business logical! {}", System.currentTimeMillis());}}
    }
    

优缺点:

  • 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
  • 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.

相关文章:

基于Rocket MQ扩展的无限延迟消息队列

基于Rocket MQ扩展的无限延迟消息队列 背景: Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的…...

Python办公自动化 – 日志分析和自动化FTP操作

Python办公自动化 – 日志分析和自动化FTP操作 以下是往期的文章目录&#xff0c;需要可以查看哦。 Python办公自动化 – Excel和Word的操作运用 Python办公自动化 – Python发送电子邮件和Outlook的集成 Python办公自动化 – 对PDF文档和PPT文档的处理 Python办公自动化 – 对…...

MyBatis 关联查询

目录 一、一对一查询&#xff08;sqlMapper配置文件&#xff09; 1、需求&#xff1a; 2、创建account和user实体类 3、创建AccountMapper 接口 4、创建并配置AccountMapper.xml 5、测试 二、一对多查询&#xff08;sqlMapper配置文件&#xff09; 1、需求&#xff1a;…...

NVIDIA NCCL 源码学习(十二)- double binary tree

上节我们以ring allreduce为例看到了集合通信的过程&#xff0c;但是随着训练任务中使用的gpu个数的扩展&#xff0c;ring allreduce的延迟会线性增长&#xff0c;为了解决这个问题&#xff0c;NCCL引入了tree算法&#xff0c;即double binary tree。 double binary tree 朴素…...

.net core webapi 大文件上传到wwwroot文件夹

1.配置staticfiles(program文件中) app.UseStaticFiles();2.在wwwroot下创建upload文件夹 3.返回结果封装 namespace webapi;/// <summary> /// 统一数据响应格式 /// </summary> public class Results<T> {/// <summary>/// 自定义的响应码&#xff…...

C++设计模式 #3策略模式(Strategy Method)

动机 在软件构建过程中&#xff0c;某些对象使用的的算法可能多种多样&#xff0c;经常改变。如果将这些算法都写在类中&#xff0c;会使得类变得异常复杂&#xff1b;而且有时候支持不频繁使用的算法也是性能负担。 如何在运行时根据需求透明地更改对象的算法&#xff1f;将…...

金融知识——OMS、EMS和PMS分别是什么意思

金融知识——OMS、EMS和PMS分别是什么意思 OMSEMSPMS OMS OMS&#xff08;Order Management System&#xff09;是为了管理头寸&#xff0c;以多种方式创建订单&#xff0c;并进行订单屈从检验以使得用户在订单创建时收到一些约束。在交易管理方面&#xff0c;OMS提供交易组合…...

Docker——微服务的部署

Docker——微服务的部署 文章目录 Docker——微服务的部署初识DockerDocker与虚拟机Docker架构安装DockerCentOS安装Docker卸载&#xff08;可选&#xff09;安装docker启动docker配置镜像加速 Docker的基本操作Docker的基本操作——镜像Docker基本操作——容器Docker基本操作—…...

AI时代架构设计新模式

云原生架构原则 云原生架构本身作为一种架构&#xff0c;也有若干架构原则作为应用架构的核心架构控制面&#xff0c;通过遵从这些架构原则可以让技术主管和架构师在做技术选择时不会出现大的偏差。 服务化原则 当代码规模超出小团队的合作范围时&#xff0c;就有必要进行服务…...

速盾网络:高防IP的好处

随着互联网的快速发展&#xff0c;网络安全问题日益突出&#xff0c;越来越多的企业和个人开始关注网络安全防护。其中&#xff0c;高防IP作为一种高效的防御手段&#xff0c;越来越受到用户的青睐。本文将介绍速盾网络高防IP的好处&#xff0c;帮助您了解其优势和应用场景。一…...

创建Maven Web工程

目录下也会有对应的生命周期。其中常用的是&#xff1a;clean、compile、package、install。 比如这里install &#xff0c;如果其他项目需要将这里的模块作为依赖使用&#xff0c;那就可以 install 。安装到本地仓库的位置&#xff1a; Java的Web工程&#xff0c;所以我们要选…...

【PHP入门】2.2 流程控制

-流程控制- 流程控制&#xff1a;代码执行的方向 2.2.1控制分类 顺序结构&#xff1a;代码从上往下&#xff0c;顺序执行。&#xff08;代码执行的最基本结构&#xff09; 分支结构&#xff1a;给定一个条件&#xff0c;同时有多种可执行代码&#xff08;块&#xff09;&am…...

springCould中的zookeeper-从小白开始【3】

目录 1.启动zookeeper❤️❤️❤️ 2.创建8004模块 ❤️❤️❤️ 3.临时节点还是永久节点❤️❤️❤️ 4.创建zk80消费模块❤️❤️❤️ 1.启动zookeeper❤️❤️❤️ 进入自己zookeeper的bin目录下 分别使用命令&#xff1a; ./zkServer.sh start 和 ./zkCli.sh -serve…...

Node.js-模块化(二)

1. 模块化的基本概念 1.1 什么是模块化 模块化是指解决一个复杂问题时&#xff0c;自顶向下逐层将系统拆分成若干模块的过程。对于整个系统来说&#xff0c;模块是可组合、分解和更换的单元。 1.2 编程领域中的模块化 编程领域中的模块化&#xff0c;就是遵守固定的规则&…...

MAC 安装nginx

使用Homebrew方式进行安装 步骤&#xff1a; 1、更新 Homebrew brew update 2、下载并安装 Nginx brew install nginx 3、查看 nginx 配置信息 brew info nginx zhanghuaBreeze ~ % brew info nginx // 版本信息 > nginx: stable 1.25.1 (bottled), HEAD HTTP(S) se…...

开源 AI 新秀崛起:Bittensor 更像是真正的“OpenAI”

强大的人工智能正在飞速发展&#xff0c;而完全由 OpenAI、Midjourney、Google&#xff08;Bard&#xff09;这样的少数公司控制 AI 不免让人感到担忧。在这样的背景下&#xff0c;试图用创新性解决方案处理人工智能中心化问题、权力集中于少数公司的 Bittensor&#xff0c;可谓…...

设计模式:循序渐进走入工厂模式

文章目录 前言一、引入二、简单工厂模式1.实现2.优缺点3.扩展 三、工厂方法模式1.实现2.优缺点 四、抽象工厂模式1.实现2.优缺点3.使用场景 五、模式扩展六、JDK源码解析总结 前言 软件设计模式之工厂模式。 一、引入 需求&#xff1a;设计一个咖啡店点餐系统。 设计一个咖啡类…...

如何将图片(matlab、python)无损放入word论文

许多论文对插图有要求&#xff0c;直接插入png、jpg一般是不行的&#xff0c;这是一篇顶刊文章&#xff08;pdf&#xff09;的插图&#xff0c;放大2400%后依旧清晰&#xff0c;搜罗了网上的方法&#xff0c;总结了一下如何将图片无损放入论文中。 这里主要讨论的是数据生成的图…...

在Next.js和React中搭建Cesium项目

在Next.js和React中搭建Cesium项目&#xff0c;需要确保Cesium能够与服务端渲染(SSR)兼容&#xff0c;因为Next.js默认是SSR的。Cesium是一个基于WebGL的地理信息可视化库&#xff0c;通常用于在网页中展示三维地球或地图。下面是一个基本的步骤&#xff0c;用于在Next.js项目中…...

docker学习(十、搭建redis集群,三主三从)

文章目录 一、docker创建6个redis容器创建6个redis容器回顾各个属性含义 二、划分主从&#xff0c;3主3从划分主从查看状态查看节点信息 docker搭建Redis集群相关知识&#xff1a; docker学习&#xff08;九、分布式存储亿级数据知识&#xff09; docker学习&#xff08;十、搭…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

拉力测试cuda pytorch 把 4070显卡拉满

import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试&#xff0c;通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小&#xff0c;增大可提高计算复杂度duration: 测试持续时间&#xff08;秒&…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)

前言&#xff1a; 在Java编程中&#xff0c;类的生命周期是指类从被加载到内存中开始&#xff0c;到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期&#xff0c;让读者对此有深刻印象。 目录 ​…...

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…...

MinIO Docker 部署:仅开放一个端口

MinIO Docker 部署:仅开放一个端口 在实际的服务器部署中,出于安全和管理的考虑,我们可能只能开放一个端口。MinIO 是一个高性能的对象存储服务,支持 Docker 部署,但默认情况下它需要两个端口:一个是 API 端口(用于存储和访问数据),另一个是控制台端口(用于管理界面…...