当前位置: 首页 > news >正文

基于Rocket MQ扩展的无限延迟消息队列

基于Rocket MQ扩展的无限延迟消息队列

背景:

  • Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的延迟消息无法实现该功能, 所以对方案进行了改造.

实现原理:

  • 简单而言, 就是在Rocket MQ延迟队列固定时间间隔的基础上, 通过多次发送延迟消息, 达到任意延时时间组合计算. 通过反射的方式, 实现延迟业务逻辑的调用.

  • 源码如下:

  • /** Copyright (c) 2020-2030 XXX.Co.Ltd. All Rights Reserved.*/
    package com.example.xxx.utils;import com.vevor.bmp.crm.common.constants.MQConstants;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;import javax.annotation.Resource;
    import java.io.Serializable;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :基于Rocket MQ的任意延迟时长工具* @program :user-growth* @date :Created in 2023/5/22 3:35 下午* @since :1.8.0*/
    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = MQConstants.CRM_DELAY_QUEUE_TOPIC_GROUP,topic = MQConstants.CRM_DELAY_QUEUE_TOPIC,// 消息消费顺序consumeMode = ConsumeMode.CONCURRENTLY,// 最大消息重复消费次数maxReconsumeTimes = 3)
    public class RocketMQDelayQueueUtils implements RocketMQListener<RocketMQDelayQueueUtils.DelayTable<Object>> {/*** Rocket MQ客户端*/@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** MQ默认延迟等级*/private static final long[] TIME_DELAY_LEVEL = new long[]{0L, 1000L, 5000L, 10000L,30000L, 60000L, 120000L, 180000L, 240000L, 300000L, 360000L, 420000L,480000L, 540000L, 600000L, 1200000L, 1800000L, 3600000L, 7200000L};@SneakyThrows@Overridepublic void onMessage(DelayTable<Object> message) {Date endTime = message.getEndTime();int delayLevel = getDelayLevel(endTime);// 继续延迟if (delayLevel != 0) {int currentDelayCount = message.getCurrentDelayCount();currentDelayCount++;message.setCurrentDelayCount(currentDelayCount);message.setCurrentDelayLevel(delayLevel);message.setCurrentDelayMillis(TIME_DELAY_LEVEL[delayLevel]);this.sendDelayMessage(message);return;}// 执行业务log.info("delay message end! start to process business...");Class<? extends DelayMessageHandler> messageHandler = message.getMessageHandler();if (messageHandler != null) {DelayMessageHandler delayMessageHandler = messageHandler.newInstance();delayMessageHandler.handle();}}/*** 延迟消息体** @param <E> 消息类型*/@Datapublic static class DelayTable<E> implements Serializable {private static final long serialVersionUID = 2405172041950251807L;/*** 延迟消息体*/private E content;/*** 消息延迟结束时间*/private Date endTime;/*** 总延迟毫秒数*/private long totalDelayTime;/*** 总延迟时间单位*/private TimeUnit totalDelayTimeUnit;/*** 当前延迟次数*/private int currentDelayCount;/*** 当前延迟等级*/private int currentDelayLevel;/*** 当前延迟毫秒数*/private long currentDelayMillis;/*** 延迟处理逻辑*/private Class<? extends DelayMessageHandler> messageHandler;}/*** 发送延迟消息** @param message 消息体* @param delay 延迟时长* @param timeUnit 延迟时间单位* @param handler 延迟时间到了之后,需要处理的逻辑* @param <E> 延迟消息类型*/public <E> void delay(E message, int delay, TimeUnit timeUnit, Class<? extends DelayMessageHandler> handler) {// 把延迟时间转换成时间戳(毫秒)long totalDelayMills = timeUnit.toMillis(delay);// 根据延迟时间计算结束时间Calendar instance = Calendar.getInstance();instance.add(Calendar.MILLISECOND, (int)totalDelayMills);Date endTime = instance.getTime();// 根据延迟时间匹配延迟等级(delay level)int delayLevel = getDelayLevel(endTime);long delayMillis = TIME_DELAY_LEVEL[delayLevel];// 发送消息DelayTable<E> delayTable = new DelayTable<>();// 全局数据delayTable.setContent(message);delayTable.setMessageHandler(handler);delayTable.setEndTime(endTime);delayTable.setTotalDelayTime(delay);delayTable.setTotalDelayTimeUnit(timeUnit);// 当前延迟等级数据delayTable.setCurrentDelayCount(1);delayTable.setCurrentDelayLevel(delayLevel);delayTable.setCurrentDelayMillis(delayMillis);this.sendDelayMessage(delayTable);}/*** 计算延迟等级** @param targetTime 延迟截止时间* @return Rocket MQ延迟消息等级*/private static int getDelayLevel(Date targetTime) {long currentTime = System.currentTimeMillis();long delayMillis = targetTime.getTime() - currentTime;if (delayMillis <= 0) {// 不延迟,即延迟等级为 0return 0;}// 判断处于哪个延迟等级// 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1hfor (int i = 1; i <= 18; i++) {long delayLevelTime = TIME_DELAY_LEVEL[i];if (delayMillis < delayLevelTime) {return i - 1;} else if (delayMillis == delayLevelTime) {return i;}}// 最大延迟等级为 18return 18;}/*** 发送延迟消息** @param delayTable 延迟对象,可以循环使用*/@SneakyThrowsprivate <E> void sendDelayMessage(DelayTable<E> delayTable) {// 消息序列化Message<DelayTable<E>> message = MessageBuilder.withPayload(delayTable).build();// 设置\发送延迟消息int delayLevel = delayTable.getCurrentDelayLevel();rocketMQTemplate.syncSend(MQConstants.CRM_DELAY_QUEUE_TOPIC, message, 3000, delayLevel);log.debug("delay count: {}, delay level: {}, time: {} milliseconds",delayTable.currentDelayCount, delayLevel, TIME_DELAY_LEVEL[delayLevel]);}/*** 延迟回调接口** 回调逻辑必须实现该接口#hander()方法,在延迟结束后,会通过反射的方式调用该方法*/public interface DelayMessageHandler extends Serializable {long serialVersionUID = 2405172041950251807L;/*** 回调函数*/void handle();}}
    

测试代码:

  • /** Copyright (c) 2020-2030 Sishun.Co.Ltd. All Rights Reserved.*/
    package com.vevor.bmp.crm.io.controller;import com.vevor.bmp.crm.cpm.utils.RocketMQDelayQueueUtils;
    import com.vevor.common.pojo.vo.ResponseResult;
    import lombok.Data;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
    import java.util.concurrent.TimeUnit;/*** @version :1.8.0* @description :延迟队列测试* @program :user-growth* @date :Created in 2023/5/22 4:54 下午* @since :1.8.0*/
    @Slf4j
    @RestController
    public class DelayQueueController {@Resourceprivate RocketMQDelayQueueUtils rocketMQDelayQueueUtils;@GetMapping("/mq/delay")@SneakyThrowspublic ResponseResult<String> mqDelay(@RequestParam Integer delay, @RequestParam String task) {// 获取延时队列rocketMQDelayQueueUtils.delay(task, delay, TimeUnit.SECONDS, CallBack.class);return ResponseResult.success();}/*** @version :* @description :* @program :user-growth* @date :Created in 2023/5/23 2:11 下午* @since :*/@Datapublic static class CallBack implements RocketMQDelayQueueUtils.DelayMessageHandler {/*** 回调函数*/@Overridepublic void handle() {log.info("i am business logical! {}", System.currentTimeMillis());}}
    }
    

优缺点:

  • 优点: 与定时任务框架相比, 通过延迟消息的方式具实时性高、 支持分布式、轻量级、高并发等优点.
  • 缺点: 消息的准确性不可靠, 正常情况下准确性在秒级, 但是当MQ服务出现消息堆积时, 消息的时间就会偏差较大, 所以准确性依赖MQ服务的稳定.

相关文章:

基于Rocket MQ扩展的无限延迟消息队列

基于Rocket MQ扩展的无限延迟消息队列 背景: Rocket MQ支持的延迟队列时间是固定间隔的, 默认19个等级(包含0等级): 0s, 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h. 我们的需求是实现用户下单后48小时或72小时给用户发送逼单邮件. 使用默认的…...

Python办公自动化 – 日志分析和自动化FTP操作

Python办公自动化 – 日志分析和自动化FTP操作 以下是往期的文章目录&#xff0c;需要可以查看哦。 Python办公自动化 – Excel和Word的操作运用 Python办公自动化 – Python发送电子邮件和Outlook的集成 Python办公自动化 – 对PDF文档和PPT文档的处理 Python办公自动化 – 对…...

MyBatis 关联查询

目录 一、一对一查询&#xff08;sqlMapper配置文件&#xff09; 1、需求&#xff1a; 2、创建account和user实体类 3、创建AccountMapper 接口 4、创建并配置AccountMapper.xml 5、测试 二、一对多查询&#xff08;sqlMapper配置文件&#xff09; 1、需求&#xff1a;…...

NVIDIA NCCL 源码学习(十二)- double binary tree

上节我们以ring allreduce为例看到了集合通信的过程&#xff0c;但是随着训练任务中使用的gpu个数的扩展&#xff0c;ring allreduce的延迟会线性增长&#xff0c;为了解决这个问题&#xff0c;NCCL引入了tree算法&#xff0c;即double binary tree。 double binary tree 朴素…...

.net core webapi 大文件上传到wwwroot文件夹

1.配置staticfiles(program文件中) app.UseStaticFiles();2.在wwwroot下创建upload文件夹 3.返回结果封装 namespace webapi;/// <summary> /// 统一数据响应格式 /// </summary> public class Results<T> {/// <summary>/// 自定义的响应码&#xff…...

C++设计模式 #3策略模式(Strategy Method)

动机 在软件构建过程中&#xff0c;某些对象使用的的算法可能多种多样&#xff0c;经常改变。如果将这些算法都写在类中&#xff0c;会使得类变得异常复杂&#xff1b;而且有时候支持不频繁使用的算法也是性能负担。 如何在运行时根据需求透明地更改对象的算法&#xff1f;将…...

金融知识——OMS、EMS和PMS分别是什么意思

金融知识——OMS、EMS和PMS分别是什么意思 OMSEMSPMS OMS OMS&#xff08;Order Management System&#xff09;是为了管理头寸&#xff0c;以多种方式创建订单&#xff0c;并进行订单屈从检验以使得用户在订单创建时收到一些约束。在交易管理方面&#xff0c;OMS提供交易组合…...

Docker——微服务的部署

Docker——微服务的部署 文章目录 Docker——微服务的部署初识DockerDocker与虚拟机Docker架构安装DockerCentOS安装Docker卸载&#xff08;可选&#xff09;安装docker启动docker配置镜像加速 Docker的基本操作Docker的基本操作——镜像Docker基本操作——容器Docker基本操作—…...

AI时代架构设计新模式

云原生架构原则 云原生架构本身作为一种架构&#xff0c;也有若干架构原则作为应用架构的核心架构控制面&#xff0c;通过遵从这些架构原则可以让技术主管和架构师在做技术选择时不会出现大的偏差。 服务化原则 当代码规模超出小团队的合作范围时&#xff0c;就有必要进行服务…...

速盾网络:高防IP的好处

随着互联网的快速发展&#xff0c;网络安全问题日益突出&#xff0c;越来越多的企业和个人开始关注网络安全防护。其中&#xff0c;高防IP作为一种高效的防御手段&#xff0c;越来越受到用户的青睐。本文将介绍速盾网络高防IP的好处&#xff0c;帮助您了解其优势和应用场景。一…...

创建Maven Web工程

目录下也会有对应的生命周期。其中常用的是&#xff1a;clean、compile、package、install。 比如这里install &#xff0c;如果其他项目需要将这里的模块作为依赖使用&#xff0c;那就可以 install 。安装到本地仓库的位置&#xff1a; Java的Web工程&#xff0c;所以我们要选…...

【PHP入门】2.2 流程控制

-流程控制- 流程控制&#xff1a;代码执行的方向 2.2.1控制分类 顺序结构&#xff1a;代码从上往下&#xff0c;顺序执行。&#xff08;代码执行的最基本结构&#xff09; 分支结构&#xff1a;给定一个条件&#xff0c;同时有多种可执行代码&#xff08;块&#xff09;&am…...

springCould中的zookeeper-从小白开始【3】

目录 1.启动zookeeper❤️❤️❤️ 2.创建8004模块 ❤️❤️❤️ 3.临时节点还是永久节点❤️❤️❤️ 4.创建zk80消费模块❤️❤️❤️ 1.启动zookeeper❤️❤️❤️ 进入自己zookeeper的bin目录下 分别使用命令&#xff1a; ./zkServer.sh start 和 ./zkCli.sh -serve…...

Node.js-模块化(二)

1. 模块化的基本概念 1.1 什么是模块化 模块化是指解决一个复杂问题时&#xff0c;自顶向下逐层将系统拆分成若干模块的过程。对于整个系统来说&#xff0c;模块是可组合、分解和更换的单元。 1.2 编程领域中的模块化 编程领域中的模块化&#xff0c;就是遵守固定的规则&…...

MAC 安装nginx

使用Homebrew方式进行安装 步骤&#xff1a; 1、更新 Homebrew brew update 2、下载并安装 Nginx brew install nginx 3、查看 nginx 配置信息 brew info nginx zhanghuaBreeze ~ % brew info nginx // 版本信息 > nginx: stable 1.25.1 (bottled), HEAD HTTP(S) se…...

开源 AI 新秀崛起:Bittensor 更像是真正的“OpenAI”

强大的人工智能正在飞速发展&#xff0c;而完全由 OpenAI、Midjourney、Google&#xff08;Bard&#xff09;这样的少数公司控制 AI 不免让人感到担忧。在这样的背景下&#xff0c;试图用创新性解决方案处理人工智能中心化问题、权力集中于少数公司的 Bittensor&#xff0c;可谓…...

设计模式:循序渐进走入工厂模式

文章目录 前言一、引入二、简单工厂模式1.实现2.优缺点3.扩展 三、工厂方法模式1.实现2.优缺点 四、抽象工厂模式1.实现2.优缺点3.使用场景 五、模式扩展六、JDK源码解析总结 前言 软件设计模式之工厂模式。 一、引入 需求&#xff1a;设计一个咖啡店点餐系统。 设计一个咖啡类…...

如何将图片(matlab、python)无损放入word论文

许多论文对插图有要求&#xff0c;直接插入png、jpg一般是不行的&#xff0c;这是一篇顶刊文章&#xff08;pdf&#xff09;的插图&#xff0c;放大2400%后依旧清晰&#xff0c;搜罗了网上的方法&#xff0c;总结了一下如何将图片无损放入论文中。 这里主要讨论的是数据生成的图…...

在Next.js和React中搭建Cesium项目

在Next.js和React中搭建Cesium项目&#xff0c;需要确保Cesium能够与服务端渲染(SSR)兼容&#xff0c;因为Next.js默认是SSR的。Cesium是一个基于WebGL的地理信息可视化库&#xff0c;通常用于在网页中展示三维地球或地图。下面是一个基本的步骤&#xff0c;用于在Next.js项目中…...

docker学习(十、搭建redis集群,三主三从)

文章目录 一、docker创建6个redis容器创建6个redis容器回顾各个属性含义 二、划分主从&#xff0c;3主3从划分主从查看状态查看节点信息 docker搭建Redis集群相关知识&#xff1a; docker学习&#xff08;九、分布式存储亿级数据知识&#xff09; docker学习&#xff08;十、搭…...

从零实现富文本编辑器#5-编辑器选区模型的状态结构表达

先前我们总结了浏览器选区模型的交互策略&#xff0c;并且实现了基本的选区操作&#xff0c;还调研了自绘选区的实现。那么相对的&#xff0c;我们还需要设计编辑器的选区表达&#xff0c;也可以称为模型选区。编辑器中应用变更时的操作范围&#xff0c;就是以模型选区为基准来…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

ios苹果系统,js 滑动屏幕、锚定无效

现象&#xff1a;window.addEventListener监听touch无效&#xff0c;划不动屏幕&#xff0c;但是代码逻辑都有执行到。 scrollIntoView也无效。 原因&#xff1a;这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作&#xff0c;从而会影响…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测

uniapp 中配置 配置manifest 文档&#xff1a;manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号&#xff1a;4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...