当前位置: 首页 > news >正文

深度了解flink(十) JobManager(4) ResourceManager HA

ResourceManager(ZK模式)的高可用启动流程

ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create

public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor ioExecutor,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,BlobServer blobServer,HeartbeatServices heartbeatServices,DelegationTokenManager delegationTokenManager,MetricRegistry metricRegistry,ExecutionGraphInfoStore executionGraphInfoStore,MetricQueryServiceRetriever metricQueryServiceRetriever,Collection<FailureEnricher> failureEnrichers,FatalErrorHandler fatalErrorHandler)throws Exception {//resourcemanager的选举服务LeaderRetrievalService resourceManagerRetrievalService = null;ResourceManagerService resourceManagerService = null;try {//resourceManager leader 获取服务resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();//LeaderGatewayRetriever实现了LeaderRetrievalListener接口final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(rpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);//启动resourceManagerService start方法会调用具体的选举方法resourceManagerService.start();//启动etrievalService服务的start方法,resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);return new DispatcherResourceManagerComponent(dispatcherRunner,resourceManagerService,dispatcherLeaderRetrievalService,resourceManagerRetrievalService,webMonitorEndpoint,fatalErrorHandler,dispatcherOperationCaches);} catch (Exception exception) {//省略无关代码}}

初始化LeaderRetrievalServer

resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();

getResourceManagerLeaderRetriever会调用父类AbstractHaServices的这个方法

 @Overridepublic LeaderRetrievalService getResourceManagerLeaderRetriever() {return createLeaderRetrievalService(getLeaderPathForResourceManager());}

createLeaderRetrievalService会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices

    @Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String componentId) {// Maybe use a single service for leader retrievalreturn ZooKeeperUtils.createLeaderRetrievalService(curatorFrameworkWrapper.asCuratorFramework(),ZooKeeperUtils.getLeaderPath(componentId),configuration);}

继续方法跟进

public static DefaultLeaderRetrievalService createLeaderRetrievalService(final CuratorFramework client, final String path, final Configuration configuration) {return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path, configuration));

最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory

初始化LeaderRetrievalListener

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));

RpcGatewayRetrieverUML类图如下

LeaderGatewayRetriever#notifyNewLeaderAddressLeaderRetrievalListener#notifyLeaderAddress做了实现

    @Overridepublic void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);final CompletableFuture<T> oldGatewayFuture =atomicGatewayFuture.getAndSet(newGatewayFuture);newGatewayFuture.whenComplete((t, throwable) -> {if (throwable != null) {oldGatewayFuture.completeExceptionally(throwable);} else {oldGatewayFuture.complete(t);}});}

创建ResourceManagerService

 resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);

ResourceManagerService负责管理ResourceManager的生命周期,也负责haServer的启动

