当前位置: 首页 > news >正文

手撸XXL-JOB(二)——定时任务管理

在上一节中,我们介绍了SpringBoot中关于定时任务的执行方式,以及ScheduledExecutorService接口提供的定时任务执行方法。假设我们现在要写类似XXL-JOB这样的任务调度平台,那么,对于任务的管理,是尤为重要的。接下来我们将一步一步,实现一个任务调度管理类。

YangJobManager类基础实现

假设我们现在的任务管理类,名为YangJobManager类。对于定时任务的执行,我们最终会调用到ScheduledExecutorService的相关方法,因此,我们的YangJobManager类,需要有ScheduledExecutorService属性,其次,我们希望能对要执行的定时线程任务,其命名进行修改,因此,我们需要有一个线程工厂的属性。基于上述两点,我们对YangJobManager类进行实现:

package com.yang.job;import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;private ThreadFactory threadFactory;public YangJobManager(ScheduledExecutorService scheduledExecutorService, ThreadFactory threadFactory) {this.scheduledExecutorService = scheduledExecutorService;this.threadFactory = threadFactory;}public void schedule(Runnable runnable, Long delay) {Thread thread = threadFactory.newThread(runnable);scheduledExecutorService.schedule(thread, delay, TimeUnit.SECONDS);}public void scheduleWithFixedDelay(Runnable runnable, Long delay, Long period) {Thread thread = threadFactory.newThread(runnable);scheduledExecutorService.scheduleWithFixedDelay(thread, delay, period, TimeUnit.SECONDS);}public void scheduleWithFixedRate(Runnable runnable, Long delay, Long period) {Thread thread = threadFactory.newThread(runnable);scheduledExecutorService.scheduleAtFixedRate(thread, delay, period, TimeUnit.SECONDS);}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}
}

然后,我们实现YangJobThreadFactory,完成对线程的命名

public class YangJobThreadFactory implements ThreadFactory {private String poolName;private String threadPrefixName;private static AtomicInteger poolNumber = new AtomicInteger(1);private AtomicInteger threadNumber = new AtomicInteger(1);public YangJobThreadFactory(String poolName) {this.poolName = poolName;this.threadPrefixName = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";}public String getPoolName() {return this.poolName;}@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName(this.threadPrefixName + threadNumber.getAndIncrement());return thread;}}

然后我们添加测试方法:

 public static void main(String[] args) {ThreadFactory threadFactory = new YangJobThreadFactory("yang");ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService, threadFactory);yangJobManager.schedule(() -> {System.out.println(Thread.currentThread().getName() + "schedule定时任务开始执行:" + new Date());}, 1L);yangJobManager.scheduleWithFixedDelay(() -> {System.out.println(Thread.currentThread().getName() + "withFixedDelay定时任务开始执行:" + new Date());}, 0L, 1L);yangJobManager.scheduleWithFixedRate(() -> {System.out.println(Thread.currentThread().getName() + "withFixedRate定时任务开始执行:" + new Date());}, 0L, 1L);try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}yangJobManager.shutdown();}

执行结果如下:
image.png

提供统一的schedule方法

虽然我们能顺利将任务提交给YangJobManager执行,当感觉还不够收敛,因为我们创建了三个方法:schedule,scheduleWithFixedDelay, shceduleWithFixedRate,每个方法执行逻辑都差不多,最后都是调用scheduledExecutorService的相关方法,我们可以将这些方法都收敛到一个入口——schedule,然后在入参中添加一个参数,表示要执行的策略,根据入参的参数,选择对应的方法执行。
首先,我们添加一个执行策略枚举:

package com.yang.job.enums;public enum JobExecuteStrategyEnum {IMMEDIATE_EXECUTE("immediate", "立即执行"),ONCE("once", "执行一次"),WITH_FIXED_DELAY("withFixedDelay", "任务执行完毕后间隔执行"),WITH_FIXED_RATE("withFixedRate", "任务执行开始后间隔执行");private String name;private String description;JobExecuteStrategyEnum(String name, String description) {this.name = name;this.description = description;}public String getName() {return this.name;}public static JobExecuteStrategyEnum getJobExecuteStrategyByName(String name) {if (name == null) {return null;}for (JobExecuteStrategyEnum value : values()) {if (name.equals(value.getName())) {return value;}}return null;}public static boolean isLegal(String name) {JobExecuteStrategyEnum jobExecuteStrategyByName = getJobExecuteStrategyByName(name);return jobExecuteStrategyByName != null;}public String getDescription() {return description;}
}

