深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程
ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中
public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor ioExecutor,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,BlobServer blobServer,HeartbeatServices heartbeatServices,DelegationTokenManager delegationTokenManager,MetricRegistry metricRegistry,ExecutionGraphInfoStore executionGraphInfoStore,MetricQueryServiceRetriever metricQueryServiceRetriever,Collection<FailureEnricher> failureEnrichers,FatalErrorHandler fatalErrorHandler)throws Exception {//resourcemanager的选举服务LeaderRetrievalService resourceManagerRetrievalService = null;ResourceManagerService resourceManagerService = null;try {//resourceManager leader 获取服务resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();//LeaderGatewayRetriever实现了LeaderRetrievalListener接口final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(rpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);//启动resourceManagerService start方法会调用具体的选举方法resourceManagerService.start();//启动etrievalService服务的start方法,resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);return new DispatcherResourceManagerComponent(dispatcherRunner,resourceManagerService,dispatcherLeaderRetrievalService,resourceManagerRetrievalService,webMonitorEndpoint,fatalErrorHandler,dispatcherOperationCaches);} catch (Exception exception) {//省略无关代码}}
初始化LeaderRetrievalServer
resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();
getResourceManagerLeaderRetriever会调用父类AbstractHaServices的这个方法
@Overridepublic LeaderRetrievalService getResourceManagerLeaderRetriever() {return createLeaderRetrievalService(getLeaderPathForResourceManager());}
createLeaderRetrievalService会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices下
@Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String componentId) {// Maybe use a single service for leader retrievalreturn ZooKeeperUtils.createLeaderRetrievalService(curatorFrameworkWrapper.asCuratorFramework(),ZooKeeperUtils.getLeaderPath(componentId),configuration);}
继续方法跟进
public static DefaultLeaderRetrievalService createLeaderRetrievalService(final CuratorFramework client, final String path, final Configuration configuration) {return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path, configuration));
最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory
初始化LeaderRetrievalListener
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
RpcGatewayRetrieverUML类图如下

