基于Rocket MQ扩展的无限延迟消息队列
基于Rocket MQ扩展的无限延迟消息队列
背景:
- Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.
实现原理:
-
简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.
-
源码如下:
-
/** Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.*/ package com.example.xxx.utils;import com.vevor.bmp.crm.common.constants.MQConstants; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;import javax.annotation.Resource; import java.io.Serializable; import java.util.Calendar; import java.util.Date; import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :基于Rocket MQ的任意延迟时长工具* @program :user-growth* @date :Created in 2023/5/22 3:35 下午* @since :1.8.0*/ @Slf4j @Component @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,// 消息消费顺序consumeMode = ConsumeMode.CONCURRENTLY,// 最大消息重复消费次数maxReconsumeTimes = 3) public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {/*** Rocket MQ客户端*/@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** MQ默认延迟等级*/private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};@SneakyThrows@Overridepublic void onMessage(DelayTable<Object> message) {Date endTime = message.getEndTime();int delayLevel = getDelayLevel(endTime);// 继续延迟if (delayLevel != 0) {int currentDelayCount = message.getCurrentDelayCount();currentDelayCount++;message.setCurrentDelayCount(currentDelayCount);message.setCurrentDelayLevel(delayLevel);message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);this.sendDelayMessage(message);return;}// 执行业务log.info("delay message end! start to process business...");Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();if (messageHandler != null) {DelayMessageHandler delayMessageHandler = messageHandler.newInstance();delayMessageHandler.handle();}}/*** 延迟消息体** @param <E> 消息类型*/@Datapublic static class DelayTable<E> implements Serializable {private static final long serialVersionUID = 2405172041950251807L;/*** 延迟消息体*/private E content;/*** 消息延迟结束时间*/private Date endTime;/*** 总延迟毫秒数*/private long totalDelayTime;/*** 总延迟时间单位*/private TimeUnit totalDelayTimeUnit;/*** 当前延迟次数*/private int currentDelayCount;/*** 当前延迟等级*/private int currentDelayLevel;/*** 当前延迟毫秒数*/private long currentDelayMillis;/*** 延迟处理逻辑*/private Class<? extends DelayMessageHandler> messageHandler;}/*** 发送延迟消息** @param message 消息体* @param delay 延迟时长* @param timeUnit 延迟时间单位* @param handler 延迟时间到了之后,需要处理的逻辑* @param <E> 延迟消息类型*/public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {// 把延迟时间转换成时间戳(毫秒)long totalDelayMills = timeUnit.toMillis(delay);// 根据延迟时间计算结束时间Calendar instance = Calendar.getInstance();instance.add(Calendar.MILLISECOND, (int)totalDelayMills);Date endTime = instance.getTime();// 根据延迟时间匹配延迟等级(delay level)int delayLevel = getDelayLevel(endTime);long delayMillis = TIME_DELAY_LEVEL[delayLevel];// 发送消息DelayTable<E> delayTable = new DelayTable<>();// 全局数据delayTable.setContent(message);delayTable.setMessageHandler(handler);delayTable.setEndTime(endTime);delayTable.setTotalDelayTime(delay);delayTable.setTotalDelayTimeUnit(timeUnit);// 当前延迟等级数据delayTable.setCurrentDelayCount(1);delayTable.setCurrentDelayLevel(delayLevel);delayTable.setCurrentDelayMillis(delayMillis);this.sendDelayMessage(delayTable);}/*** 计算延迟等级** @param targetTime 延迟截止时间* @return Rocket MQ延迟消息等级*/private static int getDelayLevel(Date targetTime) {long currentTime = System.currentTimeMillis();long delayMillis = targetTime.getTime() - currentTime;if (delayMillis <= 0) {// 不延迟,即延迟等级为 0return 0;}// 判断处于哪个延迟等级// 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1hfor (int i = 1; i <= 18; i++) {long delayLevelTime = TIME_DELAY_LEVEL[i];if (delayMillis < delayLevelTime) {return i - 1;} else if (delayMillis == delayLevelTime) {return i;}}// 最大延迟等级为 18return 18;}/*** 发送延迟消息** @param delayTable 延迟对象,可以循环使用*/@SneakyThrowsprivate <E> void sendDelayMessage(DelayTable<E> delayTable) {// 消息序列化Message<DelayTable<E>> message = MessageBuilder.withPayload(delayTable).build();// 设置\发送延迟消息int delayLevel = delayTable.getCurrentDelayLevel();rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, 3000, delayLevel);log.debug("delay count: {}, delay level: {}, time: {} milliseconds",delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);}/*** 延迟回调接口** 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法*/public interface DelayMessageHandler extends Serializable {long serialVersionUID = 2405172041950251807L;/*** 回调函数*/void handle();}}
测试代码:
-
/** Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.*/ package com.vevor.bmp.crm.io.controller;import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils; import com.vevor.common.pojo.vo.ResponseResult; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource; import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :延迟队列测试* @program :user-growth* @date :Created in 2023/5/22 4:54 下午* @since :1.8.0*/ @Slf4j @RestController public class DelayQueueController {@Resourceprivate RocketMQDelayQueueUtils rocketMQDelayQueueUtils;@GetMapping("/mq/delay")@SneakyThrowspublic ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {// 获取延时队列rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);return ResponseResult.success();}/*** @version :* @description :* @program :user-growth* @date :Created in 2023/5/23 2:11 下午* @since :*/@Datapublic static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {/*** 回调函数*/@Overridepublic void handle() {log.info("i am business logical! {}", System.currentTimeMillis());}} }
优缺点:
- 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
- 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.
相关文章:
基于Rocket MQ扩展的无限延迟消息队列
基于Rocket MQ扩展的无限延迟消息队列 背景: Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的…...
Python办公自动化 – 日志分析和自动化FTP操作
Python办公自动化 – 日志分析和自动化FTP操作 以下是往期的文章目录,需要可以查看哦。 Python办公自动化 – Excel和Word的操作运用 Python办公自动化 – Python发送电子邮件和Outlook的集成 Python办公自动化 – 对PDF文档和PPT文档的处理 Python办公自动化 – 对…...
MyBatis 关联查询
目录 一、一对一查询(sqlMapper配置文件) 1、需求: 2、创建account和user实体类 3、创建AccountMapper 接口 4、创建并配置AccountMapper.xml 5、测试 二、一对多查询(sqlMapper配置文件) 1、需求:…...
NVIDIA NCCL 源码学习(十二)- double binary tree
上节我们以ring allreduce为例看到了集合通信的过程,但是随着训练任务中使用的gpu个数的扩展,ring allreduce的延迟会线性增长,为了解决这个问题,NCCL引入了tree算法,即double binary tree。 double binary tree 朴素…...
.net core webapi 大文件上传到wwwroot文件夹
1.配置staticfiles(program文件中) app.UseStaticFiles();2.在wwwroot下创建upload文件夹 3.返回结果封装 namespace webapi;/// <summary> /// 统一数据响应格式 /// </summary> public class Results<T> {/// <summary>/// 自定义的响应码ÿ…...
C++设计模式 #3策略模式(Strategy Method)
动机 在软件构建过程中,某些对象使用的的算法可能多种多样,经常改变。如果将这些算法都写在类中,会使得类变得异常复杂;而且有时候支持不频繁使用的算法也是性能负担。 如何在运行时根据需求透明地更改对象的算法?将…...
金融知识——OMS、EMS和PMS分别是什么意思
金融知识——OMS、EMS和PMS分别是什么意思 OMSEMSPMS OMS OMS(Order Management System)是为了管理头寸,以多种方式创建订单,并进行订单屈从检验以使得用户在订单创建时收到一些约束。在交易管理方面,OMS提供交易组合…...
Docker——微服务的部署
Docker——微服务的部署 文章目录 Docker——微服务的部署初识DockerDocker与虚拟机Docker架构安装DockerCentOS安装Docker卸载(可选)安装docker启动docker配置镜像加速 Docker的基本操作Docker的基本操作——镜像Docker基本操作——容器Docker基本操作—…...
AI时代架构设计新模式
云原生架构原则 云原生架构本身作为一种架构,也有若干架构原则作为应用架构的核心架构控制面,通过遵从这些架构原则可以让技术主管和架构师在做技术选择时不会出现大的偏差。 服务化原则 当代码规模超出小团队的合作范围时,就有必要进行服务…...
速盾网络:高防IP的好处
随着互联网的快速发展,网络安全问题日益突出,越来越多的企业和个人开始关注网络安全防护。其中,高防IP作为一种高效的防御手段,越来越受到用户的青睐。本文将介绍速盾网络高防IP的好处,帮助您了解其优势和应用场景。一…...
创建Maven Web工程
目录下也会有对应的生命周期。其中常用的是:clean、compile、package、install。 比如这里install ,如果其他项目需要将这里的模块作为依赖使用,那就可以 install 。安装到本地仓库的位置: Java的Web工程,所以我们要选…...
【PHP入门】2.2 流程控制
-流程控制- 流程控制:代码执行的方向 2.2.1控制分类 顺序结构:代码从上往下,顺序执行。(代码执行的最基本结构) 分支结构:给定一个条件,同时有多种可执行代码(块)&am…...
springCould中的zookeeper-从小白开始【3】
目录 1.启动zookeeper❤️❤️❤️ 2.创建8004模块 ❤️❤️❤️ 3.临时节点还是永久节点❤️❤️❤️ 4.创建zk80消费模块❤️❤️❤️ 1.启动zookeeper❤️❤️❤️ 进入自己zookeeper的bin目录下 分别使用命令: ./zkServer.sh start 和 ./zkCli.sh -serve…...
Node.js-模块化(二)
1. 模块化的基本概念 1.1 什么是模块化 模块化是指解决一个复杂问题时,自顶向下逐层将系统拆分成若干模块的过程。对于整个系统来说,模块是可组合、分解和更换的单元。 1.2 编程领域中的模块化 编程领域中的模块化,就是遵守固定的规则&…...
MAC 安装nginx
使用Homebrew方式进行安装 步骤: 1、更新 Homebrew brew update 2、下载并安装 Nginx brew install nginx 3、查看 nginx 配置信息 brew info nginx zhanghuaBreeze ~ % brew info nginx // 版本信息 > nginx: stable 1.25.1 (bottled), HEAD HTTP(S) se…...
开源 AI 新秀崛起:Bittensor 更像是真正的“OpenAI”
强大的人工智能正在飞速发展,而完全由 OpenAI、Midjourney、Google(Bard)这样的少数公司控制 AI 不免让人感到担忧。在这样的背景下,试图用创新性解决方案处理人工智能中心化问题、权力集中于少数公司的 Bittensor,可谓…...
设计模式:循序渐进走入工厂模式
文章目录 前言一、引入二、简单工厂模式1.实现2.优缺点3.扩展 三、工厂方法模式1.实现2.优缺点 四、抽象工厂模式1.实现2.优缺点3.使用场景 五、模式扩展六、JDK源码解析总结 前言 软件设计模式之工厂模式。 一、引入 需求:设计一个咖啡店点餐系统。 设计一个咖啡类…...
如何将图片(matlab、python)无损放入word论文
许多论文对插图有要求,直接插入png、jpg一般是不行的,这是一篇顶刊文章(pdf)的插图,放大2400%后依旧清晰,搜罗了网上的方法,总结了一下如何将图片无损放入论文中。 这里主要讨论的是数据生成的图…...
在Next.js和React中搭建Cesium项目
在Next.js和React中搭建Cesium项目,需要确保Cesium能够与服务端渲染(SSR)兼容,因为Next.js默认是SSR的。Cesium是一个基于WebGL的地理信息可视化库,通常用于在网页中展示三维地球或地图。下面是一个基本的步骤,用于在Next.js项目中…...
docker学习(十、搭建redis集群,三主三从)
文章目录 一、docker创建6个redis容器创建6个redis容器回顾各个属性含义 二、划分主从,3主3从划分主从查看状态查看节点信息 docker搭建Redis集群相关知识: docker学习(九、分布式存储亿级数据知识) docker学习(十、搭…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