然后添加YangJobManager的schedule方法的入参类:

package com.yang.job.request;import com.yang.job.enums.JobExecuteStrategyEnum;
import lombok.Data;import java.io.Serializable;@Data
public class YangJobSubmitParam implements Serializable {private Runnable runnable;private Integer initialDelay;private Integer period;private JobExecuteStrategyEnum jobExecuteStrategy;
}

最后,修改YangJobManager类,将执行定时任务收敛到schedule方法,进入该方法,首先根据入参判断执行策略,如果是immediate,那么直接对入参的runnable调用run方法执行接口,其他的策略则分别对应scheduledExecutorService的schedule、scheduledWithFixedDelay、scheduledWithFixedRate方法,此外,这里对属性也进行修改,去除ThreadFactory属性。

package com.yang.job;import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.request.YangJobSubmitParam;import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;public YangJobManager(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;}public void schedule(YangJobSubmitParam yangJobSubmitParam) {JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();if (jobExecuteStrategy == null) {throw new RuntimeException("缺少执行策略=========");}Runnable runnable = yangJobSubmitParam.getRunnable();Integer initialDelay = yangJobSubmitParam.getInitialDelay();Integer period = yangJobSubmitParam.getPeriod();switch (jobExecuteStrategy) {case IMMEDIATE_EXECUTE:runnable.run();break;case ONCE:scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);break;case WITH_FIXED_DELAY:scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);break;case WITH_FIXED_RATE:scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);break;}}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}
}

最后,我们添加测试方法:

public static void main(String[] args) {ThreadFactory threadFactory = new YangJobThreadFactory("yang");ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService);YangJobSubmitParam yangJobSubmitParam1 = new YangJobSubmitParam();yangJobSubmitParam1.setRunnable(() -> System.out.println("立即执行======" + new Date()));yangJobSubmitParam1.setJobExecuteStrategy(JobExecuteStrategyEnum.IMMEDIATE_EXECUTE);YangJobSubmitParam yangJobSubmitParam2 = new YangJobSubmitParam();yangJobSubmitParam2.setRunnable(() -> System.out.println("执行一次======" + new Date()));yangJobSubmitParam2.setInitialDelay(1);yangJobSubmitParam2.setJobExecuteStrategy(JobExecuteStrategyEnum.ONCE);YangJobSubmitParam yangJobSubmitParam3 = new YangJobSubmitParam();yangJobSubmitParam3.setRunnable(() -> System.out.println("withFixedDelay=====" + new Date()));yangJobSubmitParam3.setInitialDelay(1);yangJobSubmitParam3.setPeriod(2);yangJobSubmitParam3.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_DELAY);YangJobSubmitParam yangJobSubmitParam4 = new YangJobSubmitParam();yangJobSubmitParam4.setRunnable(() -> System.out.println("withFixedRate=====" + new Date()));yangJobSubmitParam4.setInitialDelay(1);yangJobSubmitParam4.setPeriod(2);yangJobSubmitParam4.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_RATE);yangJobManager.schedule(yangJobSubmitParam1);yangJobManager.schedule(yangJobSubmitParam2);yangJobManager.schedule(yangJobSubmitParam3);yangJobManager.schedule(yangJobSubmitParam4);try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}yangJobManager.shutdown();}

执行结果如下:
image.png

提交任务和取消任务

任务的提交对应的是schedule方法,但我们的YangJobManager类缺少了关于任务的取消逻辑。在ScheduledExecutorService的各个定时执行方法中,其返回值是一个ScheduleFuture类,我们可以通过该类的cancel方法,来将对应的线程任务进行取消。此外,对于每一个任务,我们需要有一个任务标识,所以,我们先修改YangJobSubmitParam类:

package com.yang.job.request;import com.yang.job.enums.JobExecuteStrategyEnum;
import lombok.Data;import java.io.Serializable;@Data
public class YangJobSubmitParam implements Serializable {private Integer jobId;private Runnable runnable;private Integer initialDelay;private Integer period;private JobExecuteStrategyEnum jobExecuteStrategy;
}

