当前位置: 首页 > news >正文

深度了解flink(十) JobManager(4) ResourceManager HA

ResourceManager(ZK模式)的高可用启动流程

ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create

public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor ioExecutor,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,BlobServer blobServer,HeartbeatServices heartbeatServices,DelegationTokenManager delegationTokenManager,MetricRegistry metricRegistry,ExecutionGraphInfoStore executionGraphInfoStore,MetricQueryServiceRetriever metricQueryServiceRetriever,Collection<FailureEnricher> failureEnrichers,FatalErrorHandler fatalErrorHandler)throws Exception {//resourcemanager的选举服务LeaderRetrievalService resourceManagerRetrievalService = null;ResourceManagerService resourceManagerService = null;try {//resourceManager leader 获取服务resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();//LeaderGatewayRetriever实现了LeaderRetrievalListener接口final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(rpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);//启动resourceManagerService start方法会调用具体的选举方法resourceManagerService.start();//启动etrievalService服务的start方法,resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);return new DispatcherResourceManagerComponent(dispatcherRunner,resourceManagerService,dispatcherLeaderRetrievalService,resourceManagerRetrievalService,webMonitorEndpoint,fatalErrorHandler,dispatcherOperationCaches);} catch (Exception exception) {//省略无关代码}}

初始化LeaderRetrievalServer

resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();

getResourceManagerLeaderRetriever会调用父类AbstractHaServices的这个方法

 @Overridepublic LeaderRetrievalService getResourceManagerLeaderRetriever() {return createLeaderRetrievalService(getLeaderPathForResourceManager());}

createLeaderRetrievalService会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices

    @Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String componentId) {// Maybe use a single service for leader retrievalreturn ZooKeeperUtils.createLeaderRetrievalService(curatorFrameworkWrapper.asCuratorFramework(),ZooKeeperUtils.getLeaderPath(componentId),configuration);}

继续方法跟进

public static DefaultLeaderRetrievalService createLeaderRetrievalService(final CuratorFramework client, final String path, final Configuration configuration) {return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path, configuration));

最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory

初始化LeaderRetrievalListener

            final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));

RpcGatewayRetrieverUML类图如下

LeaderGatewayRetriever#notifyNewLeaderAddressLeaderRetrievalListener#notifyLeaderAddress做了实现

    @Overridepublic void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);final CompletableFuture<T> oldGatewayFuture =atomicGatewayFuture.getAndSet(newGatewayFuture);newGatewayFuture.whenComplete((t, throwable) -> {if (throwable != null) {oldGatewayFuture.completeExceptionally(throwable);} else {oldGatewayFuture.complete(t);}});}

创建ResourceManagerService

 resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);

ResourceManagerService负责管理ResourceManager的生命周期,也负责haServer的启动

