当前位置: 首页 > news >正文

xxl-job的原理(2)—调度中心管理注册信息

一、调度中心管理注册信息

1.JobApiController

执行器调用调度中心的url来实现注册、下线、回调等操作;其主要的实现类是JobApiController,调用/api/registry接口注册执行器信息,调用/api/registryRemove接口下线执行器信息,调用/api/callback接口执行回调操作。

@Controller
@RequestMapping("/api")
public class JobApiController {@Resourceprivate AdminBiz adminBiz;/*** api** @param uri* @param data* @return*/@RequestMapping("/{uri}")@ResponseBody@PermissionLimit(limit=false)public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {// validif (!"POST".equalsIgnoreCase(request.getMethod())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (XxlJobAdminConfig.getAdminConfig().getAccessToken()!=null&& XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0&& !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingif ("callback".equals(uri)) {List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);return adminBiz.callback(callbackParamList);} else if ("registry".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registry(registryParam);} else if ("registryRemove".equals(uri)) {RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);return adminBiz.registryRemove(registryParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}}}

2.AdminBizImpl

执行adminBiz.registry(registryParam)是调用实现类AdminBizImpl,在实现类中调用registry(registryParam)来实现。

@Service
public class AdminBizImpl implements AdminBiz {@Overridepublic ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {return JobCompleteHelper.getInstance().callback(callbackParamList);}@Overridepublic ReturnT<String> registry(RegistryParam registryParam) {return JobRegistryHelper.getInstance().registry(registryParam);}@Overridepublic ReturnT<String> registryRemove(RegistryParam registryParam) {return JobRegistryHelper.getInstance().registryRemove(registryParam);}}

3.JobRegistryHelper

在AdminBizImpl中的实现也不难理解,通过在初始化start()方法中创建的registryOrRemoveThreadPool线程池中执行异步注册任务,注册信息写入到数据表xxl_job_registry中。

//AdminBizImpl.javapublic void start(){// for registry or removeregistryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});//...省略public ReturnT<String> registry(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute//my-异步执行,将注册信息持久化到数据库registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {//my-写入数据库int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());if (ret < 1) {XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// freshfreshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}

4.总结