然后,我们修改YangJobManager类,首先将schedule方法改为submit方法,这样更见名知义,在submit方法中,除了理解执行策略外,其他策略都会获取返回的ScheduleFuture,然后存入对应的map,在取消的时候,我们根据jobId从map中找到对应的ScheduleFuture,并执行cancel方法,以此来取消任务。

package com.yang.job;import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.request.YangJobSubmitParam;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;private Map<String, ScheduledFuture> jobId2ScheduleFutureMap = new ConcurrentHashMap<>();public YangJobManager(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;}public void submitJob(YangJobSubmitParam yangJobSubmitParam) {Integer jobId = yangJobSubmitParam.getJobId();if (jobId == null) {throw new RuntimeException("缺少任务标识=========");}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture != null && !scheduledFuture.isCancelled()) {// jobId存在对应的任务return;}JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();if (jobExecuteStrategy == null) {throw new RuntimeException("缺少执行策略=========");}if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE) {yangJobSubmitParam.getRunnable().run();return;}scheduledFuture = scheduleJob(yangJobSubmitParam);jobId2ScheduleFutureMap.put(jobId.toString(), scheduledFuture);}public void cancelJob(Integer jobId) {if (jobId == null) {return;}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture == null) {return;}if (!scheduledFuture.isCancelled()) {scheduledFuture.cancel(true);}jobId2ScheduleFutureMap.remove(jobId.toString());}private ScheduledFuture scheduleJob(YangJobSubmitParam yangJobSubmitParam) {Runnable runnable = yangJobSubmitParam.getRunnable();Integer initialDelay = yangJobSubmitParam.getInitialDelay();Integer period = yangJobSubmitParam.getPeriod();JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();switch (jobExecuteStrategy) {case ONCE:return scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);case WITH_FIXED_DELAY:return scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);case WITH_FIXED_RATE:return scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);}throw new RuntimeException("执行策略有误===========");}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}
}

最后,我们添加对应的测试方法:

 public static void main(String[] args) {ThreadFactory threadFactory = new YangJobThreadFactory("yang");ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService);YangJobSubmitParam yangJobSubmitParam = new YangJobSubmitParam();yangJobSubmitParam.setJobId(1);yangJobSubmitParam.setRunnable(() -> System.out.println("执行任务=====" + new Date()));yangJobSubmitParam.setInitialDelay(0);yangJobSubmitParam.setPeriod(2);yangJobSubmitParam.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_RATE);yangJobManager.submitJob(yangJobSubmitParam);try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("取消任务==========");yangJobManager.cancelJob(1);try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}yangJobManager.shutdown();}

在该方法中,我们提交任务,该任务间隔时间为2秒,10秒过后,取消任务,取消任务过后,再睡眠10秒,在后面10秒钟,不会执行任务(或执行一次,因为在cancel之前刚好有任务没执行完),执行结果如下:
image.png

YangJobManager建造者

对于YangJobManager,目前我们所拥有的属性、方法都比较简单,但是如果后续这个类进一步扩展,构造该类可能会变得很麻烦,因此,我们添加一个YangJobBuilder建造者类,用于构造YangJobManager,此外,我们将YangJobManager的构造方法设置为private,从而将构造YangJobManager的职责,彻底收敛到YangJobManagerBuilder类中,我们修改YangJobManager类如下:

