Flink on yarn 加载失败plugins失效问题解决
Flink on yarn 加载失败plugins失效问题解决
flink版本:1.13.6
1. 问题
flink 任务运行在yarn集群,plugins加载失效,导致通过扩展资源获取任务参数失效
2. 问题定位
-
yarn容器的jar包及插件信息,jar包是正常上传
-
源码定位
加载plugins入口,TaskManagerRunner.class
PluginUtils.createPluginManagerFromRootFolder
源码加载扩展资源参数入口TaskManagerRunner.class
ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig
日志信息
定位PluginConfig源码
3. 方案
重写覆盖集群的PluginConfig.java,优先从configurtaion获取
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.core.plugin;import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.*;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;/** Stores the configuration for plugins mechanism. */
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class PluginConfig {private static final Logger LOG = LoggerFactory.getLogger(PluginConfig.class);private static final String PUPU_FLINK_PLUGINS_DIR = "flink_plugins_dir";private final Optional<Path> pluginsPath;private final String[] alwaysParentFirstPatterns;private PluginConfig(Optional<Path> pluginsPath, String[] alwaysParentFirstPatterns) {pluginsPath.ifPresent(path -> LOG.info("pluginsPath: {}", path));LOG.info("alwaysParentFirstPatterns: {}", Arrays.stream(alwaysParentFirstPatterns).toArray());this.pluginsPath = pluginsPath;this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;}public Optional<Path> getPluginsPath() {return pluginsPath;}public String[] getAlwaysParentFirstPatterns() {return alwaysParentFirstPatterns;}public static PluginConfig fromConfiguration(Configuration configuration) {return new PluginConfig(getPluginsDir(configuration).map(File::toPath),CoreOptions.getPluginParentFirstLoaderPatterns(configuration));}public static Optional<File> getPluginsDir(Configuration configuration) {String pluginsDir = configuration.get(ConfigOptions.key(PUPU_FLINK_PLUGINS_DIR).stringType().defaultValue(null));if (StringUtils.isBlank(pluginsDir)) {pluginsDir =System.getenv().getOrDefault(ConfigConstants.ENV_FLINK_PLUGINS_DIR,ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS);}File pluginsDirFile = new File(pluginsDir);if (!pluginsDirFile.isDirectory()) {LOG.warn("The plugins directory [{}] does not exist.", pluginsDirFile);return Optional.empty();}return Optional.of(pluginsDirFile);}}
相关文章:

Flink on yarn 加载失败plugins失效问题解决
Flink on yarn 加载失败plugins失效问题解决 flink版本:1.13.6 1. 问题 flink 任务运行在yarn集群,plugins加载失效,导致通过扩展资源获取任务参数失效 2. 问题定位 yarn容器的jar包及插件信息,jar包是正常上传 源码定位 加载plugins入口,TaskMana…...
显卡服务器的特点和优势在哪里
随着科技的发展以及人们对于计算机性能的需求提高,显卡服务器是主要使用图形处理器进行计算和运算,拥有更加强大的计算能力,今天小编就来给大家讲一讲显卡服务器的特点和优势是什么! 1.高可靠性:显卡服务器采用高品质的…...

c++设计模式二:原型模式
使用场景:当需要构建多个相同的类对象时,而且该类对象结构较为复杂,如果每个都重新组织构建会很麻烦。 其实,就是写一个拷贝构造函数,或者写一个拷贝每个成员变量的clone()方法。 举例说明:比如一个相亲网站…...

【Qt控件之QMessageBox】详解
Qt控件之QMessageBox 描述基于属性的API富文本和文本格式属性严重程度以及图标和Pixmap属性静态函数API 高级用法默认按钮和退出按钮示例使用场景 描述 QMessageBox类提供了一个模态对话框,用于通知用户或向用户提问并接收答案。 消息框显示一个主要文本以提醒用户…...

SSH安全登录远程主机
SSH服务器简介 SSH即Security SHell的意思,它可以将连线的封包进行加密技术,之后进行传输,因此相当的安全。 SSH是一种协议标准,其目的是实现安全远程登录以及其它安全网络服务。 SSH协定,在预设的状态下,…...

揭秘!产品经理提升效率的秘密武器:10款AI生成PPT工具
AI的爆炸式增长表现令人惊艳,现有的各类AI工具正在重塑各行各业,不同程度地提高人们的工作效率,并有望创造新的职业机会。但是,面对市面上数量众多的AI工具,且每周都会蹦出新的产品,即便是以好奇心富称的产…...
Oracle修改带数据的字段类型
insert into TNW_FUND_SELORG(TFDINFOID,TSOINFOID) select TFD_INFO_ID,TSO_INFO_ID from TFD_SEL_FUNDLINK_TO_OLDFUNDWEB_DB /*修改原字段名*/ ALTER TABLE 表名 RENAME COLUMN 字段名 TO 字段名1; /*添加一个和原字段同名的字段*/ ALTER TABLE 表名 ADD 字段名 VARCHAR…...