LeaderGatewayRetriever#notifyNewLeaderAddress对LeaderRetrievalListener#notifyLeaderAddress做了实现
@Overridepublic void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);final CompletableFuture<T> oldGatewayFuture =atomicGatewayFuture.getAndSet(newGatewayFuture);newGatewayFuture.whenComplete((t, throwable) -> {if (throwable != null) {oldGatewayFuture.completeExceptionally(throwable);} else {oldGatewayFuture.complete(t);}});}
创建ResourceManagerService
resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);
ResourceManagerService负责管理ResourceManager的生命周期,也负责haServer的启动
创建LeaderElection
private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory,ResourceManagerProcessContext rmProcessContext)throws Exception {//省略其他方法//LeaderElection进行了初始化this.leaderElection =rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
LeaderElection选举
resourceManagerService.start();
跳转到start方法
public void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");leaderElection.startLeaderElection(this);}
leaderElection.startLeaderElection(this)zk模式下走的是DefaultLeaderElection的方法,
@Overridepublic void startLeaderElection(LeaderContender contender) throws Exception {Preconditions.checkNotNull(contender);parentService.register(componentId, contender);}
开始对参选者进行注册
protected void register(String componentId, LeaderContender contender) throws Exception {checkNotNull(componentId, "componentId must not be null.");checkNotNull(contender, "Contender must not be null.");synchronized (lock) {if (leaderElectionDriver == null) {createLeaderElectionDriver();}//省略无关代码}
register会判断选举的driver是否存在,如果不存在,则根据高可用的模式进行选举驱动的创建
@Overridepublic ZooKeeperLeaderElectionDriver create(LeaderElectionDriver.Listener leaderElectionListener) throws Exception {return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);}
ZooKeeperLeaderElectionDriver初始化
public ZooKeeperLeaderElectionDriver(CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)throws Exception {//参数校验this.curatorFramework = Preconditions.checkNotNull(curatorFramework);this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);//使用ZooKeeperUtils.generateLeaderLatchPath方法基于curatorFramework的命名空间生成一个ZooKeeper节点路径,该路径通常用于领导者选举的锁。this.leaderLatchPath =ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());//使用CuratorFramework和之前生成的路径创建一个LeaderLatch实例。LeaderLatch是Curator提供的一个领导者选举实现。 this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());//使用ZooKeeperUtils.createTreeCache方法创建一个TreeCache实例,用于监听ZooKeeper中特定路径(这里是根路径"/")下的节点变化this.treeCache =ZooKeeperUtils.createTreeCache(curatorFramework,"/",new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());treeCache.getListenable().addListener((client, event) -> {switch (event.getType()) {case NODE_ADDED:case NODE_UPDATED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleChangedLeaderInformation(event.getData());break;case NODE_REMOVED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleRemovedLeaderInformation(event.getData().getPath());break;}});leaderLatch.addListener(this);curatorFramework.getConnectionStateListenable().addListener(listener);leaderLatch.start();treeCache.start();}
ZooKeeperLeaderElectionDriver.handleStateChange
状态变化时候根据不同状态打印日志
private void handleStateChange(ConnectionState newState) {switch (newState) {case CONNECTED:LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");break;case SUSPENDED:LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");break;case RECONNECTED:LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");break;case LOST:// Maybe we have to throw an exception here to terminate the JobManagerLOG.warn("Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");break;}}
DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {if (!leaderContenderRegistry.containsKey(componentId)) {LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",sessionID,leaderElectionDriver);return;} else if (!sessionID.equals(issuedLeaderSessionID)) {LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",sessionID,issuedLeaderSessionID);return;}Preconditions.checkState(!confirmedLeaderInformation.hasLeaderInformation(componentId),"The leadership should have been granted while not having the leadership acquired.");LOG.debug("Granting leadership to the contender registered under component '{}' with session ID {}.",componentId,issuedLeaderSessionID);leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);}相关文章:
深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程 ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中 public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor i…...
【万兴科技-注册_登录安全分析报告】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...
Android启动流程_Zygote阶段
前言 上一篇文档中我们描述了 Android 启动中的 init 启动部分,本片文档将会继续 Android 启动流程的逻辑,继续梳理 Zygote 部分功能。 说明框架 对于 Zygote 进程,要从以下框架说明: 第一点,编译,zygo…...
2022NOIP比赛总结
种花 1.本题是一道前缀和优化加上枚举的问题。先考虑 C 因为 F 是 C 下边随便加一个点,所以只要求出 C 就求出了 F 。 注意到,并没有要求上下行一样,唯一的要求是 C 的两个横要隔一行,这就是问题的突破点,这题很明显…...
Leetcode 排序链表
这段代码的算法思想是 归并排序,一种适合链表的排序方法。它通过递归地将链表拆分成两部分,分别排序,然后合并已排序的部分,从而达到整体排序的目的。以下是代码的中文解释: 算法步骤: 找到链表的中点&…...
哈希函数简介
哈希函数是一种将任意大小的数据输入(通常称为“消息”)转换为固定大小的输出(称为“哈希值”或“摘要”)的算法。 主要特点: 1、输出固定长度 无论输入数据的大小如何,哈希函数的输出总是固定长度。例如…...
nginx------正向代理,反向代理生产,以及能否不使用代理详解
在生产环境中,选择使用正向代理还是反向代理取决于具体的应用场景和需求。下面详细解释这两种代理的用处以及为什么在不同情况下会选择它们。 正向代理 (Forward Proxy) 用途 匿名访问: 隐藏客户端的真实 IP 地址,提供隐私保护。常用于绕过…...
iptables限制docker端口禁止某台主机访问(使用DOCKER链和raw表的PREROUTING链)
背景: 在Linux上docker映射了端口,想着对服务端口进行限制指定IP访问,发现在filter表的INPUT链限制无效 环境: 主机192.168.56.132上的docker容器部署了nginx并将容器80端口映射到主机8000端口 [rootlocalhost ~]# docker ps …...
【VM实战】VMware迁移到VirtualBox
VMware 虚拟机开机卸载VMware Tools 调整虚拟磁盘 对于Windows 10及以上的虚拟机,一般VMware默认都会选Nvme固态硬盘。在导出前必须将其改为SATA,否则VirtualBox导入会报Appliance Import错误 (E_INVALIDARG 0x80070057) 先删掉当前盘的挂载ÿ…...
Android WebView加载不到cookie
以下配置根据需求酌情添加,建议逐个试验,cookie操作不是内存操作,建议修改配置后卸载app再重新运行防止缓存影响测试结果。 1.设置应用程序的 WebView 实例是否应发送并接受 Cookie CookieManager cookieManager CookieManager.getInstanc…...
c++qt
1.显示画布 #include "code.h" #include <QtWidgets/QApplication> #include<iostream> #include<vector> #include <QWindow> #include <QGraphicsView> #include <QGraphicsScene>using namespace std;//1.空格 2.墙 3.入口…...
零跑汽车嵌入式面试题汇总及参考答案
C++ 的三大特性是什么? C++ 的三大特性分别是封装、继承和多态。 封装 概念:封装是把数据和操作数据的函数绑定在一起,对数据的访问进行限制。通过将数据成员声明为私有或保护,只允许通过公共的成员函数来访问和修改数据,从而隐藏了类的内部实现细节。这有助于提高代码的安…...
LC:贪心题解
文章目录 376. 摆动序列 376. 摆动序列 题目链接:https://leetcode.cn/problems/wiggle-subsequence/description/ 这个题目自己首先想到的是动态规划解题,贪心解法真的非常妙,参考下面题解:https://leetcode.cn/problems/wiggle…...
ubuntu交叉编译dbus库给arm平台使用
1.下载dbus库源码 https://www.freedesktop.org/wiki/Software/dbus 克隆源码: https://gitlab.freedesktop.org/dbus/dbus/-/tree/dbus-1.12?ref_type=heads 下载1.12.20版本: 指定pkgconfig环境变量: export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:$PWD/../expat-2.3.…...
ansible开局配置-openEuler
ansible干啥用的就不多介绍了,这篇文章主要在说ansible的安装、开局配置、免密登录。 ansible安装 查看系统版本 cat /etc/openEuler-latest输出内容如下: openeulerversionopenEuler-24.03-LTS compiletime2024-05-27-21-31-28 gccversion12.3.1-30.…...
连锁收银系统的优势与挑战
在快速发展的零售环境中,连锁收银系统不仅是收银的工具,更是现代零售管理的重要组成部分。它在提升效率、优化客户体验以及数据管理等方面发挥了关键作用。然而,随着技术的进步和市场环境的变化,连锁收银系统也面临着诸多挑战。本…...
轻型民用无人驾驶航空器安全操控理论培训知识总结-多旋翼部分
航空器知识 螺旋桨 螺旋桨为多旋翼民用无人驾驶航空器提供升力,多旋翼民用无人驾驶航空器通过飞控系统控制电机调节螺旋桨转速,来实现飞行。 天线 多旋翼民用无人驾驶航空器的图像传输以及遥控控制信号,主要是通过无线信道进行的,靠民用无人驾驶航空器与遥控器的天线传…...
springboot092安康旅游网站的设计与实现(论文+源码)_kaic
毕业设计(论文) 基于JSP的安康旅游网站的设计与实现 姓 名 学 号 院 系 专 业 指导老师 2021 年 月 教务处制 目 录 目 录 摘 要 Abstract 第一章 绪论 1.1 研究现状 1.2 设…...
优化 Git 管理:提升协作效率的最佳实践20241030
优化 Git 管理:提升协作效率的最佳实践 引言 在现代软件开发中,版本控制系统是确保代码质量和团队协作顺畅的基石。Git 作为最流行的分布式版本控制工具,其灵活性和强大功能使得开发者能够高效地管理项目代码。然而,仅依靠工具本…...
Cocos使用精灵组件显示相机内容
Cocos使用精灵组件显示相机内容 1. 为什么使用精灵渲染 在游戏引擎中,游戏场景内除webview和video外所有的节点都是渲染在Canvas上,这导致了webview和video只能存在于所有节点的最上层或最下层,而这种层级关系会出现节点事件无法正常监听或者…...
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能
1. 开发环境准备 安装DevEco Studio 3.1: 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK 项目配置: // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...
