深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程
ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中
public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor ioExecutor,RpcService rpcService,HighAvailabilityServices highAvailabilityServices,BlobServer blobServer,HeartbeatServices heartbeatServices,DelegationTokenManager delegationTokenManager,MetricRegistry metricRegistry,ExecutionGraphInfoStore executionGraphInfoStore,MetricQueryServiceRetriever metricQueryServiceRetriever,Collection<FailureEnricher> failureEnrichers,FatalErrorHandler fatalErrorHandler)throws Exception {//resourcemanager的选举服务LeaderRetrievalService resourceManagerRetrievalService = null;ResourceManagerService resourceManagerService = null;try {//resourceManager leader 获取服务resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();//LeaderGatewayRetriever实现了LeaderRetrievalListener接口final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =new RpcGatewayRetriever<>(rpcService,ResourceManagerGateway.class,ResourceManagerId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);//启动resourceManagerService start方法会调用具体的选举方法resourceManagerService.start();//启动etrievalService服务的start方法,resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);return new DispatcherResourceManagerComponent(dispatcherRunner,resourceManagerService,dispatcherLeaderRetrievalService,resourceManagerRetrievalService,webMonitorEndpoint,fatalErrorHandler,dispatcherOperationCaches);} catch (Exception exception) {//省略无关代码}}
初始化LeaderRetrievalServer
resourceManagerRetrievalService =highAvailabilityServices.getResourceManagerLeaderRetriever();
getResourceManagerLeaderRetriever会调用父类AbstractHaServices的这个方法
@Overridepublic LeaderRetrievalService getResourceManagerLeaderRetriever() {return createLeaderRetrievalService(getLeaderPathForResourceManager());}
createLeaderRetrievalService会调用具体子类的方法,zk模式下会走到ZooKeeperLeaderElectionHaServices下
@Overrideprotected LeaderRetrievalService createLeaderRetrievalService(String componentId) {// Maybe use a single service for leader retrievalreturn ZooKeeperUtils.createLeaderRetrievalService(curatorFrameworkWrapper.asCuratorFramework(),ZooKeeperUtils.getLeaderPath(componentId),configuration);}
继续方法跟进
public static DefaultLeaderRetrievalService createLeaderRetrievalService(final CuratorFramework client, final String path, final Configuration configuration) {return new DefaultLeaderRetrievalService(createLeaderRetrievalDriverFactory(client, path, configuration));
最终调用DefaultLeaderRetrievalService的构造方法进行初始化,入参是一个LeaderRetrievalDriverFactory接口,zk模式下是ZooKeeperLeaderRetrievalDriverFactory
初始化LeaderRetrievalListener
final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =new RpcGatewayRetriever<>(rpcService,DispatcherGateway.class,DispatcherId::fromUuid,new ExponentialBackoffRetryStrategy(12, Duration.ofMillis(10), Duration.ofMillis(50)));
RpcGatewayRetrieverUML类图如下

LeaderGatewayRetriever#notifyNewLeaderAddress对LeaderRetrievalListener#notifyLeaderAddress做了实现
@Overridepublic void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);final CompletableFuture<T> oldGatewayFuture =atomicGatewayFuture.getAndSet(newGatewayFuture);newGatewayFuture.whenComplete((t, throwable) -> {if (throwable != null) {oldGatewayFuture.completeExceptionally(throwable);} else {oldGatewayFuture.complete(t);}});}
创建ResourceManagerService
resourceManagerService =ResourceManagerServiceImpl.create(resourceManagerFactory,configuration,resourceId,rpcService,highAvailabilityServices,heartbeatServices,delegationTokenManager,fatalErrorHandler,new ClusterInformation(hostname, blobServer.getPort()),webMonitorEndpoint.getRestBaseUrl(),metricRegistry,hostname,ioExecutor);
ResourceManagerService负责管理ResourceManager的生命周期,也负责haServer的启动
创建LeaderElection
private ResourceManagerServiceImpl(ResourceManagerFactory<?> resourceManagerFactory,ResourceManagerProcessContext rmProcessContext)throws Exception {//省略其他方法//LeaderElection进行了初始化this.leaderElection =rmProcessContext.getHighAvailabilityServices().getResourceManagerLeaderElection();
LeaderElection选举
resourceManagerService.start();
跳转到start方法
public void start() throws Exception {synchronized (lock) {if (running) {LOG.debug("Resource manager service has already started.");return;}running = true;}LOG.info("Starting resource manager service.");leaderElection.startLeaderElection(this);}
leaderElection.startLeaderElection(this)zk模式下走的是DefaultLeaderElection的方法,
@Overridepublic void startLeaderElection(LeaderContender contender) throws Exception {Preconditions.checkNotNull(contender);parentService.register(componentId, contender);}
开始对参选者进行注册
protected void register(String componentId, LeaderContender contender) throws Exception {checkNotNull(componentId, "componentId must not be null.");checkNotNull(contender, "Contender must not be null.");synchronized (lock) {if (leaderElectionDriver == null) {createLeaderElectionDriver();}//省略无关代码}
register会判断选举的driver是否存在,如果不存在,则根据高可用的模式进行选举驱动的创建
@Overridepublic ZooKeeperLeaderElectionDriver create(LeaderElectionDriver.Listener leaderElectionListener) throws Exception {return new ZooKeeperLeaderElectionDriver(curatorFramework, leaderElectionListener);}
ZooKeeperLeaderElectionDriver初始化
public ZooKeeperLeaderElectionDriver(CuratorFramework curatorFramework, LeaderElectionDriver.Listener leaderElectionListener)throws Exception {//参数校验this.curatorFramework = Preconditions.checkNotNull(curatorFramework);this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);//使用ZooKeeperUtils.generateLeaderLatchPath方法基于curatorFramework的命名空间生成一个ZooKeeper节点路径,该路径通常用于领导者选举的锁。this.leaderLatchPath =ZooKeeperUtils.generateLeaderLatchPath(curatorFramework.getNamespace());//使用CuratorFramework和之前生成的路径创建一个LeaderLatch实例。LeaderLatch是Curator提供的一个领导者选举实现。 this.leaderLatch = new LeaderLatch(curatorFramework, ZooKeeperUtils.getLeaderLatchPath());//使用ZooKeeperUtils.createTreeCache方法创建一个TreeCache实例,用于监听ZooKeeper中特定路径(这里是根路径"/")下的节点变化this.treeCache =ZooKeeperUtils.createTreeCache(curatorFramework,"/",new ZooKeeperLeaderElectionDriver.ConnectionInfoNodeSelector());treeCache.getListenable().addListener((client, event) -> {switch (event.getType()) {case NODE_ADDED:case NODE_UPDATED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleChangedLeaderInformation(event.getData());break;case NODE_REMOVED:Preconditions.checkNotNull(event.getData(),"The ZooKeeper event data must not be null.");handleRemovedLeaderInformation(event.getData().getPath());break;}});leaderLatch.addListener(this);curatorFramework.getConnectionStateListenable().addListener(listener);leaderLatch.start();treeCache.start();}
ZooKeeperLeaderElectionDriver.handleStateChange
状态变化时候根据不同状态打印日志
private void handleStateChange(ConnectionState newState) {switch (newState) {case CONNECTED:LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");break;case SUSPENDED:LOG.warn("Connection to ZooKeeper suspended, waiting for reconnection.");break;case RECONNECTED:LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");break;case LOST:// Maybe we have to throw an exception here to terminate the JobManagerLOG.warn("Connection to ZooKeeper lost. None of the contenders participates in the leader election anymore.");break;}}
DefaultLeaderElectionService.notifyLeaderContenderOfLeadership
private void notifyLeaderContenderOfLeadership(String componentId, UUID sessionID) {if (!leaderContenderRegistry.containsKey(componentId)) {LOG.debug("The grant leadership notification for session ID {} is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",sessionID,leaderElectionDriver);return;} else if (!sessionID.equals(issuedLeaderSessionID)) {LOG.debug("An out-dated leadership-acquired event with session ID {} was triggered. The current leader session ID is {}. The event will be ignored.",sessionID,issuedLeaderSessionID);return;}Preconditions.checkState(!confirmedLeaderInformation.hasLeaderInformation(componentId),"The leadership should have been granted while not having the leadership acquired.");LOG.debug("Granting leadership to the contender registered under component '{}' with session ID {}.",componentId,issuedLeaderSessionID);leaderContenderRegistry.get(componentId).grantLeadership(issuedLeaderSessionID);}相关文章:
深度了解flink(十) JobManager(4) ResourceManager HA
ResourceManager(ZK模式)的高可用启动流程 ResourceManager启动流程在DefaultDispatcherResourceManagerComponentFactory#create中 public DispatcherResourceManagerComponent create(Configuration configuration,ResourceID resourceId,Executor i…...
【万兴科技-注册_登录安全分析报告】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 暴力破解密码,造成用户信息泄露短信盗刷的安全问题,影响业务及导致用户投诉带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞…...
Android启动流程_Zygote阶段
前言 上一篇文档中我们描述了 Android 启动中的 init 启动部分,本片文档将会继续 Android 启动流程的逻辑,继续梳理 Zygote 部分功能。 说明框架 对于 Zygote 进程,要从以下框架说明: 第一点,编译,zygo…...
2022NOIP比赛总结
种花 1.本题是一道前缀和优化加上枚举的问题。先考虑 C 因为 F 是 C 下边随便加一个点,所以只要求出 C 就求出了 F 。 注意到,并没有要求上下行一样,唯一的要求是 C 的两个横要隔一行,这就是问题的突破点,这题很明显…...
Leetcode 排序链表
这段代码的算法思想是 归并排序,一种适合链表的排序方法。它通过递归地将链表拆分成两部分,分别排序,然后合并已排序的部分,从而达到整体排序的目的。以下是代码的中文解释: 算法步骤: 找到链表的中点&…...
哈希函数简介
哈希函数是一种将任意大小的数据输入(通常称为“消息”)转换为固定大小的输出(称为“哈希值”或“摘要”)的算法。 主要特点: 1、输出固定长度 无论输入数据的大小如何,哈希函数的输出总是固定长度。例如…...
nginx------正向代理,反向代理生产,以及能否不使用代理详解
在生产环境中,选择使用正向代理还是反向代理取决于具体的应用场景和需求。下面详细解释这两种代理的用处以及为什么在不同情况下会选择它们。 正向代理 (Forward Proxy) 用途 匿名访问: 隐藏客户端的真实 IP 地址,提供隐私保护。常用于绕过…...
iptables限制docker端口禁止某台主机访问(使用DOCKER链和raw表的PREROUTING链)
背景: 在Linux上docker映射了端口,想着对服务端口进行限制指定IP访问,发现在filter表的INPUT链限制无效 环境: 主机192.168.56.132上的docker容器部署了nginx并将容器80端口映射到主机8000端口 [rootlocalhost ~]# docker ps …...
【VM实战】VMware迁移到VirtualBox
VMware 虚拟机开机卸载VMware Tools 调整虚拟磁盘 对于Windows 10及以上的虚拟机,一般VMware默认都会选Nvme固态硬盘。在导出前必须将其改为SATA,否则VirtualBox导入会报Appliance Import错误 (E_INVALIDARG 0x80070057) 先删掉当前盘的挂载ÿ…...
Android WebView加载不到cookie
以下配置根据需求酌情添加,建议逐个试验,cookie操作不是内存操作,建议修改配置后卸载app再重新运行防止缓存影响测试结果。 1.设置应用程序的 WebView 实例是否应发送并接受 Cookie CookieManager cookieManager CookieManager.getInstanc…...
c++qt
1.显示画布 #include "code.h" #include <QtWidgets/QApplication> #include<iostream> #include<vector> #include <QWindow> #include <QGraphicsView> #include <QGraphicsScene>using namespace std;//1.空格 2.墙 3.入口…...
零跑汽车嵌入式面试题汇总及参考答案
C++ 的三大特性是什么? C++ 的三大特性分别是封装、继承和多态。 封装 概念:封装是把数据和操作数据的函数绑定在一起,对数据的访问进行限制。通过将数据成员声明为私有或保护,只允许通过公共的成员函数来访问和修改数据,从而隐藏了类的内部实现细节。这有助于提高代码的安…...
LC:贪心题解
文章目录 376. 摆动序列 376. 摆动序列 题目链接:https://leetcode.cn/problems/wiggle-subsequence/description/ 这个题目自己首先想到的是动态规划解题,贪心解法真的非常妙,参考下面题解:https://leetcode.cn/problems/wiggle…...
ubuntu交叉编译dbus库给arm平台使用
1.下载dbus库源码 https://www.freedesktop.org/wiki/Software/dbus 克隆源码: https://gitlab.freedesktop.org/dbus/dbus/-/tree/dbus-1.12?ref_type=heads 下载1.12.20版本: 指定pkgconfig环境变量: export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:$PWD/../expat-2.3.…...
ansible开局配置-openEuler
ansible干啥用的就不多介绍了,这篇文章主要在说ansible的安装、开局配置、免密登录。 ansible安装 查看系统版本 cat /etc/openEuler-latest输出内容如下: openeulerversionopenEuler-24.03-LTS compiletime2024-05-27-21-31-28 gccversion12.3.1-30.…...
连锁收银系统的优势与挑战
在快速发展的零售环境中,连锁收银系统不仅是收银的工具,更是现代零售管理的重要组成部分。它在提升效率、优化客户体验以及数据管理等方面发挥了关键作用。然而,随着技术的进步和市场环境的变化,连锁收银系统也面临着诸多挑战。本…...
轻型民用无人驾驶航空器安全操控理论培训知识总结-多旋翼部分
航空器知识 螺旋桨 螺旋桨为多旋翼民用无人驾驶航空器提供升力,多旋翼民用无人驾驶航空器通过飞控系统控制电机调节螺旋桨转速,来实现飞行。 天线 多旋翼民用无人驾驶航空器的图像传输以及遥控控制信号,主要是通过无线信道进行的,靠民用无人驾驶航空器与遥控器的天线传…...
springboot092安康旅游网站的设计与实现(论文+源码)_kaic
毕业设计(论文) 基于JSP的安康旅游网站的设计与实现 姓 名 学 号 院 系 专 业 指导老师 2021 年 月 教务处制 目 录 目 录 摘 要 Abstract 第一章 绪论 1.1 研究现状 1.2 设…...
优化 Git 管理:提升协作效率的最佳实践20241030
优化 Git 管理:提升协作效率的最佳实践 引言 在现代软件开发中,版本控制系统是确保代码质量和团队协作顺畅的基石。Git 作为最流行的分布式版本控制工具,其灵活性和强大功能使得开发者能够高效地管理项目代码。然而,仅依靠工具本…...
Cocos使用精灵组件显示相机内容
Cocos使用精灵组件显示相机内容 1. 为什么使用精灵渲染 在游戏引擎中,游戏场景内除webview和video外所有的节点都是渲染在Canvas上,这导致了webview和video只能存在于所有节点的最上层或最下层,而这种层级关系会出现节点事件无法正常监听或者…...
虾胡闹,多Agents中的成员正在玩心机
Agents of Chaos研究封面 最近读到一篇很有意思的论文,Northeastern University等机构的20位研究者做了一项为期两周的"红队测试"实验,把AI Agents部署在真实环境中,给了它们Discord账号、邮箱、文件系统和shell权限,然…...
本地离线运行更安全!AI人脸隐私卫士,保护隐私无需上传云端
本地离线运行更安全!AI人脸隐私卫士,保护隐私无需上传云端 在社交媒体分享、企业宣传、新闻报道等场景中,我们常常需要发布包含人物的照片。然而,未经处理的人脸信息一旦公开,就可能面临隐私泄露的风险。手动给照片中…...
**发散创新:基于脉冲计算的神经形态编程实践与Python实现**在传统冯·诺依曼架构下,计算
发散创新:基于脉冲计算的神经形态编程实践与Python实现 在传统冯诺依曼架构下,计算和存储分离导致能效瓶颈日益突出。近年来,脉冲计算(Spiking Neural Computing, SNC)作为一种受生物神经系统启发的新范式,…...
SenseVoice-Small ONNX酿酒工艺:发酵语音→糖度+温度+时间结构化记录
SenseVoice-Small ONNX酿酒工艺:发酵语音→糖度温度时间结构化记录 1. 引言:从“听不清”到“记得清”的酿造革命 想象一下,你是一位酿酒师,正站在发酵车间里。耳边是发酵罐低沉的嗡鸣,空气里弥漫着谷物和酵母的混合…...
WiFiPixels:ESP32上轻量级Wi-Fi控制NeoPixel的固件框架
1. 项目概述WiFiPixels 是一个面向嵌入式 LED 控制场景的轻量级网络化固件框架,其核心设计目标是将 NeoPixel(WS2812B 类型)LED 阵列通过 Wi-Fi 接口暴露为可远程寻址、实时更新的像素资源。项目名称 “NeoPixel Wifi WifiPixels” 并非营销…...
008、OpenClaw TTS 声学模型实战:训练数据准备与配置解析
上周调一个长句合成,输出音频在中段突然出现音调断裂,像是两个不同人在交替发音。频谱图上一看,隐状态在某个音素边界处发生了跳变。问题最终追溯到训练数据里同一说话人的音频存在采样率混用——部分文件是16kHz,另一些却是22.05kHz。预处理脚本没做统一重采样,导致模型在…...
Arduino_CloudUtils:嵌入式物联网云通信核心工具库
1. Arduino_CloudUtils 库深度解析:嵌入式云通信核心工具链Arduino_CloudUtils 是 Arduino 官方为物联网云连接场景设计的底层通用工具库,其定位并非独立应用框架,而是作为 ArduinoIoTCloud 等上层云 SDK 的“基础设施层”。该库不处理网络协…...
你的SSH密钥可能已经过期了噬
引言 在现代软件开发中,性能始终是衡量应用质量的重要指标之一。无论是企业级应用、云服务还是桌面程序,性能优化都能显著提升用户体验、降低基础设施成本并增强系统的可扩展性。对于使用 C# 开发的应用程序而言,性能优化涉及多个层面&#x…...
我试了四种去除 Gemini 水印的方法,整理成一篇实用对比撕
认识Pass层级结构 Pass范围从上到下一共分为5个层级: 模块层级:单个.ll或.bc文件 调用图层级:函数调用的关系。 函数层级:单个函数。 基本块层级:单个代码块。例如C语言中{}括起来的最小代码。 指令层级:单…...
Go语言中的内存管理:从原理到优化
Go语言中的内存管理:从原理到优化 1. 内存管理的重要性 内存管理是编程语言的核心特性之一,它直接影响程序的性能和稳定性。Go语言通过内置的垃圾回收器和内存分配器,为开发者提供了自动内存管理能力,使得开发者可以专注于业务逻…...