package com.yang.job;import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.factory.YangJobThreadFactory;
import com.yang.job.request.YangJobSubmitParam;import java.util.Map;
import java.util.concurrent.*;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;private Map<String, ScheduledFuture> jobId2ScheduleFutureMap = new ConcurrentHashMap<>();private YangJobManager(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;}public void submitJob(YangJobSubmitParam yangJobSubmitParam) {Integer jobId = yangJobSubmitParam.getJobId();if (jobId == null) {throw new RuntimeException("缺少任务标识=========");}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture != null && !scheduledFuture.isCancelled()) {// jobId存在对应的任务return;}JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();if (jobExecuteStrategy == null) {throw new RuntimeException("缺少执行策略=========");}if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE) {yangJobSubmitParam.getRunnable().run();return;}scheduledFuture = scheduleJob(yangJobSubmitParam);jobId2ScheduleFutureMap.put(jobId.toString(), scheduledFuture);}public void cancelJob(Integer jobId) {if (jobId == null) {return;}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture == null) {return;}if (!scheduledFuture.isCancelled()) {scheduledFuture.cancel(true);}jobId2ScheduleFutureMap.remove(jobId.toString());}private ScheduledFuture scheduleJob(YangJobSubmitParam yangJobSubmitParam) {Runnable runnable = yangJobSubmitParam.getRunnable();Integer initialDelay = yangJobSubmitParam.getInitialDelay();Integer period = yangJobSubmitParam.getPeriod();JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();switch (jobExecuteStrategy) {case ONCE:return scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);case WITH_FIXED_DELAY:return scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);case WITH_FIXED_RATE:return scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);}throw new RuntimeException("执行策略有误===========");}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}public static class YangJobManagerBuilder {private ThreadFactory threadFactory;private ScheduledExecutorService scheduledExecutorService;public YangJobManagerBuilder() {}public YangJobManagerBuilder setThreadFactory(ThreadFactory threadFactory) {this.threadFactory = threadFactory;return this;}public YangJobManagerBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;return this;}public YangJobManager build() {if (this.threadFactory == null) {this.threadFactory = new YangJobThreadFactory("yang");}if (this.scheduledExecutorService == null) {this.scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),this.threadFactory);} else {if (this.scheduledExecutorService instanceof ScheduledThreadPoolExecutor) {ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) this.scheduledExecutorService;scheduledThreadPoolExecutor.setThreadFactory(this.threadFactory);}}return new YangJobManager(this.scheduledExecutorService);}}
}

任务执行类

在之前的代码中,我们的Runnable都是匿名函数类,但是在我们的定时任务调度平台中,一般情况下,这个任务是会持久化到数据库中的,我们一般不会说把这个Runnable的代码也存到数据库吧,一般存储的,应该就是某个任务执行类的类路径,和方法名,以及入参,然后在启动项目时,从数据库中加载这些数据,并通过反射或代理等方式,来构造这个Runnable。
首先,我们定义一个任务执行类,来规范任务的执行方法和入参格式:

// 任务执行类
package com.yang.job.execute;public interface IYangJobExecutor {void execute(YangJobExecuteRequest yangJobExecuteRequest);
}// 任务执行方法入参
package com.yang.job.execute;import lombok.Data;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;@Data
public class YangJobExecuteRequest implements Serializable {private String jobId;private Map<String, String> params = new HashMap<>();public void addParam(String key, String value) {params.put(key, value);}public String getParam(String key) {return params.get(key);}
}

接着,我们创建这个YangJobExecutor的实现类,用于测试,在该类中,执行任务的方法很简单,打印当前类的名字以及入参。

package com.yang.task;import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;import java.util.Date;public class TestJobExecutor implements IYangJobExecutor {@Overridepublic void execute(YangJobExecuteRequest yangJobExecuteRequest) {System.out.println(String.format("%s 任务执行类执行了,入参为:%s, 当前时间:%s",this.getClass().getName(), yangJobExecuteRequest.toString(),new Date().toString()));}
}

然后我们创建一个YangJobData,假设我们从数据库中获取的数据格式如下:

package com.yang.job.data;import lombok.Data;import java.io.Serializable;@Data
public class YangJobData implements Serializable {private Integer jobId;private String cron;private String executeStrategy;private String executeClassPath;private String executeParams;
}

executeStrategy表示任务的执行策略,executeClassPath表示要执行的任务类的路径,executeParams表示执行任务方法的入参。
在XXL-JOB中,我们可以使用cron来设置定时任务的执行时间,因此我们这里,也使用cron作为定时任务的执行时间设置,为了解析cron表达式,我们添加下列依赖:

  <dependency><groupId>com.cronutils</groupId><artifactId>cron-utils</artifactId><version>9.2.0</version></dependency>

然后创建一个CronUtils工具类,用于解析cron表达式。

