当前位置: 首页 > news >正文

xxl-job调度任务原理解析

xxljob可以对定时任务进行调度,现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口,spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程:

admin服务端初始化

JobTriggerPoolHelper.java#toStart()方法中会初始化两个调用任务的线程池,快线程池最大线程数为200,慢线程池最大线程数为100。然后启动线程定时轮询需要调度的定时任务。首先计算每秒能处理的定时任务数量,公式为(快线程池的最大线程数+满线程池的最大线程数)*20(1000ms/每个任务处理的时长50ms),最多为6000。从数据库中加锁查出任务触发时间<当前时间+预读时间(5s)的任务,然后分情况处理。

  • 当前时间大于任务触发时间+预读时间,即任务触发时间已经过期超过5s,此时不做任何处理,只刷新任务下次触发时间
  • 当前时间大于任务触发时间但不超过5s,即任务虽然过期但是过期时间不到5s,此时触发任务,将任务数据保存到ringDataprivate volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();,ringData的key是秒数,value是jobid,然后刷新任务的下次触发时间
  • 当前时间小于任务触发时间,即还没到任务的触发时间,此时也会将任务写道ringData中,等到期就会进行处理,因为在内存中查询任务比到数据库查询要快很多。
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;while (!scheduleThreadToStop) {boolean preReadSuc = true;try {preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// 1、pre readlong nowTime = System.currentTimeMillis();List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {for (XxlJobInfo jobInfo: scheduleList) {if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);refreshNextValidTime(jobInfo, new Date());if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);pushTimeRing(ringSecond, jobInfo.getId());refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}

最后判断任务调度状态,TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);,有任务需要调度则下一秒继续扫描,如果没有发现任务则睡眠5s(PRE_READ_MS)。
刚才说到待执行的任务会加入ringData,现在往下看怎么处理ringData的。这里会回退一秒,因为可能出现任务超时的情况,导致任务处理时遗漏。处理的逻辑很简单,到了某秒时,根据秒数取出对应的jobid集合,然后依次处理触发每个任务即可。触发任务的逻辑我们稍微再说。

                        List<Integer> ringItemData = new ArrayList<>();int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;for (int i = 0; i < 2; i++) {List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );if (tmpData != null) {ringItemData.addAll(tmpData);}}if (ringItemData.size() > 0) {for (int jobId: ringItemData) {JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);}ringItemData.clear();}

客户端初始化

客户端创建定时任务只要在bean中添加@XxlJob注解即可,调度任务是通过XxlJobSpringExecutor实现的。过程是到spring容器中获取所有bean,找出对方法使用了@XxlJob的bean,然后使用MethodJobHandler进行封装,注册到jobHandlerRepositoryprivate static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

        String[] beanDefinitionNames = applicationContext.getBeanDefinitionNames();for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Method[] methods = bean.getClass().getDeclaredMethods();for (Method method: methods) {XxlJob xxlJob = AnnotationUtils.findAnnotation(method, XxlJob.class);if (xxlJob != null) {String name = xxlJob.value();method.setAccessible(true);if(xxlJob.init().trim().length() > 0) {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);}if(xxlJob.destroy().trim().length() > 0) {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);}registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));}}}

客户端会启动一个netty服务器,xxl-job底层的核心就是netty,监听${xxl.job.executor.port}配置的端口,等待来自服务端的调度。

                    ServerBootstrap bootstrap = new ServerBootstrap();((ServerBootstrap)bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {public void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, 0L, 90L, TimeUnit.SECONDS)}).addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(5242880)}).addLast(new ChannelHandler[]{new NettyHttpServerHandler(xxlRpcProviderFactory, serverHandlerPool)});}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture future = bootstrap.bind(xxlRpcProviderFactory.getPort()).sync();NettyHttpServer.logger.info(">>>>>>>>>>> xxl-rpc remoting server start success, nettype = {}, port = {}", NettyHttpServer.class.getName(), xxlRpcProviderFactory.getPort());NettyHttpServer.this.onStarted();future.channel().closeFuture().sync();

服务端触发任务

触发任务是从JobTriggerPoolHelper.java#addTrigger()中开始的。默认是快线程池触发,如果1min内执行时间超过500ms的次数大于10,则改为满线程池。

    public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}triggerPool_.execute(new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();try {// do triggerXxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {long minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {minTim = minTim_now;jobTimeoutCountMap.clear();}long cost = System.currentTimeMillis()-start;if (cost > 500) {       // ob-timeout threshold 500msAtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}

真正执行是在processTrigger()方法中,先根据调度策略获取处理任务的客户端地址,默认是轮询策略。先获取任务id,然后找到任务对应的客户端索引,通过nextInt()方法找到下个索引,再到客户端地址列表中根据索引获取地址。

    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){XxlJobLog jobLog = new XxlJobLog();XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);TriggerParam triggerParam = new TriggerParam();String address = null;routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}triggerResult = runExecutor(triggerParam, address);
