从Flink的Kafka消费者看算子联合列表状态的使用
背景
算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态
算子联合列表状态
首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
算子联合列表状态主要由这两个方法处理:
1初始化方法
public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}
2.开始通知检查点开始的方法:
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}
相关文章:

从Flink的Kafka消费者看算子联合列表状态的使用
背景 算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 算子联合列表状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…...
CSS3 按钮
创建 CSS3 按钮可以通过组合样式属性和伪类来实现 <!DOCTYPE html> <html> <head><link rel"stylesheet" type"text/css" href"styles.css"> </head> <body><button class"basic-button">…...

STM32 BootLoader设置
编写bootloader程序: 直接复制下面代码到自己程序中。 typedef void (*iapfun)(void); //定义一个函数类型的参数. iapfun jump2app; //设置栈顶地址 //addr:栈顶地址 __asm void MSR_MSP(u32 addr) {MSR MSP, r0 //set Main Stack valueBX r14 }//跳转到…...
django REST framework-使用与不使用的区别?
首先,来回顾一下传统的基于模板引擎的 django 开发工作流: 绑定 URL 和视图函数。当用户访问某个 URL 时,调用绑定的视图函数进行处理。 编写视图函数的逻辑。视图中通常涉及数据库的操作。 在视图中渲染 HTML 模板,返回 HTTP 响应…...
获取URL中的参数
获取URL中的参数 function getUrlParam(name) {var reg new RegExp("(^|&)" name "([^&]*)(&|$)");var r window.location.search.substr(1).match(reg);if (r ! null)return unescape(r[2]);return null; } 这个正则表达式就是一个URL路…...

一起学数据结构(9)——二叉树的链式存储及相关功能实现
目录 1. 二叉树的链式存储: 2. 二叉树的前序遍历: 3. 二叉树的中序遍历: 4. 二叉树的后序遍历: 5. 统计二叉树的结点总数 6.统计二叉树的叶子结点数: 7. 统计二叉树第层的结点数量: 8. 二叉树的销毁…...
vue 后端返回二进制流-前端通过blob对象下载文件-图片
前言 在实际开发中我们经常会遇见下载文件的场景,比如下载合同,下载文件 下载文件有2种方式,一种是后端返回二进制流,前端通过blob对象接受根据不同类型下载 还有一种把地址直接在浏览器新窗口打开浏览器打开pdf可以预览和下载&…...
vue el-dialog封装成子组件(组件化)
前言 实际开发过程中我们经常听见组件化开发,但在实际开发过程中(没有人审查时)怎么方便来 我们有时是因为时间不够,所以把所有代码写在一个页面。当业务逻辑复杂时可能会有1k多行 虽然不能要求自己写出高效复用性高的组件&…...
爬虫教程 一 requests包的使用
request 简介 requests 是一个常用的 HTTP 请求库,可以方便地向网站发送 HTTP 请求,并获取响应结果。 response.text 和response.content的区别 response.text 类型:str解码类型: requests模块自动根据HTTP 头部对响应的编码作…...

Aria2NG连接aria2-pro提示认证失败的处理办法
本文档适用于已经安装了aria2-pro和AriaNg的小伙伴~ 第一次登录管理端会提示”认证失败“ 这是因为aria设置了密码,需要在设置中配置上密码即可 配置完密码重新加载就可以正常使用啦 下载速度明显比以前快了很多 下载参考文档 Docker安装下载神器aria2并使用过程记…...

MYSQL 连接
高频 SQL 50 题(基础版) - 学习计划 - 力扣(LeetCode)全球极客挚爱的技术成长平台 1378. 使用唯一标识码替换员工ID SELECT COALESCE(unique_id, NULL) AS unique_id,name FROM Employees LEFT JOIN EmployeeUNI ON Employees.…...
SeaTunnel 换maven源,解决插件下载慢
SeaTunnel 是使用的mvnw命令,可以先执行一下install-plugin.sh然后终止 理论上应该可以直接执行mvnw,他就会去安装下载maven,目录就是下面的目录 然后去服务器目录修改 setting.xml文件,设置镜像源即可 /root/.m2/wrapper/dists/apache-maven-3.8.4-bin/52ccbt68d252mdldqsfsn…...

安卓14通过“冻结”缓存应用程序腾出CPU,提高性能和内存效率
本月早些时候,我们听说更新到安卓14似乎提高了谷歌Pixel 7和Pixel 6的效率——提高了电池寿命,并在这个过程中减少了热量的产生。现在看来,安卓14的增效功能细节已经公布。 安卓侦探Mishaal Rahman在X(前身为Twitter)…...

jupyter崩溃OOM,out of memory,jupyter代码写不进去,保存不了。
最近写一个比较长的数据处理代码,有快千行,然后经常代码没有写入,然后直接网页崩溃,给我干蒙了。我已经是jupyter版本的问题,弄了半天,弄完,还是有这个问题。然后就查了一下,发现是j…...

一文带你快速掌握爬虫开发中的一些高级调试技巧
文章目录 1. 写在前面2. Reply XHR(重新发起请求)3. copy as fecth(修改参数请求)4. copy()复制变量5. Web网页全屏截图6. 控制台安装使用npm7. 控制台中引用上次执行结果8. 控制台表展示对象数组 1. 写在前面 做过爬虫开发的人都…...

6.(vue3.x+vite)路由传参query与params区别
前端技术社区总目录(订阅之前请先查看该博客) 效果截图 一:路由传参有两种方式:params与query params与query区别 1:param,路由带“/”,query带“?” 2:query传过来的参数会显示到地址栏中 而params传过来的参数可以显示参数或隐藏参数到地址栏中(vue-router 4.1.4不…...

C++string的使用
CSDN的uu们,大家好。这里是C入门的第十六讲。 座右铭:前路坎坷,披荆斩棘,扶摇直上。 博客主页: 姬如祎 收录专栏:C专题 目录 1.构造函数 1.1 string() 1.2 string(const char* s) 1.3 string(const …...
闲着也是闲着,自己写歌东西玩一玩,碰碰脑子,简单快乐一点,双人出数的小游戏,后续还带补充
主旨就是每个人出一个数,目前限制两人,之后考虑多人,然后对其取差值,获取到一个结果,比对结果的奇偶数,还可以看下两人出同一个数的概率,反正概率上是一个比较稳定的。 当然自己想玩的活也可以做…...

牛客网 -- WY28 跳石板
题目链接: 跳石板_牛客题霸_牛客网 (nowcoder.com) 解题步骤: 参考代码: void get_approximate(vector<int>& v,int n) {//求约数,从2到sqrt(n)即可,原因看图解//这里一定要等于sqrt(n),例如16…...

[正式学习java③]——字符串在内存中的存储方式、为什么字符串不可变、字符串的拼接原理,键盘录入的小细节。
一、字符串 1.字符串在内存中的存储方式 🔥在java中,内存中有两个地方可以存储字符串,一个是字符串池,一个是堆内存,串池中的字符串不会重复,而堆中的字符串每次都会开辟一块新的空间,因为维护…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...

视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...

C++ 设计模式 《小明的奶茶加料风波》
👨🎓 模式名称:装饰器模式(Decorator Pattern) 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...

elementUI点击浏览table所选行数据查看文档
项目场景: table按照要求特定的数据变成按钮可以点击 解决方案: <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...

【深度学习新浪潮】什么是credit assignment problem?
Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...