创建LeaderElection
    private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory,ResourceManagerProcessContext rmProcessContext)throws Exception {//省略其他方法//LeaderElection进行了初始化this.leaderElection =rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
LeaderElection选举
 resourceManagerService.start();

跳转到start方法

  public void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");leaderElection.startLeaderElection(this);}

leaderElection.startLeaderElection(this)zk模式下走的是DefaultLeaderElection的方法,

    @Overridepublic void startLeaderElection(LeaderContender contender) throws Exception {Preconditions.checkNotNull(contender);parentService.register(componentId, contender);}

开始对参选者进行注册

protected void register(String componentId, LeaderContender contender) throws Exception {checkNotNull(componentId, "componentId must not be null.");checkNotNull(contender, "Contender must not be null.");synchronized (lock) {if (leaderElectionDriver == null) {createLeaderElectionDriver();}//省略无关代码}

register会判断选举的driver是否存在,如果不存在,则根据高可用的模式进行选举驱动的创建

 @Overridepublic ZooKeeperLeaderElectionDriver create(LeaderElectionDriver.Listener leaderElectionListener) throws Exception {return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);}
ZooKeeperLeaderElectionDriver初始化
public ZooKeeperLeaderElectionDriver(CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)throws Exception {//参数校验this.curatorFramework = Preconditions.checkNotNull(curatorFramework);this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);//使用ZooKeeperUtils.generateLeaderLatchPath方法基于curatorFramework的命名空间生成一个ZooKeeper节点路径,该路径通常用于领导者选举的锁。this.leaderLatchPath =ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());//使用CuratorFramework和之前生成的路径创建一个LeaderLatch实例。LeaderLatch是Curator提供的一个领导者选举实现。    this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());//使用ZooKeeperUtils.createTreeCache方法创建一个TreeCache实例,用于监听ZooKeeper中特定路径(这里是根路径"/")下的节点变化this.treeCache =ZooKeeperUtils.createTreeCache(curatorFramework,"/",new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());treeCache.getListenable().addListener((client, event) -> {switch (event.getType()) {case NODE_ADDED:case NODE_UPDATED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleChangedLeaderInformation(event.getData());break;case NODE_REMOVED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleRemovedLeaderInformation(event.getData().getPath());break;}});leaderLatch.addListener(this);curatorFramework.getConnectionStateListenable().addListener(listener);leaderLatch.start();treeCache.start();}
ZooKeeperLeaderElectionDriver.handleStateChange

状态变化时候根据不同状态打印日志

private void handleStateChange(ConnectionState newState) {switch (newState) {case CONNECTED:LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");break;case SUSPENDED:LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");break;case RECONNECTED:LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");break;case LOST:// Maybe we have to throw an exception here to terminate the JobManagerLOG.warn("Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");break;}}
DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
    private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {if (!leaderContenderRegistry.containsKey(componentId)) {LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",sessionID,leaderElectionDriver);return;} else if (!sessionID.equals(issuedLeaderSessionID)) {LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",sessionID,issuedLeaderSessionID);return;}Preconditions.checkState(!confirmedLeaderInformation.hasLeaderInformation(componentId),"The leadership should have been granted while not having the leadership acquired.");LOG.debug("Granting leadership to the contender registered under component '{}' with session ID {}.",componentId,issuedLeaderSessionID);leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);}

相关文章:

深度了解flink(十) JobManager(4) ResourceManager HA

ResourceManager&#xff08;ZK模式&#xff09;的高可用启动流程 ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中 public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor i…...

【万兴科技-注册_登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞…...

Android启动流程_Zygote阶段

前言 上一篇文档中我们描述了 Android 启动中的 init 启动部分&#xff0c;本片文档将会继续 Android 启动流程的逻辑&#xff0c;继续梳理 Zygote 部分功能。 说明框架 对于 Zygote 进程&#xff0c;要从以下框架说明&#xff1a; 第一点&#xff0c;编译&#xff0c;zygo…...

2022NOIP比赛总结

种花 1.本题是一道前缀和优化加上枚举的问题。先考虑 C 因为 F 是 C 下边随便加一个点&#xff0c;所以只要求出 C 就求出了 F 。 注意到&#xff0c;并没有要求上下行一样&#xff0c;唯一的要求是 C 的两个横要隔一行&#xff0c;这就是问题的突破点&#xff0c;这题很明显…...

Leetcode 排序链表

这段代码的算法思想是 归并排序&#xff0c;一种适合链表的排序方法。它通过递归地将链表拆分成两部分&#xff0c;分别排序&#xff0c;然后合并已排序的部分&#xff0c;从而达到整体排序的目的。以下是代码的中文解释&#xff1a; 算法步骤&#xff1a; 找到链表的中点&…...

哈希函数简介

哈希函数是一种将任意大小的数据输入&#xff08;通常称为“消息”&#xff09;转换为固定大小的输出&#xff08;称为“哈希值”或“摘要”&#xff09;的算法。 主要特点&#xff1a; 1、输出固定长度 无论输入数据的大小如何&#xff0c;哈希函数的输出总是固定长度。例如…...

nginx------正向代理,反向代理生产,以及能否不使用代理详解

在生产环境中&#xff0c;选择使用正向代理还是反向代理取决于具体的应用场景和需求。下面详细解释这两种代理的用处以及为什么在不同情况下会选择它们。 正向代理 (Forward Proxy) 用途 匿名访问&#xff1a; 隐藏客户端的真实 IP 地址&#xff0c;提供隐私保护。常用于绕过…...

iptables限制docker端口禁止某台主机访问(使用DOCKER链和raw表的PREROUTING链)

背景&#xff1a; 在Linux上docker映射了端口&#xff0c;想着对服务端口进行限制指定IP访问&#xff0c;发现在filter表的INPUT链限制无效 环境&#xff1a; 主机192.168.56.132上的docker容器部署了nginx并将容器80端口映射到主机8000端口 [rootlocalhost ~]# docker ps …...

【VM实战】VMware迁移到VirtualBox

VMware 虚拟机开机卸载VMware Tools 调整虚拟磁盘 对于Windows 10及以上的虚拟机&#xff0c;一般VMware默认都会选Nvme固态硬盘。在导出前必须将其改为SATA&#xff0c;否则VirtualBox导入会报Appliance Import错误 (E_INVALIDARG 0x80070057) 先删掉当前盘的挂载&#xff…...

Android WebView加载不到cookie

以下配置根据需求酌情添加&#xff0c;建议逐个试验&#xff0c;cookie操作不是内存操作&#xff0c;建议修改配置后卸载app再重新运行防止缓存影响测试结果。 1.设置应用程序的 WebView 实例是否应发送并接受 Cookie CookieManager cookieManager CookieManager.getInstanc…...

c++qt

1.显示画布 #include "code.h" #include <QtWidgets/QApplication> #include<iostream> #include<vector> #include <QWindow> #include <QGraphicsView> #include <QGraphicsScene>using namespace std;//1.空格 2.墙 3.入口…...

零跑汽车嵌入式面试题汇总及参考答案

C++ 的三大特性是什么? C++ 的三大特性分别是封装、继承和多态。 封装 概念:封装是把数据和操作数据的函数绑定在一起,对数据的访问进行限制。通过将数据成员声明为私有或保护,只允许通过公共的成员函数来访问和修改数据,从而隐藏了类的内部实现细节。这有助于提高代码的安…...

LC:贪心题解

文章目录 376. 摆动序列 376. 摆动序列 题目链接&#xff1a;https://leetcode.cn/problems/wiggle-subsequence/description/ 这个题目自己首先想到的是动态规划解题&#xff0c;贪心解法真的非常妙&#xff0c;参考下面题解&#xff1a;https://leetcode.cn/problems/wiggle…...

ubuntu交叉编译dbus库给arm平台使用

1.下载dbus库源码 https://www.freedesktop.org/wiki/Software/dbus 克隆源码: https://gitlab.freedesktop.org/dbus/dbus/-/tree/dbus-1.12?ref_type=heads 下载1.12.20版本: 指定pkgconfig环境变量: export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:$PWD/../expat-2.3.…...

ansible开局配置-openEuler

ansible干啥用的就不多介绍了&#xff0c;这篇文章主要在说ansible的安装、开局配置、免密登录。 ansible安装 查看系统版本 cat /etc/openEuler-latest输出内容如下&#xff1a; openeulerversionopenEuler-24.03-LTS compiletime2024-05-27-21-31-28 gccversion12.3.1-30.…...

连锁收银系统的优势与挑战

在快速发展的零售环境中&#xff0c;连锁收银系统不仅是收银的工具&#xff0c;更是现代零售管理的重要组成部分。它在提升效率、优化客户体验以及数据管理等方面发挥了关键作用。然而&#xff0c;随着技术的进步和市场环境的变化&#xff0c;连锁收银系统也面临着诸多挑战。本…...

轻型民用无人驾驶航空器安全操控理论培训知识总结-多旋翼部分

航空器知识 螺旋桨 螺旋桨为多旋翼民用无人驾驶航空器提供升力,多旋翼民用无人驾驶航空器通过飞控系统控制电机调节螺旋桨转速,来实现飞行。 天线 多旋翼民用无人驾驶航空器的图像传输以及遥控控制信号,主要是通过无线信道进行的,靠民用无人驾驶航空器与遥控器的天线传…...

springboot092安康旅游网站的设计与实现(论文+源码)_kaic

毕业设计&#xff08;论文&#xff09; 基于JSP的安康旅游网站的设计与实现 姓  名 学  号 院  系 专  业 指导老师 2021 年 月 教务处制 目 录 目 录 摘 要 Abstract 第一章 绪论 1.1 研究现状 1.2 设…...

优化 Git 管理:提升协作效率的最佳实践20241030

优化 Git 管理&#xff1a;提升协作效率的最佳实践 引言 在现代软件开发中&#xff0c;版本控制系统是确保代码质量和团队协作顺畅的基石。Git 作为最流行的分布式版本控制工具&#xff0c;其灵活性和强大功能使得开发者能够高效地管理项目代码。然而&#xff0c;仅依靠工具本…...

Cocos使用精灵组件显示相机内容

Cocos使用精灵组件显示相机内容 1. 为什么使用精灵渲染 在游戏引擎中&#xff0c;游戏场景内除webview和video外所有的节点都是渲染在Canvas上&#xff0c;这导致了webview和video只能存在于所有节点的最上层或最下层&#xff0c;而这种层级关系会出现节点事件无法正常监听或者…...

CHUWI LarkBox X迷你主机评测:AMD Ryzen 7 3700U性能解析

1. CHUWI LarkBox X迷你主机深度解析&#xff1a;AMD Ryzen 7 3700U的紧凑型性能平台去年在亚马逊淘到CHUWI LarkBox X时&#xff0c;我原本只期待它能替代办公室的瘦客户机。但实际使用三个月后&#xff0c;这台巴掌大的设备彻底改变了我对迷你主机的认知——它不仅能流畅运行…...

5个关键步骤掌握PatreonDownloader:高效批量下载Patreon内容的完整指南

5个关键步骤掌握PatreonDownloader&#xff1a;高效批量下载Patreon内容的完整指南 【免费下载链接】PatreonDownloader Powerful tool for downloading content posted by creators on patreon.com. Supports content hosted on patreon itself as well as external sites (ad…...

猫抓浏览器资源嗅探扩展:三步搞定网页视频音频下载的终极指南

猫抓浏览器资源嗅探扩展&#xff1a;三步搞定网页视频音频下载的终极指南 【免费下载链接】cat-catch 猫抓 浏览器资源嗅探扩展 / cat-catch Browser Resource Sniffing Extension 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 你是否曾经在浏览网页时遇…...

Windows远程桌面多用户访问的终极解决方案:RDPWrap完全指南

Windows远程桌面多用户访问的终极解决方案&#xff1a;RDPWrap完全指南 【免费下载链接】rdpwrap RDP Wrapper Library 项目地址: https://gitcode.com/gh_mirrors/rd/rdpwrap 你是否曾经遇到过这样的困境&#xff1a;在家里有多台设备需要访问同一台Windows电脑&#x…...

【国家级智慧农场认证技术栈】:基于Python的土壤墒情、作物长势、微气候三源数据动态加权融合算法

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;【国家级智慧农场认证技术栈】&#xff1a;基于Python的土壤墒情、作物长势、微气候三源数据动态加权融合算法 多源异构数据协同建模原理 该算法面向农业农村部《智慧农业示范场建设指南&#xff08;2…...

【C语言Modbus调试黄金法则】:20年嵌入式老兵亲授5大必踩坑点与实时避坑指南

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Modbus协议核心机制与C语言实现本质 Modbus 是一种串行通信协议&#xff0c;广泛应用于工业自动化领域&#xff0c;其设计简洁、无状态、主从架构明确。协议本质基于功能码&#xff08;Function Code&a…...

【深度解析】从 Claude Jupiter 到 ARC-AGI 3:大模型发布信号、评测体系与多模型工程接入实践

摘要 本文围绕近期 AI 模型动态&#xff0c;解析 Claude Jupiter、Codex 工作流、ARC-AGI 3 基准与多模态智能体趋势&#xff0c;并给出 OpenAI 兼容 API 的 Python 实战接入方案。背景介绍&#xff1a;AI 模型迭代进入“高频发布 工程化竞争”阶段 近期 AI 领域出现了多个值得…...

3分钟彻底告别百度网盘密码搜索:智能提取码工具带来的效率革命

3分钟彻底告别百度网盘密码搜索&#xff1a;智能提取码工具带来的效率革命 【免费下载链接】baidupankey 项目地址: https://gitcode.com/gh_mirrors/ba/baidupankey 想象一下这样的场景&#xff1a;你在学习群中看到一个宝贵的课程资源链接&#xff0c;点击后却遇到熟…...

LGSideMenuController与UINavigationController完美整合指南

LGSideMenuController与UINavigationController完美整合指南 【免费下载链接】LGSideMenuController iOS view controller which manages left and right side views 项目地址: https://gitcode.com/gh_mirrors/lg/LGSideMenuController LGSideMenuController是一款功能…...

Modbus RTU通信总失败?3步定位C语言底层寄存器配置错误(附可复用调试模板)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Modbus RTU通信失败的典型现象与排查共识 常见故障表征 Modbus RTU通信中断时&#xff0c;上位机常表现为超时错误&#xff08;如“No response from slave”&#xff09;、CRC校验失败&#xff08;0x…...