//轮询策略调度任务private static int count(int jobId) {// cache clearif (System.currentTimeMillis() > CACHE_VALID_TIME) {routeCountEachJob.clear();CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;}// count++Integer count = routeCountEachJob.get(jobId);count = (count==null || count>1000000)?(new Random().nextInt(100)):++count;  // 初始化时主动Random一次,缓解首次压力routeCountEachJob.put(jobId, count);return count;}@Overridepublic ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {String address = addressList.get(count(triggerParam.getJobId())%addressList.size());return new ReturnT<String>(address);} 

处理任务时通过proxy进行动态代理,在XxlRpcReferenceBean.class#getObject为调度的定时任务生成了动态代理对象,在InvocationHandler的invoke()方法中实现了逻辑增强,最终到NettyHttpClient#asyncSend()将消息发送到客户端netty服务器。

客户端执行定时任务

客户端是在NettyHttpServerHandler#channelRead0()中处理定时任务的,先对服务器的字节流进行反序列化,在XxlRpcProviderFactory.class#invokeService()以反射方式远程调用ExecutorBizImpl.java#run()方法。

                Class<?> serviceClass = serviceBean.getClass();String methodName = xxlRpcRequest.getMethodName();Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();Object[] parameters = xxlRpcRequest.getParameters();Method method = serviceClass.getMethod(methodName, parameterTypes);method.setAccessible(true);Object result = method.invoke(serviceBean, parameters);xxlRpcResponse.setResult(result);

在run方法中启动处理任务的JobThread进行处理,JobThread中就是根据定时任务名获取对应的MethodJobHandler,取出要执行的Method,再反射执行即可。

 IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());if (jobThread == null) {jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);}ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

总结下,xxl-job首先在服务端启动线程轮询要执行的定时任务,计算定时任务的触发时间,然后后获取代理对象,将要执行的任务信息通过netty发送到客户端,客户端以反射方式执行定时任务。有不对的地方请大神指出,欢迎大家一起讨论交流,共同进步。

相关文章:

xxl-job调度任务原理解析

xxljob可以对定时任务进行调度&#xff0c;现在看下定时任务调度的过程。XxlJobAdminConfig实现了InitializingBean接口&#xff0c;spring会调用afterPropertiesSet()进行初始化。大致有以下几个过程&#xff1a; admin服务端初始化 JobTriggerPoolHelper.java#toStart()方法…...

实验2 路由器基本配置

实验2 路由器基本配置 一、 原理描述二、 实验目的三、 实验内容四、 实验步骤1.建立实验拓扑2.基础配置3.配置路由器接口IP地址4.查看路由器配置信息5.连通性测试6.使用抓包工具 一、 原理描述 华为设备支持多种配置方式&#xff0c;操作人员要熟悉使用命令行的方式进行设备管…...

docker部署安装整理

centos下安装部署docker 在CentOS下部署Docker&#xff0c;你需要按照以下步骤进行操作&#xff1a; 更新系统&#xff1a; 首先&#xff0c;确保你的CentOS系统是最新的。打开终端&#xff0c;并运行以下命令来更新你的系统&#xff1a; sudo yum update -y安装所需的软件包…...

为什么你明明拥有5年开发经验,但是依然写不出来一份简历?

前端训练营&#xff1a;1v1私教&#xff0c;终身辅导计划&#xff0c;帮你拿到满意的 offer。 已帮助数百位同学拿到了中大厂 offer。欢迎来撩~~~~~~~~ Hello&#xff0c;大家好&#xff0c;我是 Sunday。 在最近不到一年的时间里&#xff0c;我跟上千位同学进行了沟通&#x…...

【ZZULIOJ】1062: 最大公约数(Java)

目录 题目描述 输入 输出 样例输入 Copy 样例输出 Copy 提示 code 题目描述 输入两个不大于10的9次方的正整数&#xff0c;输出其最大公约数。 输入 输入两个正整数m和n&#xff0c;数据之间用空格隔开。 输出 输出一个整数&#xff0c;表示m和n的最大公约数。 样…...

北斗导航 | ARAIM算法的原理和性能测试

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== ARAIM算法的原理和性能测试 针对高级接收机自主完好性监视(ARAIM)算法…...

elasticsearch7安全配置--最低安全等级,用户名密码

上一篇博客在centos7上安装了elasticsearch7 接下来对elasticsearch进行安全方面的配置 minimal security 最低安全等级&#xff0c;用户名密码 首先开启xpack vim config/elasticsearch.yml xpack.security.enabled: true由于我是单机配置的&#xff0c;还加了如下配置 d…...

项目架构MVC,DDD学习

写在前面 本文一起看下项目架构DDD&#xff0c;MVC相关的内容。 1&#xff1a;MVC 不管我们做什么项目&#xff0c;自己想想其实只是做了三件事&#xff0c;如下&#xff1a; 其实&#xff0c;这三件事完全在一个类中做完也可以可以正常把项目完成的&#xff0c;就像下面这…...

SQLite的PRAGMA 声明

PRAGMA 语句是特定于 SQLite 的 SQL 扩展&#xff0c;用于 修改 SQLite 库的操作或查询 SQLite 库 内部&#xff08;非表&#xff09;数据。PRAGMA声明使用相同的 接口作为其他 SQLite 命令&#xff08;例如 SELECT、INSERT&#xff09;但 在以下重要方面有所不同&#xff1a; …...

