当前位置: 首页 > news >正文

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK

任务调度框架

Java中如何实现定时任务?

比如:
1.每天早上6点定时执行
2.每月最后一个工作日,考勤统计
3.每个月25号信用卡还款
4.会员生日祝福
5.每隔3秒,自动提醒

10分钟的超时订单的自动取消,每隔30秒或1分钟查询一次订单,拿当前的时间上前推10分钟
定时任务,资源会有误差的存在,如果使用定时任务
定时任务,用于统计的时候最多。

自动统计考勤,一般0点之后开始统计,可以使用定时任务

nacos心跳

晚上要求和采购部门生成采购单,达到最低预警值的时候,去发给采购部门

我们可以通过任务调度框架实现上述的需求
任务调度框架,可以实现定时任务,实现间隔多少时间的重复执行,实现指定日期的重复执行

电商自动好评,间隔时间长的,误差几分钟,影响不大。

java中任务调度框架:
1、Spring Task spring自带的
2、Quartz 古老的框架
3、XXL-Job
4、第三方云平台-比如说:阿里云-SchedulerX等等
选择一个:Spring Task
2个注解+
包:task任务、job

使用步骤:
1、开关类,使用注解
@EnableScheduling // 开启任务调度
在这里插入图片描述
2、定义定时任务

@Scheduled(cron = "0/3 * * * ?")

CORN表达式:
秒 分 时 日 月 星期几 年 其中,只有年可以省略
定时任务,需要重复执行的方法

CORN表达式:
特殊字符串,主要用来描述时间的,用于任务调度等
https://cron.qqe2.com/

/ 间隔
- 是连续
, 枚举值
L 最后,星期、日中用
W 有效工作日
LW 某个月最后一个工作日
# 用于确定每个月第几个星期几,母亲节或父亲节
4#2 某个月的第二个星期三  4代表星期三,中文的时候有可能不影响

在这里插入图片描述

在这里插入图片描述

项目名:SpringTask01
Spring Web、Lombok

RabbitMq实现延迟:

死信+延迟消息处理

死信:RabbitMQ的队列中的消息,满足以下条件任意其一就会成为死信消息:
1.消息被拒绝
2.消息过期
3.队列满了
死信交换器:专门用来转发队列中的死信消息,将死信消息转发到指定的队列中

十分钟未支付,取消订单?
十分钟之后,消息会过期,过期后,通过死信交换器转发队列中的死信消息,将死信消息转发到指定的队列中,由消费者去处理。

我们可以通过死信+死信交换器实现延迟消息处理
RabbitMQ实现延迟消息处理有2种方式:
1、死信+死信交换器 代码实现(可控,更方便一些)
2、延迟消息插件

1、死信+死信交换器 代码实现(可控,更方便一些)
发送消息,到队列1,(产生延迟队列,产生死信)
在这里插入图片描述
一个消息,过一段时间,实现消费。

核心:
1.队列 是2个队列,第1个队列: 目的 产生死信(1.设置有效期2.不设置消费者),借助死信交换器,把产生的死信发到指定的队列中
第二个对了:目的 消费死信,这里获取的信息,时间就是延迟的,延迟的就是上面的有效期
2.1个交换器
死信交换器,整个系统一般就一个,可以用来转发各个功能产生的死信,是Direct类型的交换,通过RK进行消息匹配到对应的队列中。
RabbitMQ的消息的有效期有2种设置方式:
1.设置队列上的有效期,整个队列中所有消息都使用
2.可以在每个消息上设置有效期,这种适用于有多个不同有效期的消息

在这里插入图片描述
如果队列和消息都有有效期,谁短听谁的。

代码:
RabbitMQ02
实现:
1.pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--        RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

2.代码:
config-RabbitMQConfig