package com.yang.demo.infra.utils;import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;import java.time.ZonedDateTime;
import java.util.Optional;public class CronUtils {private static final CronDefinition CRON_DEFINITION = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ);private static final CronParser CRON_PARSER = new CronParser(CRON_DEFINITION);public static ZonedDateTime nextExecutionTime(String cron, ZonedDateTime startTime) {ExecutionTime executionTime = ExecutionTime.forCron(CRON_PARSER.parse(cron));Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(startTime);return zonedDateTime.get();}
}

对于执行方法的入参,一般情况下,就是任务的id,以及一些扩展信息,这些扩展信息一般以键值对的形式存储,即"key:value;key:value;"这些形式,所以这里添加一个FeaturesUtils类,用于解析这些键值对信息:

package com.yang.job.utils;import java.util.HashMap;
import java.util.Map;public class FeaturesUtils {private final static String KEY_KEY_SEPARATOR = ";";private final static String KEY_VALUE_SEPARATOR = ":";public static Map<String, String> convert2FeatureMap(String features) {Map<String, String> featureMap = new HashMap<>();if (features == null || features.isEmpty()) {return featureMap;}String[] keyValues = features.split(KEY_KEY_SEPARATOR);for (String keyValue : keyValues) {String[] split = keyValue.split(KEY_VALUE_SEPARATOR);String key = split[0];String value = split[1];featureMap.put(key, value);}return featureMap;}public static String convert2Features(Map<String, String> featureMap) {if (featureMap == null || featureMap.isEmpty()) {return "";}StringBuilder stringBuilder = new StringBuilder();featureMap.forEach((key, value) -> {stringBuilder.append(key).append(KEY_VALUE_SEPARATOR).append(value).append(KEY_KEY_SEPARATOR);});return stringBuilder.toString();}
}

然后我们添加测试方法,模拟从数据库中获取数据,并根据任务类路径,获取对应的runnable并提交到YangJobManager中。

  public static void main(String[] args) {YangJobData yangJobData = mockYangJobData();YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam(yangJobData);YangJobManager yangJobManager = new YangJobManager.YangJobManagerBuilder().setThreadFactory(new YangJobThreadFactory("yang")).build();yangJobManager.submitJob(yangJobSubmitParam);try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}yangJobManager.shutdown();}private static YangJobSubmitParam convert2YangJobSubmitParam(YangJobData yangJobData) {YangJobSubmitParam yangJobSubmitParam = new YangJobSubmitParam();yangJobSubmitParam.setJobId(yangJobData.getJobId());yangJobSubmitParam.setJobExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy()));ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(yangJobData.getCron(), ZonedDateTime.now());ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(yangJobData.getCron(), nextExecutionTime);long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();yangJobSubmitParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);yangJobSubmitParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);try {Class<?> aClass = Class.forName(yangJobData.getExecuteClassPath());if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {throw new RuntimeException("任务类必须实现IYangJobExecutor接口");}IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobData);Runnable runnable = () -> executor.execute(yangJobExecuteRequest);yangJobSubmitParam.setRunnable(runnable);} catch (InstantiationException | IllegalAccessException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}return yangJobSubmitParam;}private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobData yangJobData) {YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();yangJobExecuteRequest.setJobId(yangJobData.getJobId().toString());yangJobExecuteRequest.setParams(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));return yangJobExecuteRequest;}private static YangJobData mockYangJobData() {YangJobData yangJobData = new YangJobData();yangJobData.setJobId(1);yangJobData.setCron("0/5 * * * * ?");yangJobData.setExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_DELAY.getName());yangJobData.setExecuteClassPath("com.yang.task.TestJobExecutor");yangJobData.setExecuteParams("jobId:1;startIndex:1;endIndex:10;");return yangJobData;}

这里对于cron的解析,其实不是特别好,这里的思路是,获取下一次执行的时间,和下下一次执行的时间,然后以此来计算initialDelay和period,但是如果这个cron表示的是某几天、某几个小时,比如说星期一、星期二、星期三执行,那么我们那种解析方式是有误的,这个可以后续再好好斟酌一下,目前先这样解析。
执行结果如下:
image.png

相关文章:

手撸XXL-JOB(二)——定时任务管理

在上一节中&#xff0c;我们介绍了SpringBoot中关于定时任务的执行方式&#xff0c;以及ScheduledExecutorService接口提供的定时任务执行方法。假设我们现在要写类似XXL-JOB这样的任务调度平台&#xff0c;那么&#xff0c;对于任务的管理&#xff0c;是尤为重要的。接下来我们…...