使用ArrayList.removeAll(List list)导致的机器重启

背景 先说一下背景&#xff0c;博主所在的业务组有一个核心系统&#xff0c;需要同步两个不同数据源给过来的数据到redis中&#xff0c;但是每次同步之前需要过滤掉一部分数据&#xff0c;只存储剩下的数据。每次同步的数据与需要过滤掉的数据量级大概在0-100w的数据不等。 由…...

如何在项目中使用uni-ui组件库

1、安装uni-ui npm i dcloudio/uni-ui 2、组件自动引用 配置easycom 使用 npm 安装好 uni-ui 之后&#xff0c;需要配置 easycom 规则&#xff0c;让 npm 安装的组件支持 easycom 打开项目根目录下的 pages.json 并添加 easycom 节点&#xff1a; // pages.json {"e…...

redis的过期策略和内存淘汰机制(redis篇)

分享并学习一下redis的过期策略和内存淘汰机制 在平时的工作或者学习中&#xff0c;即便自己没有实打实的用过redis。但是能有对这方面的思考&#xff0c;再结合一些实际场景和理论&#xff0c;那么我相信自己或者你都会越来越厉害的。 首先&#xff0c;我们需要认清为啥redis要…...

Java中Runnable和Callable有什么不同?(企业真题)

Java中Runnable和Callable有什么不同&#xff1f; 与之前的方式的对比&#xff1a;与Runnable方式的对比的好处 call()可以有返回值&#xff0c;更灵活 call()可以使用throws的方式处理异常&#xff0c;更灵活 Callable使用了泛型参数&#xff0c;可以指明具体的call()的返回值…...

图机器学习导论

图&#xff1a;描述关系数据的通用语言&#xff0c;起源于哥尼斯堡七桥问题 传统的机器学习&#xff1a;数据样本之间独立同分布&#xff0c;简单拟合数据边界&#xff0c;在传统的机器学习中&#xff0c;每个数据样本彼此无关。传统的神经网络&#xff0c;只能处理简单的表格、…...

地推网推拉新平台哪家强?一文清楚告诉你

在当今这个充满副业的时代&#xff0c;地推网推拉新平台的寻找与对接成为了许多人关注的焦点。那么&#xff0c;我们应该如何找到那些既靠谱又有潜力的拉新项目呢&#xff1f; 经过深入研究和全网检索&#xff0c;我为大家盘点了5个值得一试地推网推拉新平台。 尤其是“聚小推…...

Day:004(4) | Python爬虫:高效数据抓取的编程技术(数据解析)

XPath工具 浏览器-元素-CtrlF 浏览器-控制台- $x(表达式) Xpath helper (安装包需要科学上网) 问题 使用离线安装包 出现 程序包无效 解决方案 使用修改安装包的后缀名为 rar&#xff0c;解压文件到一个文件夹&#xff0c;再用 加载文件夹的方式安装即可 安装 python若使用…...

(80) 只出现一次的数字(81)反转字符串

文章目录 1. 每日一言2. (80) 只出现一次的数字2.1 解题思路2.2 代码 3. (81)反转字符串3.1 解题思路3.2 代码 4. 结语 1. 每日一言 生活是一场即兴表演&#xff0c;值得庆幸的是我们总是有所感受&#xff0c;并且将一直感受下去。 2. (80) 只出现一次的数字 题目链接&#x…...

基于拉格朗日分布算法的电动汽车充放电调度MATLAB程序

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 程序简介 该模型主要做的是基于拉格朗日分布算法的电动汽车充放电调度模型。利用蒙特卡洛模拟法模拟出电动汽车负荷曲线&#xff0c;并求解出无序充电功率曲线和有序充电曲线&#xff0c;该模型在电动汽车个…...

【Linux 学习】进程优先级和命令行参数!

1. 什么是优先级? 指定进程获取某种资源&#xff08;CPU&#xff09;的先后顺序&#xff1b; Linux 中优先级数字越小&#xff0c;优先级越高&#xff1b; 1.1 优先级和权限的区别&#xff1f; 权限 &#xff1a; 能不能做 优先级&#xff1a; 已经能了&#xff0c;但是获…...

Git删除未跟踪的文件Untracked files

在 Git 中&#xff0c;要删除未跟踪的文件&#xff08;Untracked files&#xff09;&#xff0c;你可以使用 git clean 命令。请注意&#xff0c;这个命令会从你的工作目录中永久删除这些文件&#xff0c;因此在执行之前请确保你不再需要这些文件或已经妥善备份。 以下是如何使…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

力扣热题100 k个一组反转链表题解

题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...

Caliper 负载(Workload)详细解析

Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...

智能职业发展系统:AI驱动的职业规划平台技术解析

智能职业发展系统&#xff1a;AI驱动的职业规划平台技术解析 引言&#xff1a;数字时代的职业革命 在当今瞬息万变的就业市场中&#xff0c;传统的职业规划方法已无法满足个人和企业的需求。据统计&#xff0c;全球每年有超过2亿人面临职业转型困境&#xff0c;而企业也因此遭…...