package com.yd.rabbitmq02.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;/*** @author MaoXiqi* @organization: Lucky* @create 2023-10-16 11:35:54* @description RabbitMQConfigApi*/
@Configuration
public class RabbitMQConfig {// 1.创建2个队列@Beanpublic Queue createQ1() {// 1.设置队列 内部消息有效期 设置死信交换器 设置RKHashMap<String, Object> params = new HashMap<>();// 设置队列中每个消息的有效期 单位 毫秒params.put("x-message-ttl", 3000);// 设置对应的死信交换器params.put("x-dead-letter-exchange", "dead-ex-yd");// 设置交换器匹配的路由名称params.put("x-dead-letter-routing-key", "test");return QueueBuilder.durable("dl-q01").withArguments(params).build();}@Beanpublic Queue createQ2() {return new Queue("dl-q02");}// 2.创建1个交换器(1.fanout 2,direct 3.topic 4.header)-死信交换器direct类型@Beanpublic DirectExchange createDe() {return new DirectExchange("dead-ex-yd");}// 3.创建1个绑定@Beanpublic Binding createBd1(DirectExchange de) {return BindingBuilder.bind(createQ2()).to(de).with("test");}
}

在这里插入图片描述

2.1.创建两个队列
1、设置队列,内部消息有效期 设置死信交换器 设置RK 注意:参数名是固定的,值根据业务需求去改,值 单位是毫秒

2.2创建1个交换器(1.fanout 2.direct 3.tipic
4.header)-死信交换器direct类型

2.3.创建一个绑定

controller-DeadController

    @GetMapping("send")public String sendDead(String msg) {System.out.println("发送消息," + msg + ",发送时间:" + System.currentTimeMillis());template.convertAndSend("", "dl-q01", msg);return "ok";}

发送
在这里插入图片描述
监听消息-主要是为了消费:
listener-DeadListener

    @RabbitListener(queues = "dl-q02")public void handler(String m) {System.out.println("延迟消息,"+m+",接受时间:" +System.currentTimeMillis());}

在这里插入图片描述
yml:

spring:rabbitmq:host: 121.36.5.100port: 5672username: guestpassword: guest
server:port: 8082

测试:
在这里插入图片描述

RabbitMQ事务:

数据库中事务:保证数据一致性,特别是多个操作要么都成功,要么都失败
RabbitMQ事务:一次性发多条消息,需要开启事务,
RabbitMQ也有自己的事务,如果本次在这里插入图片描述

在这里插入图片描述

使用步骤:

源码:RabbitMQ02
1.创建配置类
2.使用基于事务发送
3.监听消息

1.创建配置类
config-RabbitMQTranConfig
1.准备一个队列
2.创建事务管理器

// 创建事务管理器@Beanpublic RabbitTransactionManager createTran(ConnectionFactory factory) {return new RabbitTransactionManager(factory);}

在这里插入图片描述

2.controller-TranController
开启事务
发送消息

    // 事务@Transactional // 需要开启SpringBoot的事务机制@GetMapping("sendmsg")public String sendMsg(String msg, int count) {// 开启 RabbitMQ的通道的事务template.setChannelTransacted(true);// 发送消息for (int i = 0; i < count; i++) {template.convertAndSend("", "yd-tran-q01", msg + "--" + count);// 出错,看看 事务是否生效if (i==2) {System.out.println(1/0);}}return "ok";}

在这里插入图片描述

3.listener-DeadListener

    // 事务@RabbitListener(queues = "yd-tran-q01")public void handler2(String msg) {System.out.println("监听消息"+msg);// 处理业务逻辑 出错了}

在这里插入图片描述

在这里插入图片描述

手动ACK

RabbitMQ怎么防止消息丢失:
1.发送端没有发送过去
解决:
1.用事务
2.confirm消息确认机制 万能:转人工处理
2.MQ服务器丢失,MQ服务器蹦了,
解决:开启持久化
3.消费端消息丢失:
解决:自动应答,改成开始手动ACK

消息确认机制:默认是自动确认

消息的发送和接收是异步

