深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程
ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中
public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor ioExecutor,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,BlobServer blobServer,HeartbeatServices heartbeatServices,DelegationTokenManager delegationTokenManager,MetricRegistry metricRegistry,ExecutionGraphInfoStore executionGraphInfoStore,MetricQueryServiceRetriever metricQueryServiceRetriever,Collection<FailureEnricher> failureEnrichers,FatalErrorHandler fatalErrorHandler)throws Exception {//resourcemanager的选举服务LeaderRetrievalService resourceManagerRetrievalService = null;ResourceManagerService resourceManagerService = null;try {//resourceManager leader 获取服务resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();//LeaderGatewayRetriever实现了LeaderRetrievalListener接口final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(rpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);//启动resourceManagerService start方法会调用具体的选举方法resourceManagerService.start();//启动etrievalService服务的start方法,resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);return new DispatcherResourceManagerComponent(dispatcherRunner,resourceManagerService,dispatcherLeaderRetrievalService,resourceManagerRetrievalService,webMonitorEndpoint,fatalErrorHandler,dispatcherOperationCaches);} catch (Exception exception) {//省略无关代码}}
初始化LeaderRetrievalServer
resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();
getResourceManagerLeaderRetriever会调用父类AbstractHaServices的这个方法
@Overridepublic LeaderRetrievalService getResourceManagerLeaderRetriever() {return createLeaderRetrievalService(getLeaderPathForResourceManager());}
createLeaderRetrievalService会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices下
@Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String componentId) {// Maybe use a single service for leader retrievalreturn ZooKeeperUtils.createLeaderRetrievalService(curatorFrameworkWrapper.asCuratorFramework(),ZooKeeperUtils.getLeaderPath(componentId),configuration);}
继续方法跟进
public static DefaultLeaderRetrievalService createLeaderRetrievalService(final CuratorFramework client, final String path, final Configuration configuration) {return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path, configuration));
最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory
初始化LeaderRetrievalListener
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
RpcGatewayRetrieverUML类图如下

