Java通过Redis进行延时队列,定时发布消息(根据用户选择时间进行发布)
前言
目前很多产品都用到过定时发布或者定时推送等功能,定时推送有两种定义,一种是后台自己有相关规则,通过定时器设置好相应的时间进行推送(例如定时任务框架QuartZ、xxl-job等实现,或者通过springboot自带定时任务@Scheduled注解等实现),这些都是基于后台设定的规则来进行定时推送。
还有一种场景便是根据用户自己选择想要的时间进行推送,这时候再用到上面的方法来做会比较麻烦和复杂,就需要用到延时队列来实现
实现方式
在做这个功能之前,我在网上查阅了想要实现这种根据用户选择时间来推送的相关资料,发现方式还是挺多的,包括但不限于以下几种:
- 最简单暴力的方法,通过上述的定时任务框架或者springboot自带的定时器来实现,把cron表达式书写为每分钟一次,然后每分钟都去检查是否和用户设置的时间能匹配上,如能匹配上就进行相关的业务操作
- 通过实现springboot自带的SchedulingConfigurer接口来进行动态任务调用
- 通过DelayQueue队列进行实现
- 通过MQ中间件的发送消费来实现
- 通过Redis设置key过期时间触发来进行实现
…
上面的几种实现第1、2点比较简单,也很有效果,但是容易出现效率问题和准确性的问题,下面45点的比较不错,但是相比较起来学习成本会高一些,具体实现的思路差不太多,这些都有相关的资料,通过上面的关键字搜索便能查阅到
功能实现
看了那么多的方案之后再结合自身的项目,最终决定用一种新的方案来实现,通过Redis自带的DelayedQueue延时队列来完成,和上面的第45点其实思路差不太多,只不过这个更简单方便一点
定义一个实体类来进行配置
@Data
public class TaskBodyDto implements Serializable {/*** 重试最大次数*/public static final int MAX_RETRY = 3;private String idKey;private String beanName;private String methodName;private Map<String, Object> paramMap;/*** 重试计时器*/private int cnt;/*** 延迟的时间*/private long delay;/*** 延迟的时间单位*/private TimeUnit timeUnit;}
定义RedissonDelayQueue类
@Slf4j
@Component
public class RedissonDelayQueueDemo {@Resourceprivate RedissonClient redissonClient;private RBlockingQueue rBlockingQueue;private RDelayedQueue rDelayedQueue;@PostConstructprivate void init() {rBlockingQueue = redissonClient.getBlockingQueue(TaskListener.class.getName());rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);}public void add(TaskBodyDto reqVo) {rDelayedQueue.offer(reqVo, reqVo.getDelay(), reqVo.getTimeUnit());log.info("增加了延时队列{}", reqVo);}/*** 增加订单延时队列 -单位为秒** @param id id,传入一个唯一标识,可以是业务ID* @param beanName 类名* @param methodName 方法名* @param paramMap 参数* @param delay 延迟时间*/public void add(String id, String beanName, String methodName, Map<String, Object> paramMap, long delay) {TaskBodyDto reqVo= new TaskBodyDto();String idKey = beanName + ":" + methodName + ":" + id;log.info("增加了延时队列" + idKey);reqVo.setIdKey(idKey);reqVo.setBeanName(beanName);reqVo.setMethodName(methodName);reqVo.setParamMap(paramMap);reqVo.setTimeUnit(TimeUnit.SECONDS);reqVo.setDelay(delay);this.add(reqVo);}/*** 删除延时队列** @param id id,传入一个唯一标识,可以是业务ID* @param beanName 类名* @param methodName 方法名*/public void remove(String id, String beanName, String methodName) {String idKey = beanName + ":" + methodName + ":" + id;log.info("删除了延时队列:" + idKey);RDelayedQueue<TaskBodyDto> delayedQueue = rDelayedQueue;Stream<TaskBodyDto> stream = delayedQueue.stream().filter(s -> idKey.equals(s.getIdKey()));List<TaskBodyDto> c = stream.collect(Collectors.toList());if (!c.isEmpty()) {
// log.info("删除延时队列{}", c);delayedQueue.remove(c.get(0));}}}
然后写一个工具类方便调用
@Component
public class RedissionDelayQueueUtils {@Autowiredprivate RedissonDelayQueue redissonDelayQueue;@Autowiredprivate static RedissonDelayQueue staticRedissonDelayQueue;@PostConstructpublic void init() {staticRedissonDelayQueue = redissonDelayQueue;}/*** 添加定时任务* @param id 唯一标识,可以是业务ID* @param paramMap 参数 key-value* @param beanName bean类名称 注意类名需要小写* @param methodName 方法名* @param seconds 延迟时间 单位为秒*/public static void addDelayQueue(String id, Map<String, Object> paramMap, String beanName, String methodName, Integer seconds) {staticRedissonDelayQueue.add(id, beanName, methodName, paramMap, seconds);}/*** 删除定时任务* @param id 唯一标识,可以是业务ID* @param beanName bean类名称 注意类名需要小写* @param methodName 方法名*/public static void removeDelayQueue(String id, String beanName, String methodName) {staticRedissonDelayQueue.remove(id, beanName, methodName);}}
然后再配置好监听器,在监听器里面通过反射获取到相关的方法然后执行里面的业务
@Slf4j
@Component
public class TaskListener implements RedisDelayedQueueListener<TaskBodyDto> {private static final List<Class> WRAP_CLASS = Arrays.asList(Integer.class, Boolean.class, Double.class, Byte.class, Short.class, Long.class, Float.class, Double.class, BigDecimal.class, String.class);//队列Queue@Autowiredprivate RedissonDelayQueue redissonDelayQueue;@Autowiredprivate TaskSender taskSender;@Overridepublic void invoke(TaskBodyDto reqVo) {log.info("开始执行监听...{}", reqVo);reqVo.setCnt(reqVo.getCnt() + 1);try {Object bean = ApplicationContextUtil.getBean(reqVo.getBeanName());Method method = ReflectUtil.getMethodByName(bean.getClass(), reqVo.getMethodName());Class target = AopUtils.getTargetClass(bean);Method targetMethod = ReflectUtil.getMethodByName(target, reqVo.getMethodName());List<Object> objects = getMethodParamList(targetMethod, reqVo.getParamMap());method.invoke(bean, objects.toArray());} catch (Exception e) {log.error("invoke task err!", e);if (reqVo.getCnt() > TaskBodyDto.MAX_RETRY) {log.error("重试次数超过最大次数,不再重试。");DeadQueDto deadQueDto = new DeadQueDto();deadQueDto.setBeanName(reqVo.getBeanName());deadQueDto.setMethodName(reqVo.getMethodName());deadQueDto.setParamMap(reqVo.getParamMap());taskSender.sendTask(deadQueDto);} else {//重试,30分钟后重试,秒为单位则用原数据if (reqVo.getTimeUnit().name().equals(TimeUnit.DAYS.name()) || reqVo.getTimeUnit().name().equals(TimeUnit.HOURS.name())) {reqVo.setDelay(30);reqVo.setTimeUnit(TimeUnit.MINUTES);redissonDelayQueue.add(reqVo);} else if (reqVo.getTimeUnit().name().equals(TimeUnit.MINUTES.name()) && reqVo.getDelay() > 30) {reqVo.setDelay(30);reqVo.setTimeUnit(TimeUnit.MINUTES);redissonDelayQueue.add(reqVo);} else {redissonDelayQueue.add(reqVo);}}}}private List<Object> getMethodParamList(Method method, Map<String, Object> paramMap) throws Exception {List<Object> objectList = new ArrayList<>();// 利用Spring提供的类获取方法形参名DefaultParameterNameDiscoverer nameDiscoverer = new DefaultParameterNameDiscoverer();String[] param = nameDiscoverer.getParameterNames(method);for (int i = 0; i < method.getParameterTypes().length; i++) {Class<?> parameterType = method.getParameterTypes()[i];Object object = null;// 基本类型不支持,支持包装类String paramKey = param[i];if (WRAP_CLASS.contains(parameterType)) {if (param != null && paramMap.containsKey(paramKey)) {object = paramMap.get(paramKey);object = ConvertUtils.convert(object, parameterType);}} else if (!parameterType.isPrimitive()) {if (parameterType.isAssignableFrom(List.class) || parameterType.isAssignableFrom(Map.class) || parameterType.isAssignableFrom(Set.class)) {object = paramMap.get(paramKey);} else {object = parameterType.newInstance();BeanUtils.populate(object, paramMap);}}objectList.add(object);}return objectList;}}
都配置好之后,可以写个方法进行测试
比如我要三十分钟之后执行test方法
public class Test {private void test(String name,String value){//执行业务代码}}
然后在需要执行这个功能的地方进行调用,比如用户在界面选择了发布时间之后,后端接口收到请求进行处理
//延时队列Map<String, Object> map = new HashMap<>();map.put("name","张三");map.put("value","这是value");RedissionDelayQueueUtils.addDelayQueue("唯一标识",map,"test","test", (int) DateUtil.between("用户选择的时间",new Date(), DateUnit.SECOND));
注意事项:这上面的map便是被执行的方法需要的一些参数,切记不能直接传入Object类,只能通过基本数据类型进行传递,传入的bean类名也需要小写,DateUtil.between()这个方法是用的hutool工具类里面的日期工具类,为了算出用户选择的时间和当前时间相差多少秒,可自行更改为适合自己的方法,反正最后只需要取到两者时间差多少秒即可
后续redis的配置那些照常配置即可
总结
总结下来其实思路还是比较明确,就是通过redis延时队列的机制,这边配置好相关的参数然后加入到redis里面去,配置好监听器之后由redis进行监听触发,然后再通过反射的方式取到需要执行的bean和方法进行执行即可,其实延时队列的方法很多,我上面还推荐了一些其他的方法,通过给出的关键字即可查阅相关的资料,总之根据自身的情况选择最适合的方法就行
最后不管采取哪种方式,建议在触发以及执行的地方及时把日志打印出来,方便后期调试以及对问题的定位
相关文章:
Java通过Redis进行延时队列,定时发布消息(根据用户选择时间进行发布)
前言 目前很多产品都用到过定时发布或者定时推送等功能,定时推送有两种定义,一种是后台自己有相关规则,通过定时器设置好相应的时间进行推送(例如定时任务框架QuartZ、xxl-job等实现,或者通过springboot自带定时任务Scheduled注解等实现)&am…...

从 0 搭建 Vite 3 + Vue 3 Js版 前端工程化项目
之前分享过一篇vue3+ts+vite构建工程化项目的文章,针对小的开发团队追求开发速度,不想使用ts想继续使用js,所以就记录一下从0搭建一个vite+vue3+js的前端项目,做记录分享。 技术栈 Vite 3 - 构建工具 Vue 3 Vue Router - 官方路由管理器 Pinia - Vue Store你也可以选择vue…...

【论文阅读笔记】Smil: Multimodal learning with severely missing modality
Ma M, Ren J, Zhao L, et al. Smil: Multimodal learning with severely missing modality[C]//Proceedings of the AAAI Conference on Artificial Intelligence. 2021, 35(3): 2302-2310.[开源] 本文的核心思想是探讨和解决多模态学习中的一个重要问题:在训练和测…...

在Windows系统上安装git-Git的过程记录
01-上git的官网下载git的windows安装版本 下载页面链接: https://git-scm.com/downloads 选择Standalone Installer的版本进行下载: 这里给大家一全git-2.43.0的百度网盘下载链接: https://pan.baidu.com/s/11HwNTCZmtSWj0VG2x60HIA?pwdut…...
qt QString常用方法
1. QString 尾部拼接,尾部插入字符.调用append()函数.同时,QString字符串直接用加号 也可以进行拼接. QString s "我的女神";s s "刘亦菲";s "最近可好?";s.append("你跑哪儿去了?");//拼接结果: 我的女神刘亦菲最近可好?你跑…...

吴恩达《机器学习》10-6-10-7:学习曲线、决定下一步做什么
一、学习曲线 1. 学习曲线概述 学习曲线将训练集误差和交叉验证集误差作为训练集实例数量(m)的函数绘制而成。这意味着从较少的数据开始,逐渐增加训练集的实例数量。该方法的核心思想在于,当训练较少数据时,模型可能…...

分子骨架跃迁工具-DiffHopp 评测
一、文章背景介绍 DiffHopp模型发表在ICML 2023 Workshop on Computational Biology(简称:2023 ICML-WCB)上的文章。第一作者是剑桥计算机系的Jos Torge。 DiffHopp是一个专门针对骨架跃迁任务而训练的E3等变条件扩散模型。此外,…...

MySQL双主双从数据库集群搭建
1 引言 在之前的文章中提到过相关搭建方法,具体请参考《MySQL主从数据库搭建》这篇文章,本文主要讲述双主双从,双主多从集群的搭建方式。 这里要问一个问题,为什么MySQL要搭建数据库集群呢?我想应该有以下几点原因&…...

vue实现动态路由菜单!!!
目录 总结一、步骤1.编写静态路由编写router.jsmain.js注册 2.编写permisstions.js权限文件编写permisstions.jsaxios封装的APIstore.js状态库system.js Axios-APIrequest.js axios请求实例封装 3.编写菜单树组件MenuTree.vue 4.主页中使用菜单树组件 总结 递归处理后端响应的…...

企业如何选择安全又快速的大文件传输平台
在现代信息化社会,数据已经成为各个行业的重要资源,而数据的传输和交换则是数据价值的体现。在很多场合,企业需要传输或接收大文件,例如设计图纸、视频素材、软件开发包、数据库备份等。这些文件的大小通常在几百兆字节到几十个字…...

springboot 自定义starter逐级抽取
自定义starter 背景:各个组件需要引入starter 还有自己的配置风格 –基本配置原理 (1)自定义配置文件 导入配置可以在配置文件中自动识别,提示 导入依赖后可以发现提示 (2)配置文件实现 –让配置文件对其他模块生…...

GAN:ImprovedGAN-训练GAN的改进策略
论文:https://arxiv.org/abs/1606.03498 代码:https://github.com/openai/improved_gan 发表:NIPS 2016 一、文章创新 1:Feature matching:特征匹配通过为生成器指定新目标来解决GANs的不稳定性,从而防止…...

docker限制容器内存的方法
在服务器中使用 docker 时,如果不对 docker 的可调用内存进行限制,当 docker 内的程序出现不可预测的问题时,就很有可能因为内存爆炸导致服务器主机的瘫痪。而对 docker 进行限制后,可以将瘫痪范围控制在 docker 内。 因此&#…...

阿里达摩院裁撤量子实验室
我是卢松松,点点上面的头像,欢迎关注我哦! 马云的达摩院也不搞量子计算了,因为缺钱,整体裁掉了达摩院量子实验室,把所有的设备都赠送给了浙江大学。 达摩院量子实验室:总共30个研究员…...
mysql数据库基础知识,Mysql的索引和主键区别,数据库的事务的基本特性
文章目录 数据库基础知识Mysql的索引和主键的区别数据库的事务的基本特性 数据库基础知识 为什么要使用数据库 数据保存在内存 优点: 存取速度快 缺点: 数据不能永久保存 数据保存在文件 优点: 数据永久保存 缺点:1…...

解决Vscode使用git提交卡住的问题
使用Vscode的git提交代码经常会很慢/卡住。 先点击左下角,进入设置 找到git的配置(建议直接搜索),把use Editor As commit input的勾选去掉即可解决。...
Linux C语言 32-网络编程之UDP例程
Linux C语言 32-网络编程之UDP例程 本节关键字:C语言 网络编程 UDP协议 套接字操作 服务端 客户端 相关C库函数:setsockopt, socket, bind, recvfrom, sendto, close 相关接口介绍 Linux C语言 30-套接字操作 例程执行任务说明 本例程中服务端的任务…...

ubuntu22.04系统下载程序和依赖,并拷贝到指定路径下
脚本1 apt install aptitude apt-get -d install xxx #xxx是待下载的安装包 mv /var/cache/apt/archives/* /home/tuners/1apt install aptitude apt-get -d install xxx mv /var/cache/apt/archives/*.deb /home/tuners/1 xxx 为程序包名称 /home/tuners/1为保存程序包的…...
Kafka KRaft 版本集群部署详细教程(附配置文件详细解释)
版本说明 Ubuntu 18.04.6Kafka 3.6.0JDK8 集群配置 操作系统ip域名Kafka Broker 端口Kafka Controller 端口Ubuntu 18.04.6192.168.50.131kafka1.com90929093Ubuntu 18.04.6192.168.50.132kafka2.com90929093Ubuntu 18.04.6192.168.50.133kafka3.com90929093 安装 vim, cur…...

在龙蜥 anolis os 23 上 源码安装 PostgreSQL 16.1
在龙蜥 OS 23上,本来想使用二进制安装,结果发现没有针对龙蜥的列表: 于是想到了源码安装,下面我们列出了PG源码安装的步骤: 1.安装准备 1.1.创建操作系统组及用户 groupadd postgres useradd -g postgres -m postgr…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

el-switch文字内置
el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...

莫兰迪高级灰总结计划简约商务通用PPT模版
莫兰迪高级灰总结计划简约商务通用PPT模版,莫兰迪调色板清新简约工作汇报PPT模版,莫兰迪时尚风极简设计PPT模版,大学生毕业论文答辩PPT模版,莫兰迪配色总结计划简约商务通用PPT模版,莫兰迪商务汇报PPT模版,…...