xxl-job调度任务原理解析
xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程:
admin服务端初始化
JobTriggerPoolHelper.java#toStart()方法中会初始化两个调用任务的线程池,快线程池最大线程数为200,慢线程池最大线程数为100。然后启动线程定时轮询需要调度的定时任务。首先计算每秒能处理的定时任务数量,公式为(快线程池的最大线程数+满线程池的最大线程数)*20(1000ms/每个任务处理的时长50ms),最多为6000。从数据库中加锁查出任务触发时间<当前时间+预读时间(5s)的任务,然后分情况处理。
- 当前时间大于任务触发时间+预读时间,即任务触发时间已经过期超过5s,此时不做任何处理,只刷新任务下次触发时间
- 当前时间大于任务触发时间但不超过5s,即任务虽然过期但是过期时间不到5s,此时触发任务,将任务数据保存到ringData
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();,ringData的key是秒数,value是jobid,然后刷新任务的下次触发时间 - 当前时间小于任务触发时间,即还没到任务的触发时间,此时也会将任务写道ringData中,等到期就会进行处理,因为在内存中查询任务比到数据库查询要快很多。
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {boolean preReadSuc = true;try {preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、pre readlong nowTime = System.currentTimeMillis();List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {for (XxlJobInfo jobInfo: scheduleList) {if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}
最后判断任务调度状态,TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);,有任务需要调度则下一秒继续扫描,如果没有发现任务则睡眠5s(PRE_READ_MS)。
刚才说到待执行的任务会加入ringData,现在往下看怎么处理ringData的。这里会回退一秒,因为可能出现任务超时的情况,导致任务处理时遗漏。处理的逻辑很简单,到了某秒时,根据秒数取出对应的jobid集合,然后依次处理触发每个任务即可。触发任务的逻辑我们稍微再说。
List<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}if (ringItemData.size() > 0) {for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);}ringItemData.clear();}
客户端初始化
客户端创建定时任务只要在bean中添加@XxlJob注解即可,调度任务是通过XxlJobSpringExecutor实现的。过程是到spring容器中获取所有bean,找出对方法使用了@XxlJob的bean,然后使用MethodJobHandler进行封装,注册到jobHandlerRepositoryprivate static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();。
String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Method[] methods = bean.getClass().getDeclaredMethods();for (Method method: methods) {XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);if (xxlJob != null) {String name = xxlJob.value();method.setAccessible(true);if(xxlJob.init().trim().length() > 0) {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);}if(xxlJob.destroy().trim().length() > 0) {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);}registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}}
客户端会启动一个netty服务器,xxl-job底层的核心就是netty,监听${xxl.job.executor.port}配置的端口,等待来自服务端的调度。
ServerBootstrap bootstrap = new ServerBootstrap();((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());NettyHttpServer.this.onStarted();future.channel().closeFuture().sync();
服务端触发任务
触发任务是从JobTriggerPoolHelper.java#addTrigger()中开始的。默认是快线程池触发,如果1min内执行时间超过500ms的次数大于10,则改为满线程池。
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}triggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {long minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}long cost = System.currentTimeMillis()-start;if (cost > 500) { // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}
真正执行是在processTrigger()方法中,先根据调度策略获取处理任务的客户端地址,默认是轮询策略。先获取任务id,然后找到任务对应的客户端索引,通过nextInt()方法找到下个索引,再到客户端地址列表中根据索引获取地址。
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){XxlJobLog jobLog = new XxlJobLog();XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);TriggerParam triggerParam = new TriggerParam();String address = null;routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}triggerResult = runExecutor(triggerParam, address);
//轮询策略调度任务private static int count(int jobId) {// cache clearif (System.currentTimeMillis() > CACHE_VALID_TIME) {routeCountEachJob.clear();CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;}// count++Integer count = routeCountEachJob.get(jobId);count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力routeCountEachJob.put(jobId, count);return count;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {String address = addressList.get(count(triggerParam.getJobId())%addressList.size());return new ReturnT<String>(address);}
处理任务时通过proxy进行动态代理,在XxlRpcReferenceBean.class#getObject为调度的定时任务生成了动态代理对象,在InvocationHandler的invoke()方法中实现了逻辑增强,最终到NettyHttpClient#asyncSend()将消息发送到客户端netty服务器。
客户端执行定时任务
客户端是在NettyHttpServerHandler#channelRead0()中处理定时任务的,先对服务器的字节流进行反序列化,在XxlRpcProviderFactory.class#invokeService()以反射方式远程调用ExecutorBizImpl.java#run()方法。
Class<?> serviceClass = serviceBean.getClass();String methodName = xxlRpcRequest.getMethodName();Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result = method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);
在run方法中启动处理任务的JobThread进行处理,JobThread中就是根据定时任务名获取对应的MethodJobHandler,取出要执行的Method,再反射执行即可。
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
总结下,xxl-job首先在服务端启动线程轮询要执行的定时任务,计算定时任务的触发时间,然后后获取代理对象,将要执行的任务信息通过netty发送到客户端,客户端以反射方式执行定时任务。有不对的地方请大神指出,欢迎大家一起讨论交流,共同进步。
相关文章:
xxl-job调度任务原理解析
xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程: admin服务端初始化 JobTriggerPoolHelper.java#toStart()方法…...
实验2 路由器基本配置
实验2 路由器基本配置 一、 原理描述二、 实验目的三、 实验内容四、 实验步骤1.建立实验拓扑2.基础配置3.配置路由器接口IP地址4.查看路由器配置信息5.连通性测试6.使用抓包工具 一、 原理描述 华为设备支持多种配置方式,操作人员要熟悉使用命令行的方式进行设备管…...
docker部署安装整理
centos下安装部署docker 在CentOS下部署Docker,你需要按照以下步骤进行操作: 更新系统: 首先,确保你的CentOS系统是最新的。打开终端,并运行以下命令来更新你的系统: sudo yum update -y安装所需的软件包…...
为什么你明明拥有5年开发经验,但是依然写不出来一份简历?
前端训练营:1v1私教,终身辅导计划,帮你拿到满意的 offer。 已帮助数百位同学拿到了中大厂 offer。欢迎来撩~~~~~~~~ Hello,大家好,我是 Sunday。 在最近不到一年的时间里,我跟上千位同学进行了沟通&#x…...
【ZZULIOJ】1062: 最大公约数(Java)
目录 题目描述 输入 输出 样例输入 Copy 样例输出 Copy 提示 code 题目描述 输入两个不大于10的9次方的正整数,输出其最大公约数。 输入 输入两个正整数m和n,数据之间用空格隔开。 输出 输出一个整数,表示m和n的最大公约数。 样…...
北斗导航 | ARAIM算法的原理和性能测试
===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== ARAIM算法的原理和性能测试 针对高级接收机自主完好性监视(ARAIM)算法…...
elasticsearch7安全配置--最低安全等级,用户名密码
上一篇博客在centos7上安装了elasticsearch7 接下来对elasticsearch进行安全方面的配置 minimal security 最低安全等级,用户名密码 首先开启xpack vim config/elasticsearch.yml xpack.security.enabled: true由于我是单机配置的,还加了如下配置 d…...
项目架构MVC,DDD学习
写在前面 本文一起看下项目架构DDD,MVC相关的内容。 1:MVC 不管我们做什么项目,自己想想其实只是做了三件事,如下: 其实,这三件事完全在一个类中做完也可以可以正常把项目完成的,就像下面这…...
SQLite的PRAGMA 声明
PRAGMA 语句是特定于 SQLite 的 SQL 扩展,用于 修改 SQLite 库的操作或查询 SQLite 库 内部(非表)数据。PRAGMA声明使用相同的 接口作为其他 SQLite 命令(例如 SELECT、INSERT)但 在以下重要方面有所不同: …...
使用ArrayList.removeAll(List list)导致的机器重启
背景 先说一下背景,博主所在的业务组有一个核心系统,需要同步两个不同数据源给过来的数据到redis中,但是每次同步之前需要过滤掉一部分数据,只存储剩下的数据。每次同步的数据与需要过滤掉的数据量级大概在0-100w的数据不等。 由…...
如何在项目中使用uni-ui组件库
1、安装uni-ui npm i dcloudio/uni-ui 2、组件自动引用 配置easycom 使用 npm 安装好 uni-ui 之后,需要配置 easycom 规则,让 npm 安装的组件支持 easycom 打开项目根目录下的 pages.json 并添加 easycom 节点: // pages.json {"e…...
redis的过期策略和内存淘汰机制(redis篇)
分享并学习一下redis的过期策略和内存淘汰机制 在平时的工作或者学习中,即便自己没有实打实的用过redis。但是能有对这方面的思考,再结合一些实际场景和理论,那么我相信自己或者你都会越来越厉害的。 首先,我们需要认清为啥redis要…...
Java中Runnable和Callable有什么不同?(企业真题)
Java中Runnable和Callable有什么不同? 与之前的方式的对比:与Runnable方式的对比的好处 call()可以有返回值,更灵活 call()可以使用throws的方式处理异常,更灵活 Callable使用了泛型参数,可以指明具体的call()的返回值…...
图机器学习导论
图:描述关系数据的通用语言,起源于哥尼斯堡七桥问题 传统的机器学习:数据样本之间独立同分布,简单拟合数据边界,在传统的机器学习中,每个数据样本彼此无关。传统的神经网络,只能处理简单的表格、…...
地推网推拉新平台哪家强?一文清楚告诉你
在当今这个充满副业的时代,地推网推拉新平台的寻找与对接成为了许多人关注的焦点。那么,我们应该如何找到那些既靠谱又有潜力的拉新项目呢? 经过深入研究和全网检索,我为大家盘点了5个值得一试地推网推拉新平台。 尤其是“聚小推…...
Day:004(4) | Python爬虫:高效数据抓取的编程技术(数据解析)
XPath工具 浏览器-元素-CtrlF 浏览器-控制台- $x(表达式) Xpath helper (安装包需要科学上网) 问题 使用离线安装包 出现 程序包无效 解决方案 使用修改安装包的后缀名为 rar,解压文件到一个文件夹,再用 加载文件夹的方式安装即可 安装 python若使用…...
(80) 只出现一次的数字(81)反转字符串
文章目录 1. 每日一言2. (80) 只出现一次的数字2.1 解题思路2.2 代码 3. (81)反转字符串3.1 解题思路3.2 代码 4. 结语 1. 每日一言 生活是一场即兴表演,值得庆幸的是我们总是有所感受,并且将一直感受下去。 2. (80) 只出现一次的数字 题目链接&#x…...
基于拉格朗日分布算法的电动汽车充放电调度MATLAB程序
微❤关注“电气仔推送”获得资料(专享优惠) 程序简介 该模型主要做的是基于拉格朗日分布算法的电动汽车充放电调度模型。利用蒙特卡洛模拟法模拟出电动汽车负荷曲线,并求解出无序充电功率曲线和有序充电曲线,该模型在电动汽车个…...
【Linux 学习】进程优先级和命令行参数!
1. 什么是优先级? 指定进程获取某种资源(CPU)的先后顺序; Linux 中优先级数字越小,优先级越高; 1.1 优先级和权限的区别? 权限 : 能不能做 优先级: 已经能了,但是获…...
Git删除未跟踪的文件Untracked files
在 Git 中,要删除未跟踪的文件(Untracked files),你可以使用 git clean 命令。请注意,这个命令会从你的工作目录中永久删除这些文件,因此在执行之前请确保你不再需要这些文件或已经妥善备份。 以下是如何使…...
ArcSWAT实战避坑指南 | 从数据库配置到模型运行,详解常见报错与高效解决方案
1. ArcSWAT入门避坑:从安装到首次运行的关键准备 第一次接触ArcSWAT的水文研究者,往往会在安装环节就踩坑。我见过太多人因为版本兼容性问题,导致后续模型根本无法启动。这里分享几个血泪教训: ArcGIS版本选择是首要关键。虽然官方…...
零成本实现3D模型跨平台迁移:Blender到Unreal Engine的无缝解决方案
零成本实现3D模型跨平台迁移:Blender到Unreal Engine的无缝解决方案 【免费下载链接】bl_datasmith Blender addon to export UE4 Datasmith format 项目地址: https://gitcode.com/gh_mirrors/bl/bl_datasmith 你是否曾遇到这样的困境:在Blender…...
零基础图解VLN视觉语言导航:从输入到决策的完整模型拆解
1. 视觉语言导航(VLN)是什么? 想象你第一次去朋友家做客,对方在电话里说:“进门左转,看到红色沙发后直走,右手边第二个房间就是。”这时候你的大脑会做三件事:用眼睛观察环境&#x…...
BilibiliDown:你的专属B站视频管家,轻松下载与管理海量内容
BilibiliDown:你的专属B站视频管家,轻松下载与管理海量内容 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader 😳 项目地址: https://gitcode.…...
毕业生就业新趋势:央国企成首选“避风港”
据教育部数据显示,2024届全国普通高校毕业生规模预计达到1179万人,创历史新高。在宏观经济环境面临挑战、部分行业竞争加剧的背景下,庞大的毕业生群体面临着前所未有的就业压力。与此同时,一个显著的趋势正在显现:中央…...
红外遥控技术原理与实现方案详解
红外遥控技术原理与实现方案1. 红外遥控技术概述红外遥控技术是一种利用红外光波进行短距离无线通信的技术方案,主要应用于家电控制领域。该技术通过调制红外光波来传输控制信号,具有成本低、实现简单、抗干扰能力强等特点。1.1 技术特点与应用场景红外遥…...
DeepSeek-OCR 2技术突破:动态视觉token重排效果展示
DeepSeek-OCR 2技术突破:动态视觉token重排效果展示 1. 引言 想象一下,当你阅读一份复杂的学术论文时,眼睛不会机械地从左上角扫到右下角,而是会自然地跳过标题、关注图表、追踪公式推导,甚至在不同的文本栏之间灵活…...
VIIRS在灾害监测中的实战应用:以洪水检测为例的Python代码解析
VIIRS在灾害监测中的实战应用:以洪水检测为例的Python代码解析 当洪水席卷城镇时,每一分钟的响应延迟都可能意味着更多生命财产的损失。VIIRS(可见光红外成像辐射计套件)作为NASA灾害监测系统的"鹰眼",其375…...
2026电商客服外包TOP5实力品牌详细解读
进入2026年,电商行业已从粗放式扩张转向精细化运营时代,客户服务不再局限于简单的问答回复,而是成为驱动店铺销售增长、积累品牌声誉的关键要素。根据最新行业研究报告,专业的外包客服团队能够帮助店铺将询单转化率提高20%-30%&am…...
OpenClaw技能市场巡礼:Qwen3-32B生态的十大实用工具
OpenClaw技能市场巡礼:Qwen3-32B生态的十大实用工具 1. 为什么需要关注OpenClaw技能市场? 第一次接触OpenClaw时,我被它"让AI直接操作电脑"的理念震撼了。但真正让我决定长期使用的,却是它背后那个不断壮大的技能市场…...