  1. 执行器通过restful api形式注册到调度中心来,调度中心JobApiController对应有3个注册、下线和回调的方法实现;
  2. 通过AdminBizImpl的adminBiz.registry(registryParam)来实际执行注册方法,实际使用JobRegistryHelper类;
  3. 在JobRegistryHelper类中在初始化的时候会创建一个线程池,每次注册执行器的时候会创建一个异步线程来将注册信息持久化的数据库;

JobApiController_api

二、调度中心的配置和启动

1.添加权限控制

制定权限注解@PermissionLimit,其实现的逻辑在PermissionInterceptor中,首先判断是否需要鉴权,如果需要则根据cookie中拿到的用户信息查库判断是否有权限登录,如果没有权限则重定向到登录页面或提示没有权限。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PermissionLimit {/*** 登录拦截 (默认拦截)*/boolean limit() default true;/*** 要求管理员权限** @return*/boolean adminuser() default false;}
@Component
public class PermissionInterceptor implements AsyncHandlerInterceptor {@Resourceprivate LoginService loginService;@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//my-处理登录权限的逻辑if (!(handler instanceof HandlerMethod)) {return true;   // proceed with the next interceptor}// if need loginboolean needLogin = true;boolean needAdminuser = false;HandlerMethod method = (HandlerMethod)handler;PermissionLimit permission = method.getMethodAnnotation(PermissionLimit.class);if (permission!=null) {needLogin = permission.limit();needAdminuser = permission.adminuser();}if (needLogin) {XxlJobUser loginUser = loginService.ifLogin(request, response);if (loginUser == null) {response.setStatus(302);response.setHeader("location", request.getContextPath()+"/toLogin");return false;}if (needAdminuser && loginUser.getRole()!=1) {throw new RuntimeException(I18nUtil.getString("system_permission_limit"));}request.setAttribute(LoginService.LOGIN_IDENTITY_KEY, loginUser);}return true;   // proceed with the next interceptor}}

将PermissionInterceptor添加到web配置文件中。

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {@Resourceprivate PermissionInterceptor permissionInterceptor;@Resourceprivate CookieInterceptor cookieInterceptor;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(permissionInterceptor).addPathPatterns("/**");registry.addInterceptor(cookieInterceptor).addPathPatterns("/**");}}

2.配置中心初始化

xxljob的初始化和销毁动作在XxlJobAdminConfig中配置完成。

@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {private static XxlJobAdminConfig adminConfig = null;public static XxlJobAdminConfig getAdminConfig() {return adminConfig;}// ---------------------- XxlJobScheduler ----------------------private XxlJobScheduler xxlJobScheduler;@Overridepublic void afterPropertiesSet() throws Exception {adminConfig = this;xxlJobScheduler = new XxlJobScheduler();xxlJobScheduler.init();}@Overridepublic void destroy() throws Exception {xxlJobScheduler.destroy();}
}

具体初始化的操作。

public class XxlJobScheduler  {private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);public void init() throws Exception {// init i18ninitI18n();// admin trigger pool startJobTriggerPoolHelper.toStart();// admin registry monitor runJobRegistryHelper.getInstance().start();// admin fail-monitor runJobFailMonitorHelper.getInstance().start();// admin lose-monitor run ( depend on JobTriggerPoolHelper )JobCompleteHelper.getInstance().start();// admin log report startJobLogReportHelper.getInstance().start();// start-schedule  ( depend on JobTriggerPoolHelper )JobScheduleHelper.getInstance().start();logger.info(">>>>>>>>> init xxl-job admin success.");}public void destroy() throws Exception {// stop-scheduleJobScheduleHelper.getInstance().toStop();// admin log report stopJobLogReportHelper.getInstance().toStop();// admin lose-monitor stopJobCompleteHelper.getInstance().toStop();// admin fail-monitor stopJobFailMonitorHelper.getInstance().toStop();// admin registry stopJobRegistryHelper.getInstance().toStop();// admin trigger pool stopJobTriggerPoolHelper.toStop();}// ---------------------- I18n ----------------------private void initI18n(){for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));}}

相关文章:

xxl-job的原理(2)—调度中心管理注册信息

一、调度中心管理注册信息 1.JobApiController 执行器调用调度中心的url来实现注册、下线、回调等操作&#xff1b;其主要的实现类是JobApiController&#xff0c;调用/api/registry接口注册执行器信息&#xff0c;调用/api/registryRemove接口下线执行器信息&#xff0c;调用…...

小白入门pytorch(二)----神经网络

本文为&#x1f517;[小白入门Pytorch]学习记录博客 文章目录 前言一、神经网络的组成部分1.神经元2.神经网络层3.损失函数4.优化器 二、Pytorch构建神经网络中的网络层全连接层2.卷积层3.池化层4.循环神经网络5.转置卷积层6.归一化层7.激活函数层 三、数据加载与预处理1.数据加…...

【进阶C语言】排序函数(qsort)与模拟实现(回调函数的实例)

本章大致内容目录&#xff1a; 1.认识回调函数 2.排序函数qsort 3.模拟实现qsort 回调函数为C语言重要知识点&#xff0c;以函数指针为主要知识&#xff1b;下面介绍回调函数的定义、回调函数的库函数举例即库函数模拟实现。 一、回调函数 1.回调函数定义 回调函数就是一…...

CentOS 7 上编译和安装 SQLite 3.9.0

文章目录 可能报错分析详细安装过程 可能报错分析 报错如下&#xff1a; django.core.exceptions.ImproperlyConfigured: SQLite 3.9.0 or later is required (found 3.7.17). 原因&#xff1a;版本为3.7.太低了&#xff0c;需要升级到3.9.0至少 详细安装过程 1.安装所需的…...

[GXYCTF2019]禁止套娃 无回显 RCE 过滤__FILE__ dirname等

扫除git 通过githack 获取index.php <?php include "flag.php"; echo "flag在哪里呢&#xff1f;<br>"; if(isset($_GET[exp])){if (!preg_match(/data:\/\/|filter:\/\/|php:\/\/|phar:\/\//i, $_GET[exp])) {if(; preg_replace(/[a-z,_]\(…...

Springboot使用Aop保存接口请求日志到mysql

1、添加aop依赖 <!-- aop日志 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency> 2、新建接口保存数据库的实体类RequestLog.java package com.example…...

网络安全面试题汇总(附答案)

作为从业多年的网络安全工程师&#xff0c;我深知在面试过程中面试官所关注的重点及考察的技能点。网络安全作为当前信息技术领域中非常重要的一部分&#xff0c;对于每一个从事网络安全工作的人员来说&#xff0c;不仅需要掌握一定的技术能力&#xff0c;更需要具备全面的综合…...

Centos7安装kvm,配置虚拟机网络

1.安装软件包&#xff0c;禁用防火墙&#xff08;非必须&#xff09; yum -y install qemu-kvm libvirt virt-install 1&#xff09;禁用防火墙&#xff08;非必须&#xff09; systemctl stop firewalld systemctl disable firewalld 2&#xff09;禁用NetworkManager syst…...

Javascript文件上传

什么是文件上传 文件上传包含两部分&#xff0c; 一部分是选择文件&#xff0c;包含所有相关的界面交互。一部分是网络传输&#xff0c;通过一个网络请求&#xff0c;将文件的数据携带过去&#xff0c;传递到服务器中&#xff0c;剩下的&#xff0c;在服务器中如何存储&#xf…...

golang gin——文件上传(单文件,多文件)

文件上传 单文件上传 从form-data获取文件 package uploadimport ("github.com/gin-gonic/gin""net/http" ) // 单文件上传&#xff0c;多文件上传 func Upload(c *gin.Context) {file, _ : c.FormFile("file") // file为字段名dst : "…...

面试题:Redis和MySQL的事务区别是什么?

大家好&#xff0c;我是小米&#xff01;今天我要和大家聊聊一个在技术面试中经常被问到的问题&#xff1a;“Redis和MySQL的事务区别是什么&#xff1f;”这个问题看似简单&#xff0c;但实际上涉及到了数据库和缓存两个不同领域的知识&#xff0c;让我们一起来深入了解一下吧…...

Canvas绘图

Canvas绘图 Canvas的意义 随着前端的不断发展&#xff0c;页面特效越来越炫酷&#xff0c;W3C组织也不断退出新的CSS特性&#xff1a;例如各种渐变&#xff0c;瀑布流布局&#xff0c;各种阴影&#xff0c;但是随着需求越来越花哨&#xff0c;W3C表示&#xff1a;我去你妈的&…...

逻辑回归评分卡

文章目录 一、基础知识点(1)逻辑回归表达式(2)sigmoid函数的导数损失函数(Cross-entropy, 交叉熵损失函数)交叉熵求导准确率计算评估指标 二、导入库和数据集导入库读取数据 三、分析与训练四、模型评价ROC曲线KS值再做特征筛选生成报告 五、行为评分卡模型表现总结 一、基础知…...

DPDK系列之三十三DPDK并行机制的底层支持

一、背景介绍 在前面介绍了DPDK中的上层对并行的支持&#xff0c;特别是对多核的支持。但是&#xff0c;大家都知道&#xff0c;再怎么好的设计和架构&#xff0c;再优秀的编码&#xff0c;最终都要落到硬件和固件对整个上层应用的支持。单纯的硬件好处理&#xff0c;一个核不…...

LVGL_基础控件滚轮roller

LVGL_基础控件滚轮roller 1、创建滚轮roller控件 /* 创建一个 lv_roller 部件(对象) */ lv_obj_t * roller lv_roller_create(lv_scr_act()); // 创建一个 lv_roller 部件(对象),他的父对象是活动屏幕对象// 将部件(对象)添加到组&#xff0c;如果设置了默认组&#xff0c…...

王道考研操作系统——文件管理

磁盘的基础知识 .txt用记事本这个应用程序打开&#xff0c;文件最重要的属性就是文件名了 保护信息&#xff1a;操作系统对系统当中的各个用户进行了分组&#xff0c;不同分组的用户对文件的操作权限是不一样的 文件的逻辑结构就是文件内部的数据/记录应该被怎么组织起来&…...

商业智能系统的主要功能包括数据仓库、数据ETL、数据统计输出、分析功能

ETL服务内容包含&#xff1a; 数据迁移数据合并数据同步数据交换数据联邦数据仓库...

基于帝国主义竞争优化的BP神经网络(分类应用) - 附代码

基于帝国主义竞争优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于帝国主义竞争优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.帝国主义竞争优化BP神经网络3.1 BP神经网络参数设置3.2 帝国主义竞争算…...

将python项目部署在一台服务器上

将python项目部署在一台服务器上 1.服务器2.部署方法2.1 手动部署2.2 容器化技术部署2.3 服务器less技术部署 1.服务器 服务器一般为&#xff1a;物理服务器和云服务器。 我的是物理服务器&#xff1a;这是将服务器硬件直接放置在您自己的数据中心或机房的传统方法。这种方法需…...

【C语言】善于利用指针(二)

&#x1f497;个人主页&#x1f497; ⭐个人专栏——C语言初步学习⭐ &#x1f4ab;点击关注&#x1f929;一起学习C语言&#x1f4af;&#x1f4ab; ​ 目录 导读&#xff1a;1. 字符指针1.1 字符串的引用方式1.2 有趣的面试题 2. 数组指针2.1 一维数组指针的定义2.2 一维数组…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

JVM 内存结构 详解

内存结构 运行时数据区&#xff1a; Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器&#xff1a; ​ 线程私有&#xff0c;程序控制流的指示器&#xff0c;分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 ​ 每个线程都有一个程序计数…...

【JavaSE】多线程基础学习笔记

多线程基础 -线程相关概念 程序&#xff08;Program&#xff09; 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序&#xff0c;比如我们使用QQ&#xff0c;就启动了一个进程&#xff0c;操作系统就会为该进程分配内存…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...