基于Rocket MQ扩展的无限延迟消息队列
基于Rocket MQ扩展的无限延迟消息队列
背景:
- Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.
实现原理:
-
简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.
-
源码如下:
-
/** Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.*/ package com.example.xxx.utils;import com.vevor.bmp.crm.common.constants.MQConstants; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.io.Serializable; import java.util.Calendar; import java.util.Date; import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :基于Rocket MQ的任意延迟时长工具* @program :user-growth* @date :Created in 2023/5/22 3:35 下午* @since :1.8.0*/ @Slf4j @Component @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,// 消息消费顺序consumeMode = ConsumeMode.CONCURRENTLY,// 最大消息重复消费次数maxReconsumeTimes = 3) public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {/*** Rocket MQ客户端*/@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** MQ默认延迟等级*/private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};@SneakyThrows@Overridepublic void onMessage(DelayTable<Object> message) {Date endTime = message.getEndTime();int delayLevel = getDelayLevel(endTime);// 继续延迟if (delayLevel != 0) {int currentDelayCount = message.getCurrentDelayCount();currentDelayCount++;message.setCurrentDelayCount(currentDelayCount);message.setCurrentDelayLevel(delayLevel);message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);this.sendDelayMessage(message);return;}// 执行业务log.info("delay message end! start to process business...");Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();if (messageHandler != null) {DelayMessageHandler delayMessageHandler = messageHandler.newInstance();delayMessageHandler.handle();}}/*** 延迟消息体** @param <E> 消息类型*/@Datapublic static class DelayTable<E> implements Serializable {private static final long serialVersionUID = 2405172041950251807L;/*** 延迟消息体*/private E content;/*** 消息延迟结束时间*/private Date endTime;/*** 总延迟毫秒数*/private long totalDelayTime;/*** 总延迟时间单位*/private TimeUnit totalDelayTimeUnit;/*** 当前延迟次数*/private int currentDelayCount;/*** 当前延迟等级*/private int currentDelayLevel;/*** 当前延迟毫秒数*/private long currentDelayMillis;/*** 延迟处理逻辑*/private Class<? extends DelayMessageHandler> messageHandler;}/*** 发送延迟消息** @param message 消息体* @param delay 延迟时长* @param timeUnit 延迟时间单位* @param handler 延迟时间到了之后,需要处理的逻辑* @param <E> 延迟消息类型*/public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {// 把延迟时间转换成时间戳(毫秒)long totalDelayMills = timeUnit.toMillis(delay);// 根据延迟时间计算结束时间Calendar instance = Calendar.getInstance();instance.add(Calendar.MILLISECOND, (int)totalDelayMills);Date endTime = instance.getTime();// 根据延迟时间匹配延迟等级(delay level)int delayLevel = getDelayLevel(endTime);long delayMillis = TIME_DELAY_LEVEL[delayLevel];// 发送消息DelayTable<E> delayTable = new DelayTable<>();// 全局数据delayTable.setContent(message);delayTable.setMessageHandler(handler);delayTable.setEndTime(endTime);delayTable.setTotalDelayTime(delay);delayTable.setTotalDelayTimeUnit(timeUnit);// 当前延迟等级数据delayTable.setCurrentDelayCount(1);delayTable.setCurrentDelayLevel(delayLevel);delayTable.setCurrentDelayMillis(delayMillis);this.sendDelayMessage(delayTable);}/*** 计算延迟等级** @param targetTime 延迟截止时间* @return Rocket MQ延迟消息等级*/private static int getDelayLevel(Date targetTime) {long currentTime = System.currentTimeMillis();long delayMillis = targetTime.getTime() - currentTime;if (delayMillis <= 0) {// 不延迟,即延迟等级为 0return 0;}// 判断处于哪个延迟等级// 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1hfor (int i = 1; i <= 18; i++) {long delayLevelTime = TIME_DELAY_LEVEL[i];if (delayMillis < delayLevelTime) {return i - 1;} else if (delayMillis == delayLevelTime) {return i;}}// 最大延迟等级为 18return 18;}/*** 发送延迟消息** @param delayTable 延迟对象,可以循环使用*/@SneakyThrowsprivate <E> void sendDelayMessage(DelayTable<E> delayTable) {// 消息序列化Message<DelayTable<E>> message = MessageBuilder.withPayload(delayTable).build();// 设置\发送延迟消息int delayLevel = delayTable.getCurrentDelayLevel();rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, 3000, delayLevel);log.debug("delay count: {}, delay level: {}, time: {} milliseconds",delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);}/*** 延迟回调接口** 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法*/public interface DelayMessageHandler extends Serializable {long serialVersionUID = 2405172041950251807L;/*** 回调函数*/void handle();}}
测试代码:
-
/** Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.*/ package com.vevor.bmp.crm.io.controller;import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils; import com.vevor.common.pojo.vo.ResponseResult; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource; import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :延迟队列测试* @program :user-growth* @date :Created in 2023/5/22 4:54 下午* @since :1.8.0*/ @Slf4j @RestController public class DelayQueueController {@Resourceprivate RocketMQDelayQueueUtils rocketMQDelayQueueUtils;@GetMapping("/mq/delay")@SneakyThrowspublic ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {// 获取延时队列rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);return ResponseResult.success();}/*** @version :* @description :* @program :user-growth* @date :Created in 2023/5/23 2:11 下午* @since :*/@Datapublic static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {/*** 回调函数*/@Overridepublic void handle() {log.info("i am business logical! {}", System.currentTimeMillis());}} }
优缺点:
- 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
- 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.
相关文章:
基于Rocket MQ扩展的无限延迟消息队列
基于Rocket MQ扩展的无限延迟消息队列 背景: Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的…...
Python办公自动化 – 日志分析和自动化FTP操作
Python办公自动化 – 日志分析和自动化FTP操作 以下是往期的文章目录,需要可以查看哦。 Python办公自动化 – Excel和Word的操作运用 Python办公自动化 – Python发送电子邮件和Outlook的集成 Python办公自动化 – 对PDF文档和PPT文档的处理 Python办公自动化 – 对…...
MyBatis 关联查询
目录 一、一对一查询(sqlMapper配置文件) 1、需求: 2、创建account和user实体类 3、创建AccountMapper 接口 4、创建并配置AccountMapper.xml 5、测试 二、一对多查询(sqlMapper配置文件) 1、需求:…...
NVIDIA NCCL 源码学习(十二)- double binary tree
上节我们以ring allreduce为例看到了集合通信的过程,但是随着训练任务中使用的gpu个数的扩展,ring allreduce的延迟会线性增长,为了解决这个问题,NCCL引入了tree算法,即double binary tree。 double binary tree 朴素…...
.net core webapi 大文件上传到wwwroot文件夹
1.配置staticfiles(program文件中) app.UseStaticFiles();2.在wwwroot下创建upload文件夹 3.返回结果封装 namespace webapi;/// <summary> /// 统一数据响应格式 /// </summary> public class Results<T> {/// <summary>/// 自定义的响应码ÿ…...
C++设计模式 #3策略模式(Strategy Method)
动机 在软件构建过程中,某些对象使用的的算法可能多种多样,经常改变。如果将这些算法都写在类中,会使得类变得异常复杂;而且有时候支持不频繁使用的算法也是性能负担。 如何在运行时根据需求透明地更改对象的算法?将…...
金融知识——OMS、EMS和PMS分别是什么意思
金融知识——OMS、EMS和PMS分别是什么意思 OMSEMSPMS OMS OMS(Order Management System)是为了管理头寸,以多种方式创建订单,并进行订单屈从检验以使得用户在订单创建时收到一些约束。在交易管理方面,OMS提供交易组合…...
Docker——微服务的部署
Docker——微服务的部署 文章目录 Docker——微服务的部署初识DockerDocker与虚拟机Docker架构安装DockerCentOS安装Docker卸载(可选)安装docker启动docker配置镜像加速 Docker的基本操作Docker的基本操作——镜像Docker基本操作——容器Docker基本操作—…...
AI时代架构设计新模式
云原生架构原则 云原生架构本身作为一种架构,也有若干架构原则作为应用架构的核心架构控制面,通过遵从这些架构原则可以让技术主管和架构师在做技术选择时不会出现大的偏差。 服务化原则 当代码规模超出小团队的合作范围时,就有必要进行服务…...
速盾网络:高防IP的好处
随着互联网的快速发展,网络安全问题日益突出,越来越多的企业和个人开始关注网络安全防护。其中,高防IP作为一种高效的防御手段,越来越受到用户的青睐。本文将介绍速盾网络高防IP的好处,帮助您了解其优势和应用场景。一…...
创建Maven Web工程
目录下也会有对应的生命周期。其中常用的是:clean、compile、package、install。 比如这里install ,如果其他项目需要将这里的模块作为依赖使用,那就可以 install 。安装到本地仓库的位置: Java的Web工程,所以我们要选…...
【PHP入门】2.2 流程控制
-流程控制- 流程控制:代码执行的方向 2.2.1控制分类 顺序结构:代码从上往下,顺序执行。(代码执行的最基本结构) 分支结构:给定一个条件,同时有多种可执行代码(块)&am…...
springCould中的zookeeper-从小白开始【3】
目录 1.启动zookeeper❤️❤️❤️ 2.创建8004模块 ❤️❤️❤️ 3.临时节点还是永久节点❤️❤️❤️ 4.创建zk80消费模块❤️❤️❤️ 1.启动zookeeper❤️❤️❤️ 进入自己zookeeper的bin目录下 分别使用命令: ./zkServer.sh start 和 ./zkCli.sh -serve…...
Node.js-模块化(二)
1. 模块化的基本概念 1.1 什么是模块化 模块化是指解决一个复杂问题时,自顶向下逐层将系统拆分成若干模块的过程。对于整个系统来说,模块是可组合、分解和更换的单元。 1.2 编程领域中的模块化 编程领域中的模块化,就是遵守固定的规则&…...
MAC 安装nginx
使用Homebrew方式进行安装 步骤: 1、更新 Homebrew brew update 2、下载并安装 Nginx brew install nginx 3、查看 nginx 配置信息 brew info nginx zhanghuaBreeze ~ % brew info nginx // 版本信息 > nginx: stable 1.25.1 (bottled), HEAD HTTP(S) se…...
开源 AI 新秀崛起:Bittensor 更像是真正的“OpenAI”
强大的人工智能正在飞速发展,而完全由 OpenAI、Midjourney、Google(Bard)这样的少数公司控制 AI 不免让人感到担忧。在这样的背景下,试图用创新性解决方案处理人工智能中心化问题、权力集中于少数公司的 Bittensor,可谓…...
设计模式:循序渐进走入工厂模式
文章目录 前言一、引入二、简单工厂模式1.实现2.优缺点3.扩展 三、工厂方法模式1.实现2.优缺点 四、抽象工厂模式1.实现2.优缺点3.使用场景 五、模式扩展六、JDK源码解析总结 前言 软件设计模式之工厂模式。 一、引入 需求:设计一个咖啡店点餐系统。 设计一个咖啡类…...
如何将图片(matlab、python)无损放入word论文
许多论文对插图有要求,直接插入png、jpg一般是不行的,这是一篇顶刊文章(pdf)的插图,放大2400%后依旧清晰,搜罗了网上的方法,总结了一下如何将图片无损放入论文中。 这里主要讨论的是数据生成的图…...
在Next.js和React中搭建Cesium项目
在Next.js和React中搭建Cesium项目,需要确保Cesium能够与服务端渲染(SSR)兼容,因为Next.js默认是SSR的。Cesium是一个基于WebGL的地理信息可视化库,通常用于在网页中展示三维地球或地图。下面是一个基本的步骤,用于在Next.js项目中…...
docker学习(十、搭建redis集群,三主三从)
文章目录 一、docker创建6个redis容器创建6个redis容器回顾各个属性含义 二、划分主从,3主3从划分主从查看状态查看节点信息 docker搭建Redis集群相关知识: docker学习(九、分布式存储亿级数据知识) docker学习(十、搭…...
GPU资源管理混乱?nvitop一站式解决方案深度解析
GPU资源管理混乱?nvitop一站式解决方案深度解析 【免费下载链接】nvitop An interactive NVIDIA-GPU process viewer and beyond, the one-stop solution for GPU process management. 项目地址: https://gitcode.com/gh_mirrors/nv/nvitop 在深度学习训练、…...
Qwen-Image-2512-Pixel-Art-LoRA 模型v1.0 系列作品展:构建一个完整的像素风奇幻世界
Qwen-Image-2512-Pixel-Art-LoRA 模型v1.0 系列作品展:构建一个完整的像素风奇幻世界 朋友们,今天不聊代码,不聊部署,咱们来看点“好玩”的。最近我深度体验了Qwen-Image-2512-Pixel-Art-LoRA模型,它最让我惊喜的&…...
星穹铁道自动化解决方案:用March7thAssistant释放游戏时间价值
星穹铁道自动化解决方案:用March7thAssistant释放游戏时间价值 【免费下载链接】March7thAssistant 🎉 崩坏:星穹铁道全自动 Honkai Star Rail 🎉 项目地址: https://gitcode.com/gh_mirrors/ma/March7thAssistant 副标题&…...
汽车线控转向系统动力学法Carsim和Simulink联合仿真
✅作者简介:热爱科研的Matlab仿真开发者,擅长毕业设计辅导、数学建模、数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。🍎 往期回顾关注个人主页:Matlab科研工作室👇 关注我领取海量matlab电子书和…...
CSSCI论文写作03:确定论文的选题
什么是选题 选题:选择一个适合的研究指向!!! 选择: 而不是创造,创新是内在要求 你要坚信:所有的选题都有前人关注过研究过,我们不求“栽树”,只求“乘凉”,填补什么空白,只能说明自己的浅薄无知。 适合: 个人经验的学术表达,找到那双穿在自己脚上的鞋子没有不能…...
告别重复代码:BaseMapperPlus在SpringBoot项目中的5个高级用法
BaseMapperPlus实战:SpringBoot项目中提升开发效率的5个高阶技巧 在SpringBoot项目中使用MyBatis-Plus进行数据持久层开发时,BaseMapperPlus作为社区广泛采用的扩展接口,能显著减少模板代码。本文将分享五个实际业务场景中的高阶用法…...
云上实战说 | TapNow x Google Cloud 带您体验从灵感到资产的秒级转化
以下文章来源于谷歌云服务,作者 Google Cloud基于 Google Cloud Veo 和 Nano Banana 的前沿能力,TapNow (万物形象所) 邀您体验生成式 AI 如何重塑品牌与自我表达。现场实时生成风格化写真、宠物贴纸及周边,直观感受从灵感到资产的极速转化&a…...
OpenClaw浏览器自动化:ollama-QwQ-32B驱动的研究资料收集系统
OpenClaw浏览器自动化:ollama-QwQ-32B驱动的研究资料收集系统 1. 为什么需要自动化研究资料收集 作为一名经常需要查阅大量文献的技术写作者,我长期被资料收集的效率问题困扰。传统工作流程中,我需要手动在Google Scholar、arXiv、知乎等平…...
天津专业的阀门厂排名
在天津,阀门行业发展态势良好,众多阀门厂各有特色与优势。中国通用机械工业协会最新发布的《2026年阀门行业高质量发展白皮书》显示,天津的阀门产业在技术创新、产品质量和市场份额等方面都有不错的表现。下面为大家介绍几家天津比较知名的阀…...
ChromePass终极指南:浏览器密码提取与安全管理完全攻略
ChromePass终极指南:浏览器密码提取与安全管理完全攻略 【免费下载链接】chromepass Get all passwords stored by Chrome on WINDOWS. 项目地址: https://gitcode.com/gh_mirrors/chr/chromepass 副标题:从密码危机到数据掌控:3步实现…...