LeaderGatewayRetriever#notifyNewLeaderAddress对LeaderRetrievalListener#notifyLeaderAddress做了实现
@Overridepublic void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);final CompletableFuture<T> oldGatewayFuture =atomicGatewayFuture.getAndSet(newGatewayFuture);newGatewayFuture.whenComplete((t, throwable) -> {if (throwable != null) {oldGatewayFuture.completeExceptionally(throwable);} else {oldGatewayFuture.complete(t);}});}
创建ResourceManagerService
resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);
ResourceManagerService负责管理ResourceManager的生命周期,也负责haServer的启动
创建LeaderElection
private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory,ResourceManagerProcessContext rmProcessContext)throws Exception {//省略其他方法//LeaderElection进行了初始化this.leaderElection =rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
LeaderElection选举
resourceManagerService.start();
跳转到start方法
public void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");leaderElection.startLeaderElection(this);}
leaderElection.startLeaderElection(this)zk模式下走的是DefaultLeaderElection的方法,
@Overridepublic void startLeaderElection(LeaderContender contender) throws Exception {Preconditions.checkNotNull(contender);parentService.register(componentId, contender);}
开始对参选者进行注册
protected void register(String componentId, LeaderContender contender) throws Exception {checkNotNull(componentId, "componentId must not be null.");checkNotNull(contender, "Contender must not be null.");synchronized (lock) {if (leaderElectionDriver == null) {createLeaderElectionDriver();}//省略无关代码}
register会判断选举的driver是否存在,如果不存在,则根据高可用的模式进行选举驱动的创建
@Overridepublic ZooKeeperLeaderElectionDriver create(LeaderElectionDriver.Listener leaderElectionListener) throws Exception {return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);}
ZooKeeperLeaderElectionDriver初始化
public ZooKeeperLeaderElectionDriver(CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)throws Exception {//参数校验this.curatorFramework = Preconditions.checkNotNull(curatorFramework);this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);//使用ZooKeeperUtils.generateLeaderLatchPath方法基于curatorFramework的命名空间生成一个ZooKeeper节点路径,该路径通常用于领导者选举的锁。this.leaderLatchPath =ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());//使用CuratorFramework和之前生成的路径创建一个LeaderLatch实例。LeaderLatch是Curator提供的一个领导者选举实现。 this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());//使用ZooKeeperUtils.createTreeCache方法创建一个TreeCache实例,用于监听ZooKeeper中特定路径(这里是根路径"/")下的节点变化this.treeCache =ZooKeeperUtils.createTreeCache(curatorFramework,"/",new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());treeCache.getListenable().addListener((client, event) -> {switch (event.getType()) {case NODE_ADDED:case NODE_UPDATED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleChangedLeaderInformation(event.getData());break;case NODE_REMOVED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleRemovedLeaderInformation(event.getData().getPath());break;}});leaderLatch.addListener(this);curatorFramework.getConnectionStateListenable().addListener(listener);leaderLatch.start();treeCache.start();}
ZooKeeperLeaderElectionDriver.handleStateChange
状态变化时候根据不同状态打印日志
private void handleStateChange(ConnectionState newState) {switch (newState) {case CONNECTED:LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");break;case SUSPENDED:LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");break;case RECONNECTED:LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");break;case LOST:// Maybe we have to throw an exception here to terminate the JobManagerLOG.warn("Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");break;}}
DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {if (!leaderContenderRegistry.containsKey(componentId)) {LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",sessionID,leaderElectionDriver);return;} else if (!sessionID.equals(issuedLeaderSessionID)) {LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",sessionID,issuedLeaderSessionID);return;}Preconditions.checkState(!confirmedLeaderInformation.hasLeaderInformation(componentId),"The leadership should have been granted while not having the leadership acquired.");LOG.debug("Granting leadership to the contender registered under component '{}' with session ID {}.",componentId,issuedLeaderSessionID);leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);}相关文章:
深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程 ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中 public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor i…...
【万兴科技-注册_登录安全分析报告】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...
Android启动流程_Zygote阶段
前言 上一篇文档中我们描述了 Android 启动中的 init 启动部分,本片文档将会继续 Android 启动流程的逻辑,继续梳理 Zygote 部分功能。 说明框架 对于 Zygote 进程,要从以下框架说明: 第一点,编译,zygo…...
2022NOIP比赛总结
种花 1.本题是一道前缀和优化加上枚举的问题。先考虑 C 因为 F 是 C 下边随便加一个点,所以只要求出 C 就求出了 F 。 注意到,并没有要求上下行一样,唯一的要求是 C 的两个横要隔一行,这就是问题的突破点,这题很明显…...
Leetcode 排序链表
这段代码的算法思想是 归并排序,一种适合链表的排序方法。它通过递归地将链表拆分成两部分,分别排序,然后合并已排序的部分,从而达到整体排序的目的。以下是代码的中文解释: 算法步骤: 找到链表的中点&…...
哈希函数简介
哈希函数是一种将任意大小的数据输入(通常称为“消息”)转换为固定大小的输出(称为“哈希值”或“摘要”)的算法。 主要特点: 1、输出固定长度 无论输入数据的大小如何,哈希函数的输出总是固定长度。例如…...
nginx------正向代理,反向代理生产,以及能否不使用代理详解
在生产环境中,选择使用正向代理还是反向代理取决于具体的应用场景和需求。下面详细解释这两种代理的用处以及为什么在不同情况下会选择它们。 正向代理 (Forward Proxy) 用途 匿名访问: 隐藏客户端的真实 IP 地址,提供隐私保护。常用于绕过…...
iptables限制docker端口禁止某台主机访问(使用DOCKER链和raw表的PREROUTING链)
背景: 在Linux上docker映射了端口,想着对服务端口进行限制指定IP访问,发现在filter表的INPUT链限制无效 环境: 主机192.168.56.132上的docker容器部署了nginx并将容器80端口映射到主机8000端口 [rootlocalhost ~]# docker ps …...
【VM实战】VMware迁移到VirtualBox
VMware 虚拟机开机卸载VMware Tools 调整虚拟磁盘 对于Windows 10及以上的虚拟机,一般VMware默认都会选Nvme固态硬盘。在导出前必须将其改为SATA,否则VirtualBox导入会报Appliance Import错误 (E_INVALIDARG 0x80070057) 先删掉当前盘的挂载ÿ…...
Android WebView加载不到cookie
以下配置根据需求酌情添加,建议逐个试验,cookie操作不是内存操作,建议修改配置后卸载app再重新运行防止缓存影响测试结果。 1.设置应用程序的 WebView 实例是否应发送并接受 Cookie CookieManager cookieManager CookieManager.getInstanc…...
c++qt
1.显示画布 #include "code.h" #include <QtWidgets/QApplication> #include<iostream> #include<vector> #include <QWindow> #include <QGraphicsView> #include <QGraphicsScene>using namespace std;//1.空格 2.墙 3.入口…...
零跑汽车嵌入式面试题汇总及参考答案
C++ 的三大特性是什么? C++ 的三大特性分别是封装、继承和多态。 封装 概念:封装是把数据和操作数据的函数绑定在一起,对数据的访问进行限制。通过将数据成员声明为私有或保护,只允许通过公共的成员函数来访问和修改数据,从而隐藏了类的内部实现细节。这有助于提高代码的安…...
LC:贪心题解
文章目录 376. 摆动序列 376. 摆动序列 题目链接:https://leetcode.cn/problems/wiggle-subsequence/description/ 这个题目自己首先想到的是动态规划解题,贪心解法真的非常妙,参考下面题解:https://leetcode.cn/problems/wiggle…...
ubuntu交叉编译dbus库给arm平台使用
1.下载dbus库源码 https://www.freedesktop.org/wiki/Software/dbus 克隆源码: https://gitlab.freedesktop.org/dbus/dbus/-/tree/dbus-1.12?ref_type=heads 下载1.12.20版本: 指定pkgconfig环境变量: export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:$PWD/../expat-2.3.…...
ansible开局配置-openEuler
ansible干啥用的就不多介绍了,这篇文章主要在说ansible的安装、开局配置、免密登录。 ansible安装 查看系统版本 cat /etc/openEuler-latest输出内容如下: openeulerversionopenEuler-24.03-LTS compiletime2024-05-27-21-31-28 gccversion12.3.1-30.…...
连锁收银系统的优势与挑战
在快速发展的零售环境中,连锁收银系统不仅是收银的工具,更是现代零售管理的重要组成部分。它在提升效率、优化客户体验以及数据管理等方面发挥了关键作用。然而,随着技术的进步和市场环境的变化,连锁收银系统也面临着诸多挑战。本…...
轻型民用无人驾驶航空器安全操控理论培训知识总结-多旋翼部分
航空器知识 螺旋桨 螺旋桨为多旋翼民用无人驾驶航空器提供升力,多旋翼民用无人驾驶航空器通过飞控系统控制电机调节螺旋桨转速,来实现飞行。 天线 多旋翼民用无人驾驶航空器的图像传输以及遥控控制信号,主要是通过无线信道进行的,靠民用无人驾驶航空器与遥控器的天线传…...
springboot092安康旅游网站的设计与实现(论文+源码)_kaic
毕业设计(论文) 基于JSP的安康旅游网站的设计与实现 姓 名 学 号 院 系 专 业 指导老师 2021 年 月 教务处制 目 录 目 录 摘 要 Abstract 第一章 绪论 1.1 研究现状 1.2 设…...
优化 Git 管理:提升协作效率的最佳实践20241030
优化 Git 管理:提升协作效率的最佳实践 引言 在现代软件开发中,版本控制系统是确保代码质量和团队协作顺畅的基石。Git 作为最流行的分布式版本控制工具,其灵活性和强大功能使得开发者能够高效地管理项目代码。然而,仅依靠工具本…...
Cocos使用精灵组件显示相机内容
Cocos使用精灵组件显示相机内容 1. 为什么使用精灵渲染 在游戏引擎中,游戏场景内除webview和video外所有的节点都是渲染在Canvas上,这导致了webview和video只能存在于所有节点的最上层或最下层,而这种层级关系会出现节点事件无法正常监听或者…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
椭圆曲线密码学(ECC)
一、ECC算法概述 椭圆曲线密码学(Elliptic Curve Cryptography)是基于椭圆曲线数学理论的公钥密码系统,由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA,ECC在相同安全强度下密钥更短(256位ECC ≈ 3072位RSA…...
vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
Qemu arm操作系统开发环境
使用qemu虚拟arm硬件比较合适。 步骤如下: 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载,下载地址:https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...
破解路内监管盲区:免布线低位视频桩重塑停车管理新标准
城市路内停车管理常因行道树遮挡、高位设备盲区等问题,导致车牌识别率低、逃费率高,传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法,正成为破局关键。该设备安装于车位侧方0.5-0.7米高度,直接规避树枝遮…...
深入浅出Diffusion模型:从原理到实践的全方位教程
I. 引言:生成式AI的黎明 – Diffusion模型是什么? 近年来,生成式人工智能(Generative AI)领域取得了爆炸性的进展,模型能够根据简单的文本提示创作出逼真的图像、连贯的文本,乃至更多令人惊叹的…...
pycharm 设置环境出错
pycharm 设置环境出错 pycharm 新建项目,设置虚拟环境,出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...
在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7
在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤: 第一步: 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为: // 改为 v…...
