xxl-job调度任务原理解析
xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程:
admin服务端初始化
JobTriggerPoolHelper.java#toStart()方法中会初始化两个调用任务的线程池,快线程池最大线程数为200,慢线程池最大线程数为100。然后启动线程定时轮询需要调度的定时任务。首先计算每秒能处理的定时任务数量,公式为(快线程池的最大线程数+满线程池的最大线程数)*20(1000ms/每个任务处理的时长50ms),最多为6000。从数据库中加锁查出任务触发时间<当前时间+预读时间(5s)的任务,然后分情况处理。
- 当前时间大于任务触发时间+预读时间,即任务触发时间已经过期超过5s,此时不做任何处理,只刷新任务下次触发时间
- 当前时间大于任务触发时间但不超过5s,即任务虽然过期但是过期时间不到5s,此时触发任务,将任务数据保存到ringData
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();,ringData的key是秒数,value是jobid,然后刷新任务的下次触发时间 - 当前时间小于任务触发时间,即还没到任务的触发时间,此时也会将任务写道ringData中,等到期就会进行处理,因为在内存中查询任务比到数据库查询要快很多。
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {boolean preReadSuc = true;try {preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、pre readlong nowTime = System.currentTimeMillis();List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {for (XxlJobInfo jobInfo: scheduleList) {if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}
最后判断任务调度状态,TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);,有任务需要调度则下一秒继续扫描,如果没有发现任务则睡眠5s(PRE_READ_MS)。
刚才说到待执行的任务会加入ringData,现在往下看怎么处理ringData的。这里会回退一秒,因为可能出现任务超时的情况,导致任务处理时遗漏。处理的逻辑很简单,到了某秒时,根据秒数取出对应的jobid集合,然后依次处理触发每个任务即可。触发任务的逻辑我们稍微再说。
List<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}if (ringItemData.size() > 0) {for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);}ringItemData.clear();}
客户端初始化
客户端创建定时任务只要在bean中添加@XxlJob注解即可,调度任务是通过XxlJobSpringExecutor实现的。过程是到spring容器中获取所有bean,找出对方法使用了@XxlJob的bean,然后使用MethodJobHandler进行封装,注册到jobHandlerRepositoryprivate static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();。
String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Method[] methods = bean.getClass().getDeclaredMethods();for (Method method: methods) {XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);if (xxlJob != null) {String name = xxlJob.value();method.setAccessible(true);if(xxlJob.init().trim().length() > 0) {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);}if(xxlJob.destroy().trim().length() > 0) {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);}registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}}
客户端会启动一个netty服务器,xxl-job底层的核心就是netty,监听${xxl.job.executor.port}配置的端口,等待来自服务端的调度。
ServerBootstrap bootstrap = new ServerBootstrap();((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());NettyHttpServer.this.onStarted();future.channel().closeFuture().sync();
服务端触发任务
触发任务是从JobTriggerPoolHelper.java#addTrigger()中开始的。默认是快线程池触发,如果1min内执行时间超过500ms的次数大于10,则改为满线程池。
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}triggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {long minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}long cost = System.currentTimeMillis()-start;if (cost > 500) { // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}
真正执行是在processTrigger()方法中,先根据调度策略获取处理任务的客户端地址,默认是轮询策略。先获取任务id,然后找到任务对应的客户端索引,通过nextInt()方法找到下个索引,再到客户端地址列表中根据索引获取地址。
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){XxlJobLog jobLog = new XxlJobLog();XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);TriggerParam triggerParam = new TriggerParam();String address = null;routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}triggerResult = runExecutor(triggerParam, address);
//轮询策略调度任务private static int count(int jobId) {// cache clearif (System.currentTimeMillis() > CACHE_VALID_TIME) {routeCountEachJob.clear();CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;}// count++Integer count = routeCountEachJob.get(jobId);count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力routeCountEachJob.put(jobId, count);return count;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {String address = addressList.get(count(triggerParam.getJobId())%addressList.size());return new ReturnT<String>(address);}
处理任务时通过proxy进行动态代理,在XxlRpcReferenceBean.class#getObject为调度的定时任务生成了动态代理对象,在InvocationHandler的invoke()方法中实现了逻辑增强,最终到NettyHttpClient#asyncSend()将消息发送到客户端netty服务器。
客户端执行定时任务
客户端是在NettyHttpServerHandler#channelRead0()中处理定时任务的,先对服务器的字节流进行反序列化,在XxlRpcProviderFactory.class#invokeService()以反射方式远程调用ExecutorBizImpl.java#run()方法。
Class<?> serviceClass = serviceBean.getClass();String methodName = xxlRpcRequest.getMethodName();Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result = method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);
在run方法中启动处理任务的JobThread进行处理,JobThread中就是根据定时任务名获取对应的MethodJobHandler,取出要执行的Method,再反射执行即可。
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
总结下,xxl-job首先在服务端启动线程轮询要执行的定时任务,计算定时任务的触发时间,然后后获取代理对象,将要执行的任务信息通过netty发送到客户端,客户端以反射方式执行定时任务。有不对的地方请大神指出,欢迎大家一起讨论交流,共同进步。
相关文章:
xxl-job调度任务原理解析
xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程: admin服务端初始化 JobTriggerPoolHelper.java#toStart()方法…...
实验2 路由器基本配置
实验2 路由器基本配置 一、 原理描述二、 实验目的三、 实验内容四、 实验步骤1.建立实验拓扑2.基础配置3.配置路由器接口IP地址4.查看路由器配置信息5.连通性测试6.使用抓包工具 一、 原理描述 华为设备支持多种配置方式,操作人员要熟悉使用命令行的方式进行设备管…...
docker部署安装整理
centos下安装部署docker 在CentOS下部署Docker,你需要按照以下步骤进行操作: 更新系统: 首先,确保你的CentOS系统是最新的。打开终端,并运行以下命令来更新你的系统: sudo yum update -y安装所需的软件包…...
为什么你明明拥有5年开发经验,但是依然写不出来一份简历?
前端训练营:1v1私教,终身辅导计划,帮你拿到满意的 offer。 已帮助数百位同学拿到了中大厂 offer。欢迎来撩~~~~~~~~ Hello,大家好,我是 Sunday。 在最近不到一年的时间里,我跟上千位同学进行了沟通&#x…...
【ZZULIOJ】1062: 最大公约数(Java)
目录 题目描述 输入 输出 样例输入 Copy 样例输出 Copy 提示 code 题目描述 输入两个不大于10的9次方的正整数,输出其最大公约数。 输入 输入两个正整数m和n,数据之间用空格隔开。 输出 输出一个整数,表示m和n的最大公约数。 样…...
北斗导航 | ARAIM算法的原理和性能测试
===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== ARAIM算法的原理和性能测试 针对高级接收机自主完好性监视(ARAIM)算法…...
elasticsearch7安全配置--最低安全等级,用户名密码
上一篇博客在centos7上安装了elasticsearch7 接下来对elasticsearch进行安全方面的配置 minimal security 最低安全等级,用户名密码 首先开启xpack vim config/elasticsearch.yml xpack.security.enabled: true由于我是单机配置的,还加了如下配置 d…...
项目架构MVC,DDD学习
写在前面 本文一起看下项目架构DDD,MVC相关的内容。 1:MVC 不管我们做什么项目,自己想想其实只是做了三件事,如下: 其实,这三件事完全在一个类中做完也可以可以正常把项目完成的,就像下面这…...
SQLite的PRAGMA 声明
PRAGMA 语句是特定于 SQLite 的 SQL 扩展,用于 修改 SQLite 库的操作或查询 SQLite 库 内部(非表)数据。PRAGMA声明使用相同的 接口作为其他 SQLite 命令(例如 SELECT、INSERT)但 在以下重要方面有所不同: …...
使用ArrayList.removeAll(List list)导致的机器重启
背景 先说一下背景,博主所在的业务组有一个核心系统,需要同步两个不同数据源给过来的数据到redis中,但是每次同步之前需要过滤掉一部分数据,只存储剩下的数据。每次同步的数据与需要过滤掉的数据量级大概在0-100w的数据不等。 由…...
如何在项目中使用uni-ui组件库
1、安装uni-ui npm i dcloudio/uni-ui 2、组件自动引用 配置easycom 使用 npm 安装好 uni-ui 之后,需要配置 easycom 规则,让 npm 安装的组件支持 easycom 打开项目根目录下的 pages.json 并添加 easycom 节点: // pages.json {"e…...
redis的过期策略和内存淘汰机制(redis篇)
分享并学习一下redis的过期策略和内存淘汰机制 在平时的工作或者学习中,即便自己没有实打实的用过redis。但是能有对这方面的思考,再结合一些实际场景和理论,那么我相信自己或者你都会越来越厉害的。 首先,我们需要认清为啥redis要…...
Java中Runnable和Callable有什么不同?(企业真题)
Java中Runnable和Callable有什么不同? 与之前的方式的对比:与Runnable方式的对比的好处 call()可以有返回值,更灵活 call()可以使用throws的方式处理异常,更灵活 Callable使用了泛型参数,可以指明具体的call()的返回值…...
图机器学习导论
图:描述关系数据的通用语言,起源于哥尼斯堡七桥问题 传统的机器学习:数据样本之间独立同分布,简单拟合数据边界,在传统的机器学习中,每个数据样本彼此无关。传统的神经网络,只能处理简单的表格、…...
地推网推拉新平台哪家强?一文清楚告诉你
在当今这个充满副业的时代,地推网推拉新平台的寻找与对接成为了许多人关注的焦点。那么,我们应该如何找到那些既靠谱又有潜力的拉新项目呢? 经过深入研究和全网检索,我为大家盘点了5个值得一试地推网推拉新平台。 尤其是“聚小推…...
Day:004(4) | Python爬虫:高效数据抓取的编程技术(数据解析)
XPath工具 浏览器-元素-CtrlF 浏览器-控制台- $x(表达式) Xpath helper (安装包需要科学上网) 问题 使用离线安装包 出现 程序包无效 解决方案 使用修改安装包的后缀名为 rar,解压文件到一个文件夹,再用 加载文件夹的方式安装即可 安装 python若使用…...
(80) 只出现一次的数字(81)反转字符串
文章目录 1. 每日一言2. (80) 只出现一次的数字2.1 解题思路2.2 代码 3. (81)反转字符串3.1 解题思路3.2 代码 4. 结语 1. 每日一言 生活是一场即兴表演,值得庆幸的是我们总是有所感受,并且将一直感受下去。 2. (80) 只出现一次的数字 题目链接&#x…...
基于拉格朗日分布算法的电动汽车充放电调度MATLAB程序
微❤关注“电气仔推送”获得资料(专享优惠) 程序简介 该模型主要做的是基于拉格朗日分布算法的电动汽车充放电调度模型。利用蒙特卡洛模拟法模拟出电动汽车负荷曲线,并求解出无序充电功率曲线和有序充电曲线,该模型在电动汽车个…...
【Linux 学习】进程优先级和命令行参数!
1. 什么是优先级? 指定进程获取某种资源(CPU)的先后顺序; Linux 中优先级数字越小,优先级越高; 1.1 优先级和权限的区别? 权限 : 能不能做 优先级: 已经能了,但是获…...
Git删除未跟踪的文件Untracked files
在 Git 中,要删除未跟踪的文件(Untracked files),你可以使用 git clean 命令。请注意,这个命令会从你的工作目录中永久删除这些文件,因此在执行之前请确保你不再需要这些文件或已经妥善备份。 以下是如何使…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