WebService接口方式和Restful接口这两者有什么区别和相同点
WebService和RESTful接口都是用于在网络上进行通信和数据交换的技术,但它们在设计和使用上有一些重要的区别和相似之处。 相同点: 基于HTTP协议:无论是WebService还是RESTful接口,它们都是通过HTTP协议进行通信的。 支持多种数据…...

jenkins自动化操作步骤(gitblit)
1、登陆地址: http://xxxxxxxxx.org:xxxx/ admin/xxxx 2、创建任务 选择构建一个maven项目 3、配置 最多只保留一天一个任务 选择git仓库和账号密码 选择代码对应分支 build项: 1)使用父项目的pom文件:k56-boot/pom.xml 2&…...
centos中mongodb设置服务自启动并 允许远程IP访问
安装mongodb参考 注意的是配置文件需要把journal设置为true 制作为系统服务 创建MongoDB服务文件。运行以下命令创建服务文件/etc/systemd/system/mongod.service: vi /etc/systemd/system/mongod.service [Unit] DescriptionMongoDB Database Server Documenta…...

实时定位和配送追踪:开发万岳同城外卖APP的关键技术特性
随着生活节奏的不断加快,外卖服务已经成为许多人日常生活中不可或缺的一部分。无论是工作日的午餐,还是周末的家庭聚会,外卖APP已经成为满足各种美食需求的首选方式。然而,同城外卖APP的成功不仅仅取决于美味的食物选择࿰…...
数据库强化(3.存储过程)
1.什么是存储过程? 存储过程(Stored Procedure)是一种在数据库中存储复杂程序,以便外部程序调用的一种数据库对象。MySQL 5.0 版本开始支持存储过程。 它是为了完成特定功能的SQL语句集,经编译创建并保存在数据库中&a…...

雅思小作文笔记
mostly from Simon’s methods and techniques remember the task is describe what you see, not give an opinion. Just write a report.no conclusion, just a summary(the overview) Question type 小作文的题目类型大致如上 Simon所述,在描述数字的时候&…...

Java List Set Map
一、List 1.1 ArrayList 1.2 LinkedList 二、Set 2.1 HashSet 2.2 TreeSet 2.3 LinkedHashSet 三、Map 3.1 HashMap 3.2 TreeMap 3.3 LinkedHashMap 四、对比 类型底层结构重复null值场景备注查询删除新增ListArrayList动态数组可允许快速随机访问元素0(1)0(n)尾部增加0&a…...

【数据结构】数组和字符串(十三):链式字符串的基本操作(串长统计、查找、复制、插入、删除、串拼接)
文章目录 4.3 字符串4.3.1 字符串的定义与存储4.3.2 字符串的基本操作(链式存储)1. 结构体2. 初始化3. 判空4. 串尾添加5. 打印6. 串长统计7. 查找8. 复制9. 插入10. 删除11. 串拼接12. 销毁13. 主函数14. 代码整合 4.3 字符串 字符串(String)是由零个或…...
Python3 获取当前服务器公网 IP 地址
有同学问我如何使用 Python 获取服务器公网的 IP 地址呢?我测试几个发现,方法有很多,好用的就发现一种,即直接使用 Python 自带的 socket 包。 代码示例: # 获取主机 IP dgram socket.socket(socket.AF_INET, socke…...
EAS查前5分钟到现在的组织变动数据
select * from T_ORG_Admin where ( (FCREATETIME between ( sysdate-(1/24/60)*8) and sysdate ) or (FLASTUPDATETIME between ( sysdate-(1/24/60)*8) and sysdate ) ) -- FLASTUPDATETIME < sysdate-(1/24/60)*10 --FNUMBER 110112...
uni-app——如何阻止事件冒泡
引言 在开发移动应用程序时,我们经常需要处理用户交互事件。然而,有时候这些事件会冒泡,导致意外的行为和不良用户体验。在本文中,我们将探讨如何使用UniApp框架来阻止事件冒泡,并提供一些示例代码来帮助您理解如何实…...

[MySQL]索引
目录 概念解释 作用/优点 缺点 适用场景 索引的创建,删除与查看 系统对索引的自动创建 索引建立的时机 索引存储的数据结构 选择B树的原因 B树的原理 查询流程 优点 B树 与B树的区别 优点 概念解释 索引就像是一本字典的目录,我们可以根据目录快速定位到我们想…...

什么是AUTOSAR ComStack,AUTOSAR架构中,CAN通信堆栈CAN Communication Stack介绍
AUTOSAR(Automotive Open System Architecture)ComStack指的是AUTOSAR架构中的通信堆栈。在AUTOSAR体系结构中,ComStack是指用于不同软件组件(如应用软件、基础软件等)之间进行通信的一组协议和服务。 在AUTOSAR架构中…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...

【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...

基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...
【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)
LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 题目描述解题思路Java代码 题目描述 题目链接:LeetCode 3309. 连接二进制表示可形成的最大数值(中等) 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)
cd /home 进入home盘 安装虚拟环境: 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境: virtualenv myenv 3、激活虚拟环境(激活环境可以在当前环境下安装包) source myenv/bin/activate 此时,终端…...