DEV--C++小游戏(吃星星(0.2))

目录 吃星星&#xff08;0.2&#xff09; 简介 本次更新 分部代码 头文件&#xff08;增&#xff09; 命名空间变量&#xff08;增&#xff09; 副函数&#xff08;新&#xff0c;增&#xff09; 清屏函数 打印地图函数&#xff08;增&#xff09; 移动函数 选择颜色…...

Lua 协程池

协程池 在 使用 Lua 协程模拟 Golang 的 go defer 编程模式 中介绍了 Lua 协程的使用&#xff0c;模仿 golang 封装了下 还可以做进一步的优化 原来的 go 函数是这样实现的&#xff1a; function go(_co_task)local co coroutine.create(function(_co_wrap)_co_task(_co_w…...

[Linux][网络][协议技术][DNS][ICMP][ping][traceroute][NAT]详细讲解

目录 1.DNS1.DNS背景2.域名简介 2.ICMP协议1.ICMP功能2.ICMP两类报文 3.ping命令4.traceroute5.NAT技术1.NAT技术背景2.NAT IP转换过程3.静态地址NAT && 动态地址NAT4.网络地址端口转换NAPT5.NAT技术的缺陷6.NAT和代理服务器 6.总结1.数据链路层2.网络层3.传输层4.应用…...

Android 集成Bugly完成线上的异常Exception收集及处理

文章目录 &#xff08;一&#xff09;添加产品APP&#xff08;二&#xff09;集成SDK&#xff08;三&#xff09;参数配置权限混淆 &#xff08;四&#xff09;初始化 &#xff08;一&#xff09;添加产品APP 一&#xff09;在个人头像 -> 我的头像 -> 新建产品 二&…...

Redis——Redis的数据库结构、删除策略及淘汰策略

Redis是一个高性能的key-value存储系统&#xff0c;它支持多种数据结构&#xff0c;并提供了丰富的删除策略和淘汰策略。以下是关于Redis的数据库结构、删除策略及淘汰策略的详细介绍&#xff1a; Redis的数据库结构 Redis是一个key-value数据库&#xff0c;数据存储是以一个…...

【Vue3笔记03】Vue3项目工程中使用vue-router路由

这篇文章,主要介绍Vue3项目工程中如何使用vue-router路由。 目录 一、vue-router路由 1.1、下载vue-router路由 1.2、创建router.js文件 1.3、main.js配置路由...

并行执行的4种类别——《OceanBase 并行执行》系列 4

OceanBase 支持多种类型语句的并行执行。在本篇博客中&#xff0c;我们将根据并行执行的不同类别&#xff0c;分别详细阐述&#xff1a;并行查询、并行数据操作语言&#xff08;DML&#xff09;、并行数据定义语言&#xff08;DDL&#xff09;以及并行 LOAD DATA 。 《并行执行…...

函数练习.

1.打印乘法口诀表 口诀表的行数和列数自己指定如&#xff1a;输入9&#xff0c;输出99口诀表&#xff0c;输出12&#xff0c;输出1212的乘法口诀表。 multiplication(int index) { ​if (index 9) { ​int i 0; ​for (i 1; i < 10; i) { ​int j 0; ​for (j 1; j &…...

Git 分支命令操作详解

目录 1、分支的特点 2、分支常用操作 3、分支的使用 3.1、查看分支 3.2、创建分支 3.3、修改分支 3.4、切换分支 3.5、合并分支 3.6、产生冲突 3.7、解决冲突 3.8、创建分支和切换分支说明 1、分支的特点 同时并行推进多个功能开发&#xff0c;提高开发效率。各个分…...

十二生肖Midjourney绘画大挑战:释放你的创意火花

随着AI艺术逐渐进入大众视野&#xff0c;使用Midjourney绘制十二生肖不仅能够激发我们的想象力&#xff0c;还能让我们与传统文化进行一场新式的对话。在这里&#xff0c;我们会逐一提供给你创意满满的绘画提示词&#xff0c;让你的作品别具一格。而且&#xff0c;我们还精选了…...

【C++】priority_queues(优先级队列)和反向迭代器适配器的实现