RabbitMQ如何防止消息丢失:
1.
代码:
config-RabbitMqTranConfig

    //消费消息的⼿动应答@Beanpublic Queue createQ4() {return new Queue("yd-ack-q01");}

controller-DeadController

    // 事务@Transactional // 需要开启SpringBoot的事务机制@GetMapping("sendmsg")public String sendMsg(String msg, int count) {// 开启 RabbitMQ的通道的事务template.setChannelTransacted(true);// 发送消息for (int i = 0; i < count; i++) {template.convertAndSend("", "yd-tran-q01", msg + "--" + count);// 出错,看看 事务是否生效if (i==2) {System.out.println(1/0);}}return "ok";}

listener-DeadListener
一般设置个上限,比如最多三次

    @RabbitListener(queues = "yd-ack-q01")public void handler3(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//消费者获取消息,默认采用的自动应答,就是获取就应答,这样MQ服务器就删除消息//还可以手动应答:结果:1.成功(MQ删除)2.失败(MQ消息)System.out.println("收到ACK消息,监听消息:" + msg);//拒绝消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝 3.是否把消息添加回队列中channel.basicNack(tag,false,true);//成功消息 参数说明:1.消息id 2.结果 true 成功 false 拒绝//channel.basicAck(tag,true);// 处理业务逻辑 出错了}

在这里插入图片描述
在这里插入图片描述

消费消息的手动应答:
RabbitMQ默认的消费者消息获取模式采用的是手动应答
但是这种有缺陷,可能会出现,消息获取了但是业务出了问题,导致MQ也自动删除了消息,最终导致业务没有执行
所以为了解决这种问题,可以开启手动应答模式,结合自己的业务执行情况,如果业务执行成功,那么就成功应答,如果失败了,就拒绝消息,同时把消息再加回队列,这样就可以再次消息再次处理(加个上限)
在这里插入图片描述
测试
在这里插入图片描述

RabbitMQ如何保证消息的幂等性:
幂等性就是重复消费。
解决:
1.生成一个全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有小费过。
2.
在这里插入图片描述

用户充值,重复消费,相当如充值了多次,是一定要杜绝的。

在这里插入图片描述

不要返回值的时候,可以用RabbitMQ替代OpenFegin,因为消费完就不回了。MQ默认是单向的
短信发信可以用MQ,这个业务要做,做起来可能会很耗时,中间要经过运营商,过程不可控

RabbitMQ应用场景
消息通信,发送消息和接收消息,是异步
1.实现服务通信
用在微服务中,实现2个服务的通信,这种不带返回值的,只是为了执行另一个服务的方法执行
2.解决耗时操作
比如:邮件、短信、第三方接口等,比较耗时
3.解耦
4.提升性能
5.重复代码封装
6.削峰填谷(订单先下到redis中,再通过MQ和延迟队列,慢慢的从redis搬到mysql中)

关键词:异步、解耦、延迟

相关文章:

任务调度框架-如何实现定时任务+RabbitMQ事务+手动ACK

任务调度框架 Java中如何实现定时任务&#xff1f; 比如&#xff1a; 1.每天早上6点定时执行 2.每月最后一个工作日&#xff0c;考勤统计 3.每个月25号信用卡还款 4.会员生日祝福 5.每隔3秒&#xff0c;自动提醒 10分钟的超时订单的自动取消&#xff0c;每隔30秒或1分钟查询…...

修炼k8s+flink+hdfs+dlink(六:学习k8s)

一&#xff1a;增&#xff08;创建&#xff09;。 直接进行创建。 kubectl run nginx --imagenginx使用yaml清单方式进行创建。 二&#xff1a;删除。 kubectl delete pods/nginx 三&#xff1a;修改。 kubectl exec -it my-nginx – /bin/bash 四&#xff1a;查看。 …...

红队专题-从零开始VC++C/S远程控制软件RAT-MFC-[4]客户端与服务端连接

红队专题 招募六边形战士队员服务端编写新建工程server函数创建主线程类获取配置信息运行command 命令头文件里创建引用win32 类库/头文件startsocket 开始监听 类函数添加类StartSocketmysend/myrecv 设置 m_sockCommon 头文件MSGINFO_S 结构体 ThreadMain头文件runflag 启动 …...

Qt Designer生成ui文件,如何转py文件,如何运行

下面将逐步介绍ui文件如何转py文件&#xff0c;怎么运行的具体操作步骤 ui文件转py文件 1.使用Qt Designer生成ui文件&#xff0c;保存到本地 2.输入 cmd &#xff0c;打开命令行窗口 3.进入ui文件的目录下&#xff0c;文件路径使用你本地存放ui文件的位置 cd /d ui文件路径…...

Python数据挖掘:自动售货机销售数据分析与应用

&#x1f4d5;作者简介&#xff1a;热爱跑步的恒川&#xff0c;致力于C/C、Java、Python等多编程语言&#xff0c;热爱跑步&#xff0c;喜爱音乐的一位博主。 &#x1f4d7;本文收录于恒川的日常汇报系列&#xff0c;大家有兴趣的可以看一看 &#x1f4d8;相关专栏C语言初阶、C…...

【设计模式】设计模式概述

&#x1f600;大家好&#xff0c;我是白晨&#xff0c;一个不是很能熬夜&#x1f62b;&#xff0c;但是也想日更的人✈。如果喜欢这篇文章&#xff0c;点个赞&#x1f44d;&#xff0c;关注一下&#x1f440;白晨吧&#xff01;你的支持就是我最大的动力&#xff01;&#x1f4…...

第六届“中国法研杯”司法人工智能挑战赛进行中!

第六届“中国法研杯”司法人工智能挑战赛 赛题上新&#xff01; 第六届“中国法研杯”司法人工智能挑战赛&#xff08;LAIC2023&#xff09;目前已发布司法大模型数据和服务集成调度 、证据推理、司法大数据征文比赛、案件要素识别四大任务。本届大赛中&#xff0c;“案件要素…...

关于 passing ‘const xx’ as ‘this’ argument of 的错误

今天在写一个简单的函数时&#xff0c;编译时出现了如下的错误&#xff1a; 这个很简单的函数是这样的&#xff1a; struct bundle_set {uint32_t baseId;uint32_t endId;bool operator< (const bundle_set &a){return baseId < a.baseId;} }; 在网上搜索到都是说什…...

数据结构和算法(13):优先级队列

概述 按照事先约定的优先级&#xff0c;可以始终高效查找并访问优先级最高数据项的数据结构&#xff0c;也统称作优先级队列 优先级队列将操作对象限定于当前的全局极值者。 根据数据对象之间相对优先级对其进行访问的方式&#xff0c;与此前的访问方式有着本质区别&#xf…...

面试经典150题——Day15

文章目录 一、题目二、题解 一、题目 135. Candy There are n children standing in a line. Each child is assigned a rating value given in the integer array ratings. You are giving candies to these children subjected to the following requirements: Each chil…...

web APIs——第一天(上)

变量声明的时候建议 const优先&#xff0c;尽量使用const 原因&#xff1a; const语义化更好很多变量我们声明的时候就知道他不会被更改了&#xff0c;那为什么不用const呢&#xff1f;实际开发中也是&#xff0c;比如react框架&#xff0c;基本const如果你有纠结的时候&…...

【Leetcode】215. 数组中的第K个最大元素

一、题目 1、题目描述 给定整数数组 nums 和整数 k,请返回数组中第 k 个最大的元素。 请注意,你需要找的是数组排序后的第 k 个最大的元素,而不是第 k 个不同的元素。 你必须设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例1: 输入: [3,2,1,5,6,4], k = 2 输出…...

服务器数据恢复-RAID5常见故障的数据恢复方案

raid5阵列常见故障&#xff1a; 1、服务器硬件故障或者RAID阵列卡故障&#xff1b; 2、服务器意外断电导致的磁盘阵列故障&#xff1b; 3、服务器RAID阵列阵列磁盘出现物理故障&#xff0c;如&#xff1a;电路板坏、磁头损坏、盘面划伤、坏扇区、固件坏等&#xff1b; 4、误操作…...

12个VIM编辑器的高级玩法

vim 是一个很好用的编辑器&#xff0c;应用十分广泛。但关于 vim&#xff0c;总有一些你不知道的事情&#xff0c;我们需要持续不断的学习。 我经常使用 vim&#xff0c;也经常在各大社区、论坛看到 vim 专家用户分享经验&#xff0c;今天我们就总结其中常用的一部分&#xff…...

⽜客论坛的笔记

项目描述: 一个基本功能完整的论坛项目。项目主要功能有: 基于邮件激活的注册方式&#xff0c;基于MD5加密与加盐的密码存储方式&#xff0c;登录功能加入了随机验证码的验证&#xff0c;实现登陆状态检查、为游客与已登录用户展示不同界面与功能。支持用户上传头像&#xff0c…...

JS逆向分析某枝网的HMAC加密、wasm模块加密

这是我2022年学做JS逆向成功的例子&#xff0c;URL&#xff1a;&#xff08;脱敏处理&#xff09;aHR0cHM6Ly93d3cuZ2R0di5jbi9hdWRpb0NoYW5uZWxEZXRhaWwvOTE 逆向分析&#xff1a; 1、每次XHR的GET请求携带的headers包括&#xff1a; {"X-ITOUCHTV-Ca-Timestamp":…...

论坛介绍|COSCon'23开源商业(V)

众多开源爱好者翘首期盼的开源盛会&#xff1a;第八届中国开源年会&#xff08;COSCon23&#xff09;将于 10月28-29日在四川成都市高新区菁蓉汇举办。本次大会的主题是&#xff1a;“开源&#xff1a;川流不息、山海相映”&#xff01;各位新老朋友们&#xff0c;欢迎到成都&a…...

在word、ppt、excel编辑软件标题栏顶部左上角加入自定义功能:另存为、导出PDF

...

Flink学习笔记(三):Flink四种执行图

文章目录 1、Graph 的概念2、Graph 的演变过程2.1、StreamGraph (数据流图)2.2、JobGraph (作业图)2.3、ExecutionGraph (执行图)2.4、Physical Graph (物理图) 1、Graph 的概念 Flink 中的执行图可以分成四层&#xff1a;StreamGraph -> JobGraph -> ExecutionGraph -&g…...

堆-----数据结构

引言 什么是堆&#xff1f;堆是一种特殊的数据结构&#xff08;用数组表示的树&#xff09;。 为什么要使用到堆&#xff1f;比如一场比赛&#xff0c;如果使用擂台赛的方式来决出冠军&#xff08;实力第一&#xff09;&#xff0c;就很难知道实力第二的队伍是什么了。 但是…...

golang循环变量捕获问题​​

在 Go 语言中&#xff0c;当在循环中启动协程&#xff08;goroutine&#xff09;时&#xff0c;如果在协程闭包中直接引用循环变量&#xff0c;可能会遇到一个常见的陷阱 - ​​循环变量捕获问题​​。让我详细解释一下&#xff1a; 问题背景 看这个代码片段&#xff1a; fo…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

蓝桥杯 冶炼金属

原题目链接 &#x1f527; 冶炼金属转换率推测题解 &#x1f4dc; 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V&#xff0c;是一个正整数&#xff0c;表示每 V V V 个普通金属 O O O 可以冶炼出 …...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

搭建DNS域名解析服务器(正向解析资源文件)

正向解析资源文件 1&#xff09;准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2&#xff09;服务端安装软件&#xff1a;bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...