从Flink的Kafka消费者看算子联合列表状态的使用
背景
算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态
算子联合列表状态
首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况

算子联合列表状态主要由这两个方法处理:
1初始化方法
public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}
2.开始通知检查点开始的方法:
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}
相关文章:
从Flink的Kafka消费者看算子联合列表状态的使用
背景 算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 算子联合列表状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…...
CSS3 按钮
创建 CSS3 按钮可以通过组合样式属性和伪类来实现 <!DOCTYPE html> <html> <head><link rel"stylesheet" type"text/css" href"styles.css"> </head> <body><button class"basic-button">…...
STM32 BootLoader设置
编写bootloader程序: 直接复制下面代码到自己程序中。 typedef void (*iapfun)(void); //定义一个函数类型的参数. iapfun jump2app; //设置栈顶地址 //addr:栈顶地址 __asm void MSR_MSP(u32 addr) {MSR MSP, r0 //set Main Stack valueBX r14 }//跳转到…...
django REST framework-使用与不使用的区别?
首先,来回顾一下传统的基于模板引擎的 django 开发工作流: 绑定 URL 和视图函数。当用户访问某个 URL 时,调用绑定的视图函数进行处理。 编写视图函数的逻辑。视图中通常涉及数据库的操作。 在视图中渲染 HTML 模板,返回 HTTP 响应…...
获取URL中的参数
获取URL中的参数 function getUrlParam(name) {var reg new RegExp("(^|&)" name "([^&]*)(&|$)");var r window.location.search.substr(1).match(reg);if (r ! null)return unescape(r[2]);return null; } 这个正则表达式就是一个URL路…...
一起学数据结构(9)——二叉树的链式存储及相关功能实现
目录 1. 二叉树的链式存储: 2. 二叉树的前序遍历: 3. 二叉树的中序遍历: 4. 二叉树的后序遍历: 5. 统计二叉树的结点总数 6.统计二叉树的叶子结点数: 7. 统计二叉树第层的结点数量: 8. 二叉树的销毁…...
vue 后端返回二进制流-前端通过blob对象下载文件-图片
前言 在实际开发中我们经常会遇见下载文件的场景,比如下载合同,下载文件 下载文件有2种方式,一种是后端返回二进制流,前端通过blob对象接受根据不同类型下载 还有一种把地址直接在浏览器新窗口打开浏览器打开pdf可以预览和下载&…...
vue el-dialog封装成子组件(组件化)
前言 实际开发过程中我们经常听见组件化开发,但在实际开发过程中(没有人审查时)怎么方便来 我们有时是因为时间不够,所以把所有代码写在一个页面。当业务逻辑复杂时可能会有1k多行 虽然不能要求自己写出高效复用性高的组件&…...
爬虫教程 一 requests包的使用
request 简介 requests 是一个常用的 HTTP 请求库,可以方便地向网站发送 HTTP 请求,并获取响应结果。 response.text 和response.content的区别 response.text 类型:str解码类型: requests模块自动根据HTTP 头部对响应的编码作…...
Aria2NG连接aria2-pro提示认证失败的处理办法
本文档适用于已经安装了aria2-pro和AriaNg的小伙伴~ 第一次登录管理端会提示”认证失败“ 这是因为aria设置了密码,需要在设置中配置上密码即可 配置完密码重新加载就可以正常使用啦 下载速度明显比以前快了很多 下载参考文档 Docker安装下载神器aria2并使用过程记…...
MYSQL 连接
高频 SQL 50 题(基础版) - 学习计划 - 力扣(LeetCode)全球极客挚爱的技术成长平台 1378. 使用唯一标识码替换员工ID SELECT COALESCE(unique_id, NULL) AS unique_id,name FROM Employees LEFT JOIN EmployeeUNI ON Employees.…...
SeaTunnel 换maven源,解决插件下载慢
SeaTunnel 是使用的mvnw命令,可以先执行一下install-plugin.sh然后终止 理论上应该可以直接执行mvnw,他就会去安装下载maven,目录就是下面的目录 然后去服务器目录修改 setting.xml文件,设置镜像源即可 /root/.m2/wrapper/dists/apache-maven-3.8.4-bin/52ccbt68d252mdldqsfsn…...
安卓14通过“冻结”缓存应用程序腾出CPU,提高性能和内存效率
本月早些时候,我们听说更新到安卓14似乎提高了谷歌Pixel 7和Pixel 6的效率——提高了电池寿命,并在这个过程中减少了热量的产生。现在看来,安卓14的增效功能细节已经公布。 安卓侦探Mishaal Rahman在X(前身为Twitter)…...
jupyter崩溃OOM,out of memory,jupyter代码写不进去,保存不了。
最近写一个比较长的数据处理代码,有快千行,然后经常代码没有写入,然后直接网页崩溃,给我干蒙了。我已经是jupyter版本的问题,弄了半天,弄完,还是有这个问题。然后就查了一下,发现是j…...
一文带你快速掌握爬虫开发中的一些高级调试技巧
文章目录 1. 写在前面2. Reply XHR(重新发起请求)3. copy as fecth(修改参数请求)4. copy()复制变量5. Web网页全屏截图6. 控制台安装使用npm7. 控制台中引用上次执行结果8. 控制台表展示对象数组 1. 写在前面 做过爬虫开发的人都…...
6.(vue3.x+vite)路由传参query与params区别
前端技术社区总目录(订阅之前请先查看该博客) 效果截图 一:路由传参有两种方式:params与query params与query区别 1:param,路由带“/”,query带“?” 2:query传过来的参数会显示到地址栏中 而params传过来的参数可以显示参数或隐藏参数到地址栏中(vue-router 4.1.4不…...
C++string的使用
CSDN的uu们,大家好。这里是C入门的第十六讲。 座右铭:前路坎坷,披荆斩棘,扶摇直上。 博客主页: 姬如祎 收录专栏:C专题 目录 1.构造函数 1.1 string() 1.2 string(const char* s) 1.3 string(const …...
闲着也是闲着,自己写歌东西玩一玩,碰碰脑子,简单快乐一点,双人出数的小游戏,后续还带补充
主旨就是每个人出一个数,目前限制两人,之后考虑多人,然后对其取差值,获取到一个结果,比对结果的奇偶数,还可以看下两人出同一个数的概率,反正概率上是一个比较稳定的。 当然自己想玩的活也可以做…...
牛客网 -- WY28 跳石板
题目链接: 跳石板_牛客题霸_牛客网 (nowcoder.com) 解题步骤: 参考代码: void get_approximate(vector<int>& v,int n) {//求约数,从2到sqrt(n)即可,原因看图解//这里一定要等于sqrt(n),例如16…...
[正式学习java③]——字符串在内存中的存储方式、为什么字符串不可变、字符串的拼接原理,键盘录入的小细节。
一、字符串 1.字符串在内存中的存储方式 🔥在java中,内存中有两个地方可以存储字符串,一个是字符串池,一个是堆内存,串池中的字符串不会重复,而堆中的字符串每次都会开辟一块新的空间,因为维护…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
LabVIEW双光子成像系统技术
双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制,展现出显著的技术优势: 深层组织穿透能力:适用于活体组织深度成像 高分辨率观测性能:满足微观结构的精细研究需求 低光毒性特点:减少对样本的损伤…...
tauri项目,如何在rust端读取电脑环境变量
如果想在前端通过调用来获取环境变量的值,可以通过标准的依赖: std::env::var(name).ok() 想在前端通过调用来获取,可以写一个command函数: #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...
快速排序算法改进:随机快排-荷兰国旗划分详解
随机快速排序-荷兰国旗划分算法详解 一、基础知识回顾1.1 快速排序简介1.2 荷兰国旗问题 二、随机快排 - 荷兰国旗划分原理2.1 随机化枢轴选择2.2 荷兰国旗划分过程2.3 结合随机快排与荷兰国旗划分 三、代码实现3.1 Python实现3.2 Java实现3.3 C实现 四、性能分析4.1 时间复杂度…...
Appium下载安装配置保姆教程(图文详解)
目录 一、Appium软件介绍 1.特点 2.工作原理 3.应用场景 二、环境准备 安装 Node.js 安装 Appium 安装 JDK 安装 Android SDK 安装Python及依赖包 三、安装教程 1.Node.js安装 1.1.下载Node 1.2.安装程序 1.3.配置npm仓储和缓存 1.4. 配置环境 1.5.测试Node.j…...