目录 一、 priority_queue1.priority_queue的介绍2.priority_queue的使用2.1、接口使用说明2.2、优先级队列的使用样例 3.priority_queue的底层实现3.1、库里面关于priority_queue的定义3.2、仿函数1.什么是仿函数&#xff1f;2.仿函数样例 3.3、实现优先级队列1. 1.0版本的实现…...

Go语言函数

在Go语言中&#xff0c;函数是一种基本的构建块&#xff0c;用于组织代码并执行特定任务。它们是可重复使用的代码段&#xff0c;可以接收输入参数&#xff0c;执行一系列操作&#xff0c;并可返回结果。以下是Go语言中函数的详细介绍及其使用方法&#xff1a; 基本语法 Go语…...

如何使用EasyExcel导入百万数据

摘要&#xff1a; 本文将详细探讨如何利用EasyExcel库&#xff0c;以及结合Java编程&#xff0c;高效地导入大规模数据至应用程序中。我们将逐步介绍导入流程、代码实现细节&#xff0c;并提供性能优化建议&#xff0c;旨在帮助读者在处理百万级别数据时&#xff0c;提高效率与…...

【解决】Unity Build 应用程序运行即崩溃问题

开发平台&#xff1a;Unity 2021.3.7f1c1   一、问题描述 编辑器 Build 工程结束&#xff0c;但控制台 未显示 Build completed with a result of Succeeded [时间长度] 信息。该情况下打包流程正常&#xff0c;但应用程序包打开即崩溃。   二、问题测试记录 测试1&#xf…...

C++数据结构——红黑树

前言&#xff1a;本篇文章我们继续来分享C中的另一个复杂数据结构——红黑树。 目录 一.红黑树概念 二.红黑树性质 三.红黑树实现 1.基本框架 2.插入 3.判断平衡 四.完整代码 总结 一.红黑树概念 红黑树&#xff0c;是一种二叉搜索树&#xff0c;但在每个结点上增加一个…...

Java并发编程:学习路线图

文章目录 一、操作系统内核原理1、进程管理详解2、内存管理详解3、IO输入输出系统详解4、进程间通信机制详解5、网络通信原理剖析 二、Java内存模型三、并发集合1、Map&#xff08;1&#xff09;ConcurrentHashMap&#xff08;2&#xff09;ConcurrentSkipListMap 2、List&…...

算法_前缀和

DP34 【模板】前缀和 import java.util.Scanner;// 注意类名必须为 Main, 不要有任何 package xxx 信息 public class Main {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别int n in.nextInt(),q in.ne…...

C语言(指针)7

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸各位能阅读我的文章&#xff0c;诚请评论指点&#xff0c;关注收藏&#xff0c;欢迎欢迎~~ &#x1f4a5;个人主页&#xff1a;小羊在奋斗 &#x1f4a5;所属专栏&#xff1a;C语言 本系列文章为个人学习笔记&#x…...

线程纵横:C++并发编程的深度解析与实践

hello &#xff01;大家好呀&#xff01; 欢迎大家来到我的Linux高性能服务器编程系列之《线程纵横&#xff1a;C并发编程的深度解析与实践》&#xff0c;在这篇文章中&#xff0c;你将会学习到C新特性&#xff0c;并发编程&#xff0c;以及其如何带来的高性能的魅力&#xff0…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)

宇树机器人多姿态起立控制强化学习框架论文解析 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架&#xff08;一&#xff09; 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

C# 类和继承(抽象类)

抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

FFmpeg:Windows系统小白安装及其使用

一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】&#xff0c;注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录&#xff08;即exe所在文件夹&#xff09;加入系统变量…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...

【若依】框架项目部署笔记

参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作&#xff1a; 压缩包下载&#xff1a;http://download.redis.io/releases 1. 上传压缩包&#xff0c;并进入压缩包所在目录&#xff0c;解压到目标…...

简单介绍C++中 string与wstring

在C中&#xff0c;string和wstring是两种用于处理不同字符编码的字符串类型&#xff0c;分别基于char和wchar_t字符类型。以下是它们的详细说明和对比&#xff1a; 1. 基础定义 string 类型&#xff1a;std::string 字符类型&#xff1a;char&#xff08;通常为8位&#xff09…...