xxl-job调度任务原理解析
xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程:
admin服务端初始化
JobTriggerPoolHelper.java#toStart()方法中会初始化两个调用任务的线程池,快线程池最大线程数为200,慢线程池最大线程数为100。然后启动线程定时轮询需要调度的定时任务。首先计算每秒能处理的定时任务数量,公式为(快线程池的最大线程数+满线程池的最大线程数)*20(1000ms/每个任务处理的时长50ms),最多为6000。从数据库中加锁查出任务触发时间<当前时间+预读时间(5s)的任务,然后分情况处理。
- 当前时间大于任务触发时间+预读时间,即任务触发时间已经过期超过5s,此时不做任何处理,只刷新任务下次触发时间
- 当前时间大于任务触发时间但不超过5s,即任务虽然过期但是过期时间不到5s,此时触发任务,将任务数据保存到ringData
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();,ringData的key是秒数,value是jobid,然后刷新任务的下次触发时间 - 当前时间小于任务触发时间,即还没到任务的触发时间,此时也会将任务写道ringData中,等到期就会进行处理,因为在内存中查询任务比到数据库查询要快很多。
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {boolean preReadSuc = true;try {preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、pre readlong nowTime = System.currentTimeMillis();List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {for (XxlJobInfo jobInfo: scheduleList) {if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}
最后判断任务调度状态,TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);,有任务需要调度则下一秒继续扫描,如果没有发现任务则睡眠5s(PRE_READ_MS)。
刚才说到待执行的任务会加入ringData,现在往下看怎么处理ringData的。这里会回退一秒,因为可能出现任务超时的情况,导致任务处理时遗漏。处理的逻辑很简单,到了某秒时,根据秒数取出对应的jobid集合,然后依次处理触发每个任务即可。触发任务的逻辑我们稍微再说。
List<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}if (ringItemData.size() > 0) {for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);}ringItemData.clear();}
客户端初始化
客户端创建定时任务只要在bean中添加@XxlJob注解即可,调度任务是通过XxlJobSpringExecutor实现的。过程是到spring容器中获取所有bean,找出对方法使用了@XxlJob的bean,然后使用MethodJobHandler进行封装,注册到jobHandlerRepositoryprivate static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();。
String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Method[] methods = bean.getClass().getDeclaredMethods();for (Method method: methods) {XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);if (xxlJob != null) {String name = xxlJob.value();method.setAccessible(true);if(xxlJob.init().trim().length() > 0) {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);}if(xxlJob.destroy().trim().length() > 0) {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);}registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}}
客户端会启动一个netty服务器,xxl-job底层的核心就是netty,监听${xxl.job.executor.port}配置的端口,等待来自服务端的调度。
ServerBootstrap bootstrap = new ServerBootstrap();((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());NettyHttpServer.this.onStarted();future.channel().closeFuture().sync();
服务端触发任务
触发任务是从JobTriggerPoolHelper.java#addTrigger()中开始的。默认是快线程池触发,如果1min内执行时间超过500ms的次数大于10,则改为满线程池。
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}triggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {long minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}long cost = System.currentTimeMillis()-start;if (cost > 500) { // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}
真正执行是在processTrigger()方法中,先根据调度策略获取处理任务的客户端地址,默认是轮询策略。先获取任务id,然后找到任务对应的客户端索引,通过nextInt()方法找到下个索引,再到客户端地址列表中根据索引获取地址。
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){XxlJobLog jobLog = new XxlJobLog();XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);TriggerParam triggerParam = new TriggerParam();String address = null;routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}triggerResult = runExecutor(triggerParam, address);
//轮询策略调度任务private static int count(int jobId) {// cache clearif (System.currentTimeMillis() > CACHE_VALID_TIME) {routeCountEachJob.clear();CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;}// count++Integer count = routeCountEachJob.get(jobId);count = (count==null || count>1000000)?(new Random().nextInt(100)):++count; // 初始化时主动Random一次,缓解首次压力routeCountEachJob.put(jobId, count);return count;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {String address = addressList.get(count(triggerParam.getJobId())%addressList.size());return new ReturnT<String>(address);}
处理任务时通过proxy进行动态代理,在XxlRpcReferenceBean.class#getObject为调度的定时任务生成了动态代理对象,在InvocationHandler的invoke()方法中实现了逻辑增强,最终到NettyHttpClient#asyncSend()将消息发送到客户端netty服务器。
客户端执行定时任务
客户端是在NettyHttpServerHandler#channelRead0()中处理定时任务的,先对服务器的字节流进行反序列化,在XxlRpcProviderFactory.class#invokeService()以反射方式远程调用ExecutorBizImpl.java#run()方法。
Class<?> serviceClass = serviceBean.getClass();String methodName = xxlRpcRequest.getMethodName();Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result = method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);
在run方法中启动处理任务的JobThread进行处理,JobThread中就是根据定时任务名获取对应的MethodJobHandler,取出要执行的Method,再反射执行即可。
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
总结下,xxl-job首先在服务端启动线程轮询要执行的定时任务,计算定时任务的触发时间,然后后获取代理对象,将要执行的任务信息通过netty发送到客户端,客户端以反射方式执行定时任务。有不对的地方请大神指出,欢迎大家一起讨论交流,共同进步。
相关文章:
xxl-job调度任务原理解析
xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程: admin服务端初始化 JobTriggerPoolHelper.java#toStart()方法…...
实验2 路由器基本配置
实验2 路由器基本配置 一、 原理描述二、 实验目的三、 实验内容四、 实验步骤1.建立实验拓扑2.基础配置3.配置路由器接口IP地址4.查看路由器配置信息5.连通性测试6.使用抓包工具 一、 原理描述 华为设备支持多种配置方式,操作人员要熟悉使用命令行的方式进行设备管…...
docker部署安装整理
centos下安装部署docker 在CentOS下部署Docker,你需要按照以下步骤进行操作: 更新系统: 首先,确保你的CentOS系统是最新的。打开终端,并运行以下命令来更新你的系统: sudo yum update -y安装所需的软件包…...
为什么你明明拥有5年开发经验,但是依然写不出来一份简历?
前端训练营:1v1私教,终身辅导计划,帮你拿到满意的 offer。 已帮助数百位同学拿到了中大厂 offer。欢迎来撩~~~~~~~~ Hello,大家好,我是 Sunday。 在最近不到一年的时间里,我跟上千位同学进行了沟通&#x…...
【ZZULIOJ】1062: 最大公约数(Java)
目录 题目描述 输入 输出 样例输入 Copy 样例输出 Copy 提示 code 题目描述 输入两个不大于10的9次方的正整数,输出其最大公约数。 输入 输入两个正整数m和n,数据之间用空格隔开。 输出 输出一个整数,表示m和n的最大公约数。 样…...
北斗导航 | ARAIM算法的原理和性能测试
===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== ARAIM算法的原理和性能测试 针对高级接收机自主完好性监视(ARAIM)算法…...
elasticsearch7安全配置--最低安全等级,用户名密码
上一篇博客在centos7上安装了elasticsearch7 接下来对elasticsearch进行安全方面的配置 minimal security 最低安全等级,用户名密码 首先开启xpack vim config/elasticsearch.yml xpack.security.enabled: true由于我是单机配置的,还加了如下配置 d…...
项目架构MVC,DDD学习
写在前面 本文一起看下项目架构DDD,MVC相关的内容。 1:MVC 不管我们做什么项目,自己想想其实只是做了三件事,如下: 其实,这三件事完全在一个类中做完也可以可以正常把项目完成的,就像下面这…...
SQLite的PRAGMA 声明
PRAGMA 语句是特定于 SQLite 的 SQL 扩展,用于 修改 SQLite 库的操作或查询 SQLite 库 内部(非表)数据。PRAGMA声明使用相同的 接口作为其他 SQLite 命令(例如 SELECT、INSERT)但 在以下重要方面有所不同: …...
使用ArrayList.removeAll(List list)导致的机器重启
背景 先说一下背景,博主所在的业务组有一个核心系统,需要同步两个不同数据源给过来的数据到redis中,但是每次同步之前需要过滤掉一部分数据,只存储剩下的数据。每次同步的数据与需要过滤掉的数据量级大概在0-100w的数据不等。 由…...
如何在项目中使用uni-ui组件库
1、安装uni-ui npm i dcloudio/uni-ui 2、组件自动引用 配置easycom 使用 npm 安装好 uni-ui 之后,需要配置 easycom 规则,让 npm 安装的组件支持 easycom 打开项目根目录下的 pages.json 并添加 easycom 节点: // pages.json {"e…...
redis的过期策略和内存淘汰机制(redis篇)
分享并学习一下redis的过期策略和内存淘汰机制 在平时的工作或者学习中,即便自己没有实打实的用过redis。但是能有对这方面的思考,再结合一些实际场景和理论,那么我相信自己或者你都会越来越厉害的。 首先,我们需要认清为啥redis要…...
Java中Runnable和Callable有什么不同?(企业真题)
Java中Runnable和Callable有什么不同? 与之前的方式的对比:与Runnable方式的对比的好处 call()可以有返回值,更灵活 call()可以使用throws的方式处理异常,更灵活 Callable使用了泛型参数,可以指明具体的call()的返回值…...
图机器学习导论
图:描述关系数据的通用语言,起源于哥尼斯堡七桥问题 传统的机器学习:数据样本之间独立同分布,简单拟合数据边界,在传统的机器学习中,每个数据样本彼此无关。传统的神经网络,只能处理简单的表格、…...
地推网推拉新平台哪家强?一文清楚告诉你
在当今这个充满副业的时代,地推网推拉新平台的寻找与对接成为了许多人关注的焦点。那么,我们应该如何找到那些既靠谱又有潜力的拉新项目呢? 经过深入研究和全网检索,我为大家盘点了5个值得一试地推网推拉新平台。 尤其是“聚小推…...
Day:004(4) | Python爬虫:高效数据抓取的编程技术(数据解析)
XPath工具 浏览器-元素-CtrlF 浏览器-控制台- $x(表达式) Xpath helper (安装包需要科学上网) 问题 使用离线安装包 出现 程序包无效 解决方案 使用修改安装包的后缀名为 rar,解压文件到一个文件夹,再用 加载文件夹的方式安装即可 安装 python若使用…...
(80) 只出现一次的数字(81)反转字符串
文章目录 1. 每日一言2. (80) 只出现一次的数字2.1 解题思路2.2 代码 3. (81)反转字符串3.1 解题思路3.2 代码 4. 结语 1. 每日一言 生活是一场即兴表演,值得庆幸的是我们总是有所感受,并且将一直感受下去。 2. (80) 只出现一次的数字 题目链接&#x…...
基于拉格朗日分布算法的电动汽车充放电调度MATLAB程序
微❤关注“电气仔推送”获得资料(专享优惠) 程序简介 该模型主要做的是基于拉格朗日分布算法的电动汽车充放电调度模型。利用蒙特卡洛模拟法模拟出电动汽车负荷曲线,并求解出无序充电功率曲线和有序充电曲线,该模型在电动汽车个…...
【Linux 学习】进程优先级和命令行参数!
1. 什么是优先级? 指定进程获取某种资源(CPU)的先后顺序; Linux 中优先级数字越小,优先级越高; 1.1 优先级和权限的区别? 权限 : 能不能做 优先级: 已经能了,但是获…...
Git删除未跟踪的文件Untracked files
在 Git 中,要删除未跟踪的文件(Untracked files),你可以使用 git clean 命令。请注意,这个命令会从你的工作目录中永久删除这些文件,因此在执行之前请确保你不再需要这些文件或已经妥善备份。 以下是如何使…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...
Python的__call__ 方法
在 Python 中,__call__ 是一个特殊的魔术方法(magic method),它允许一个类的实例像函数一样被调用。当你在一个对象后面加上 () 并执行时(例如 obj()),Python 会自动调用该对象的 __call__ 方法…...
深度解析云存储:概念、架构与应用实践
在数据爆炸式增长的时代,传统本地存储因容量限制、管理复杂等问题,已难以满足企业和个人的需求。云存储凭借灵活扩展、便捷访问等特性,成为数据存储领域的主流解决方案。从个人照片备份到企业核心数据管理,云存储正重塑数据存储与…...