创建LeaderElection
    private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory,ResourceManagerProcessContext rmProcessContext)throws Exception {//省略其他方法//LeaderElection进行了初始化this.leaderElection =rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
LeaderElection选举
 resourceManagerService.start();

跳转到start方法

  public void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");leaderElection.startLeaderElection(this);}

leaderElection.startLeaderElection(this)zk模式下走的是DefaultLeaderElection的方法,

    @Overridepublic void startLeaderElection(LeaderContender contender) throws Exception {Preconditions.checkNotNull(contender);parentService.register(componentId, contender);}

开始对参选者进行注册

protected void register(String componentId, LeaderContender contender) throws Exception {checkNotNull(componentId, "componentId must not be null.");checkNotNull(contender, "Contender must not be null.");synchronized (lock) {if (leaderElectionDriver == null) {createLeaderElectionDriver();}//省略无关代码}

register会判断选举的driver是否存在,如果不存在,则根据高可用的模式进行选举驱动的创建

 @Overridepublic ZooKeeperLeaderElectionDriver create(LeaderElectionDriver.Listener leaderElectionListener) throws Exception {return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);}
ZooKeeperLeaderElectionDriver初始化
public ZooKeeperLeaderElectionDriver(CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)throws Exception {//参数校验this.curatorFramework = Preconditions.checkNotNull(curatorFramework);this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);//使用ZooKeeperUtils.generateLeaderLatchPath方法基于curatorFramework的命名空间生成一个ZooKeeper节点路径,该路径通常用于领导者选举的锁。this.leaderLatchPath =ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());//使用CuratorFramework和之前生成的路径创建一个LeaderLatch实例。LeaderLatch是Curator提供的一个领导者选举实现。    this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());//使用ZooKeeperUtils.createTreeCache方法创建一个TreeCache实例,用于监听ZooKeeper中特定路径(这里是根路径"/")下的节点变化this.treeCache =ZooKeeperUtils.createTreeCache(curatorFramework,"/",new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());treeCache.getListenable().addListener((client, event) -> {switch (event.getType()) {case NODE_ADDED:case NODE_UPDATED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleChangedLeaderInformation(event.getData());break;case NODE_REMOVED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleRemovedLeaderInformation(event.getData().getPath());break;}});leaderLatch.addListener(this);curatorFramework.getConnectionStateListenable().addListener(listener);leaderLatch.start();treeCache.start();}
ZooKeeperLeaderElectionDriver.handleStateChange

状态变化时候根据不同状态打印日志

private void handleStateChange(ConnectionState newState) {switch (newState) {case CONNECTED:LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");break;case SUSPENDED:LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");break;case RECONNECTED:LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");break;case LOST:// Maybe we have to throw an exception here to terminate the JobManagerLOG.warn("Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");break;}}
DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
    private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {if (!leaderContenderRegistry.containsKey(componentId)) {LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",sessionID,leaderElectionDriver);return;} else if (!sessionID.equals(issuedLeaderSessionID)) {LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",sessionID,issuedLeaderSessionID);return;}Preconditions.checkState(!confirmedLeaderInformation.hasLeaderInformation(componentId),"The leadership should have been granted while not having the leadership acquired.");LOG.debug("Granting leadership to the contender registered under component '{}' with session ID {}.",componentId,issuedLeaderSessionID);leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);}

相关文章:

深度了解flink(十) JobManager(4) ResourceManager HA

ResourceManager&#xff08;ZK模式&#xff09;的高可用启动流程 ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中 public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor i…...

【万兴科技-注册_登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞…...

Android启动流程_Zygote阶段

前言 上一篇文档中我们描述了 Android 启动中的 init 启动部分&#xff0c;本片文档将会继续 Android 启动流程的逻辑&#xff0c;继续梳理 Zygote 部分功能。 说明框架 对于 Zygote 进程&#xff0c;要从以下框架说明&#xff1a; 第一点&#xff0c;编译&#xff0c;zygo…...

2022NOIP比赛总结

种花 1.本题是一道前缀和优化加上枚举的问题。先考虑 C 因为 F 是 C 下边随便加一个点&#xff0c;所以只要求出 C 就求出了 F 。 注意到&#xff0c;并没有要求上下行一样&#xff0c;唯一的要求是 C 的两个横要隔一行&#xff0c;这就是问题的突破点&#xff0c;这题很明显…...

Leetcode 排序链表

这段代码的算法思想是 归并排序&#xff0c;一种适合链表的排序方法。它通过递归地将链表拆分成两部分&#xff0c;分别排序&#xff0c;然后合并已排序的部分&#xff0c;从而达到整体排序的目的。以下是代码的中文解释&#xff1a; 算法步骤&#xff1a; 找到链表的中点&…...

哈希函数简介

哈希函数是一种将任意大小的数据输入&#xff08;通常称为“消息”&#xff09;转换为固定大小的输出&#xff08;称为“哈希值”或“摘要”&#xff09;的算法。 主要特点&#xff1a; 1、输出固定长度 无论输入数据的大小如何&#xff0c;哈希函数的输出总是固定长度。例如…...

nginx------正向代理,反向代理生产,以及能否不使用代理详解

在生产环境中&#xff0c;选择使用正向代理还是反向代理取决于具体的应用场景和需求。下面详细解释这两种代理的用处以及为什么在不同情况下会选择它们。 正向代理 (Forward Proxy) 用途 匿名访问&#xff1a; 隐藏客户端的真实 IP 地址&#xff0c;提供隐私保护。常用于绕过…...

iptables限制docker端口禁止某台主机访问(使用DOCKER链和raw表的PREROUTING链)

背景&#xff1a; 在Linux上docker映射了端口&#xff0c;想着对服务端口进行限制指定IP访问&#xff0c;发现在filter表的INPUT链限制无效 环境&#xff1a; 主机192.168.56.132上的docker容器部署了nginx并将容器80端口映射到主机8000端口 [rootlocalhost ~]# docker ps …...

【VM实战】VMware迁移到VirtualBox

VMware 虚拟机开机卸载VMware Tools 调整虚拟磁盘 对于Windows 10及以上的虚拟机&#xff0c;一般VMware默认都会选Nvme固态硬盘。在导出前必须将其改为SATA&#xff0c;否则VirtualBox导入会报Appliance Import错误 (E_INVALIDARG 0x80070057) 先删掉当前盘的挂载&#xff…...

Android WebView加载不到cookie

以下配置根据需求酌情添加&#xff0c;建议逐个试验&#xff0c;cookie操作不是内存操作&#xff0c;建议修改配置后卸载app再重新运行防止缓存影响测试结果。 1.设置应用程序的 WebView 实例是否应发送并接受 Cookie CookieManager cookieManager CookieManager.getInstanc…...

c++qt

1.显示画布 #include "code.h" #include <QtWidgets/QApplication> #include<iostream> #include<vector> #include <QWindow> #include <QGraphicsView> #include <QGraphicsScene>using namespace std;//1.空格 2.墙 3.入口…...

零跑汽车嵌入式面试题汇总及参考答案

C++ 的三大特性是什么? C++ 的三大特性分别是封装、继承和多态。 封装 概念:封装是把数据和操作数据的函数绑定在一起,对数据的访问进行限制。通过将数据成员声明为私有或保护,只允许通过公共的成员函数来访问和修改数据,从而隐藏了类的内部实现细节。这有助于提高代码的安…...

LC:贪心题解

文章目录 376. 摆动序列 376. 摆动序列 题目链接&#xff1a;https://leetcode.cn/problems/wiggle-subsequence/description/ 这个题目自己首先想到的是动态规划解题&#xff0c;贪心解法真的非常妙&#xff0c;参考下面题解&#xff1a;https://leetcode.cn/problems/wiggle…...

ubuntu交叉编译dbus库给arm平台使用

1.下载dbus库源码 https://www.freedesktop.org/wiki/Software/dbus 克隆源码: https://gitlab.freedesktop.org/dbus/dbus/-/tree/dbus-1.12?ref_type=heads 下载1.12.20版本: 指定pkgconfig环境变量: export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:$PWD/../expat-2.3.…...

ansible开局配置-openEuler

ansible干啥用的就不多介绍了&#xff0c;这篇文章主要在说ansible的安装、开局配置、免密登录。 ansible安装 查看系统版本 cat /etc/openEuler-latest输出内容如下&#xff1a; openeulerversionopenEuler-24.03-LTS compiletime2024-05-27-21-31-28 gccversion12.3.1-30.…...

连锁收银系统的优势与挑战

在快速发展的零售环境中&#xff0c;连锁收银系统不仅是收银的工具&#xff0c;更是现代零售管理的重要组成部分。它在提升效率、优化客户体验以及数据管理等方面发挥了关键作用。然而&#xff0c;随着技术的进步和市场环境的变化&#xff0c;连锁收银系统也面临着诸多挑战。本…...

轻型民用无人驾驶航空器安全操控理论培训知识总结-多旋翼部分

航空器知识 螺旋桨 螺旋桨为多旋翼民用无人驾驶航空器提供升力,多旋翼民用无人驾驶航空器通过飞控系统控制电机调节螺旋桨转速,来实现飞行。 天线 多旋翼民用无人驾驶航空器的图像传输以及遥控控制信号,主要是通过无线信道进行的,靠民用无人驾驶航空器与遥控器的天线传…...

springboot092安康旅游网站的设计与实现(论文+源码)_kaic

毕业设计&#xff08;论文&#xff09; 基于JSP的安康旅游网站的设计与实现 姓  名 学  号 院  系 专  业 指导老师 2021 年 月 教务处制 目 录 目 录 摘 要 Abstract 第一章 绪论 1.1 研究现状 1.2 设…...

优化 Git 管理:提升协作效率的最佳实践20241030

优化 Git 管理&#xff1a;提升协作效率的最佳实践 引言 在现代软件开发中&#xff0c;版本控制系统是确保代码质量和团队协作顺畅的基石。Git 作为最流行的分布式版本控制工具&#xff0c;其灵活性和强大功能使得开发者能够高效地管理项目代码。然而&#xff0c;仅依靠工具本…...

Cocos使用精灵组件显示相机内容

Cocos使用精灵组件显示相机内容 1. 为什么使用精灵渲染 在游戏引擎中&#xff0c;游戏场景内除webview和video外所有的节点都是渲染在Canvas上&#xff0c;这导致了webview和video只能存在于所有节点的最上层或最下层&#xff0c;而这种层级关系会出现节点事件无法正常监听或者…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

实现弹窗随键盘上移居中

实现弹窗随键盘上移的核心思路 在Android中&#xff0c;可以通过监听键盘的显示和隐藏事件&#xff0c;动态调整弹窗的位置。关键点在于获取键盘高度&#xff0c;并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

安卓基础(aar)

重新设置java21的环境&#xff0c;临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的&#xff1a; MyApp/ ├── app/ …...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...