从Flink的Kafka消费者看算子联合列表状态的使用
背景
算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态
算子联合列表状态
首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况

算子联合列表状态主要由这两个方法处理:
1初始化方法
public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}
2.开始通知检查点开始的方法:
public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}
相关文章:
从Flink的Kafka消费者看算子联合列表状态的使用
背景 算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 算子联合列表状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…...
CSS3 按钮
创建 CSS3 按钮可以通过组合样式属性和伪类来实现 <!DOCTYPE html> <html> <head><link rel"stylesheet" type"text/css" href"styles.css"> </head> <body><button class"basic-button">…...
STM32 BootLoader设置
编写bootloader程序: 直接复制下面代码到自己程序中。 typedef void (*iapfun)(void); //定义一个函数类型的参数. iapfun jump2app; //设置栈顶地址 //addr:栈顶地址 __asm void MSR_MSP(u32 addr) {MSR MSP, r0 //set Main Stack valueBX r14 }//跳转到…...
django REST framework-使用与不使用的区别?
首先,来回顾一下传统的基于模板引擎的 django 开发工作流: 绑定 URL 和视图函数。当用户访问某个 URL 时,调用绑定的视图函数进行处理。 编写视图函数的逻辑。视图中通常涉及数据库的操作。 在视图中渲染 HTML 模板,返回 HTTP 响应…...
获取URL中的参数
获取URL中的参数 function getUrlParam(name) {var reg new RegExp("(^|&)" name "([^&]*)(&|$)");var r window.location.search.substr(1).match(reg);if (r ! null)return unescape(r[2]);return null; } 这个正则表达式就是一个URL路…...
一起学数据结构(9)——二叉树的链式存储及相关功能实现
目录 1. 二叉树的链式存储: 2. 二叉树的前序遍历: 3. 二叉树的中序遍历: 4. 二叉树的后序遍历: 5. 统计二叉树的结点总数 6.统计二叉树的叶子结点数: 7. 统计二叉树第层的结点数量: 8. 二叉树的销毁…...
vue 后端返回二进制流-前端通过blob对象下载文件-图片
前言 在实际开发中我们经常会遇见下载文件的场景,比如下载合同,下载文件 下载文件有2种方式,一种是后端返回二进制流,前端通过blob对象接受根据不同类型下载 还有一种把地址直接在浏览器新窗口打开浏览器打开pdf可以预览和下载&…...
vue el-dialog封装成子组件(组件化)
前言 实际开发过程中我们经常听见组件化开发,但在实际开发过程中(没有人审查时)怎么方便来 我们有时是因为时间不够,所以把所有代码写在一个页面。当业务逻辑复杂时可能会有1k多行 虽然不能要求自己写出高效复用性高的组件&…...
爬虫教程 一 requests包的使用
request 简介 requests 是一个常用的 HTTP 请求库,可以方便地向网站发送 HTTP 请求,并获取响应结果。 response.text 和response.content的区别 response.text 类型:str解码类型: requests模块自动根据HTTP 头部对响应的编码作…...
Aria2NG连接aria2-pro提示认证失败的处理办法
本文档适用于已经安装了aria2-pro和AriaNg的小伙伴~ 第一次登录管理端会提示”认证失败“ 这是因为aria设置了密码,需要在设置中配置上密码即可 配置完密码重新加载就可以正常使用啦 下载速度明显比以前快了很多 下载参考文档 Docker安装下载神器aria2并使用过程记…...
MYSQL 连接
高频 SQL 50 题(基础版) - 学习计划 - 力扣(LeetCode)全球极客挚爱的技术成长平台 1378. 使用唯一标识码替换员工ID SELECT COALESCE(unique_id, NULL) AS unique_id,name FROM Employees LEFT JOIN EmployeeUNI ON Employees.…...
SeaTunnel 换maven源,解决插件下载慢
SeaTunnel 是使用的mvnw命令,可以先执行一下install-plugin.sh然后终止 理论上应该可以直接执行mvnw,他就会去安装下载maven,目录就是下面的目录 然后去服务器目录修改 setting.xml文件,设置镜像源即可 /root/.m2/wrapper/dists/apache-maven-3.8.4-bin/52ccbt68d252mdldqsfsn…...
安卓14通过“冻结”缓存应用程序腾出CPU,提高性能和内存效率
本月早些时候,我们听说更新到安卓14似乎提高了谷歌Pixel 7和Pixel 6的效率——提高了电池寿命,并在这个过程中减少了热量的产生。现在看来,安卓14的增效功能细节已经公布。 安卓侦探Mishaal Rahman在X(前身为Twitter)…...
jupyter崩溃OOM,out of memory,jupyter代码写不进去,保存不了。
最近写一个比较长的数据处理代码,有快千行,然后经常代码没有写入,然后直接网页崩溃,给我干蒙了。我已经是jupyter版本的问题,弄了半天,弄完,还是有这个问题。然后就查了一下,发现是j…...
一文带你快速掌握爬虫开发中的一些高级调试技巧
文章目录 1. 写在前面2. Reply XHR(重新发起请求)3. copy as fecth(修改参数请求)4. copy()复制变量5. Web网页全屏截图6. 控制台安装使用npm7. 控制台中引用上次执行结果8. 控制台表展示对象数组 1. 写在前面 做过爬虫开发的人都…...
6.(vue3.x+vite)路由传参query与params区别
前端技术社区总目录(订阅之前请先查看该博客) 效果截图 一:路由传参有两种方式:params与query params与query区别 1:param,路由带“/”,query带“?” 2:query传过来的参数会显示到地址栏中 而params传过来的参数可以显示参数或隐藏参数到地址栏中(vue-router 4.1.4不…...
C++string的使用
CSDN的uu们,大家好。这里是C入门的第十六讲。 座右铭:前路坎坷,披荆斩棘,扶摇直上。 博客主页: 姬如祎 收录专栏:C专题 目录 1.构造函数 1.1 string() 1.2 string(const char* s) 1.3 string(const …...
闲着也是闲着,自己写歌东西玩一玩,碰碰脑子,简单快乐一点,双人出数的小游戏,后续还带补充
主旨就是每个人出一个数,目前限制两人,之后考虑多人,然后对其取差值,获取到一个结果,比对结果的奇偶数,还可以看下两人出同一个数的概率,反正概率上是一个比较稳定的。 当然自己想玩的活也可以做…...
牛客网 -- WY28 跳石板
题目链接: 跳石板_牛客题霸_牛客网 (nowcoder.com) 解题步骤: 参考代码: void get_approximate(vector<int>& v,int n) {//求约数,从2到sqrt(n)即可,原因看图解//这里一定要等于sqrt(n),例如16…...
[正式学习java③]——字符串在内存中的存储方式、为什么字符串不可变、字符串的拼接原理,键盘录入的小细节。
一、字符串 1.字符串在内存中的存储方式 🔥在java中,内存中有两个地方可以存储字符串,一个是字符串池,一个是堆内存,串池中的字符串不会重复,而堆中的字符串每次都会开辟一块新的空间,因为维护…...
Leetcode 3576. Transform Array to All Equal Elements
Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接:3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
Qt Http Server模块功能及架构
Qt Http Server 是 Qt 6.0 中引入的一个新模块,它提供了一个轻量级的 HTTP 服务器实现,主要用于构建基于 HTTP 的应用程序和服务。 功能介绍: 主要功能 HTTP服务器功能: 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
协议转换利器,profinet转ethercat网关的两大派系,各有千秋
随着工业以太网的发展,其高效、便捷、协议开放、易于冗余等诸多优点,被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口,具有实时性、开放性,使用TCP/IP和IT标准,符合基于工业以太网的…...
Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...
Java 与 MySQL 性能优化:MySQL 慢 SQL 诊断与分析方法详解
文章目录 一、开启慢查询日志,定位耗时SQL1.1 查看慢查询日志是否开启1.2 临时开启慢查询日志1.3 永久开启慢查询日志1.4 分析慢查询日志 二、使用EXPLAIN分析SQL执行计划2.1 EXPLAIN的基本使用2.2 EXPLAIN分析案例2.3 根据EXPLAIN结果优化SQL 三、使用SHOW PROFILE…...
【iOS】 Block再学习
iOS Block再学习 文章目录 iOS Block再学习前言Block的三种类型__ NSGlobalBlock____ NSMallocBlock____ NSStackBlock__小结 Block底层分析Block的结构捕获自由变量捕获全局(静态)变量捕获静态变量__block修饰符forwarding指针 Block的copy时机block作为函数返回值将block赋给…...
Mysql故障排插与环境优化
前置知识点 最上层是一些客户端和连接服务,包含本 sock 通信和大多数jiyukehuduan/服务端工具实现的TCP/IP通信。主要完成一些简介处理、授权认证、及相关的安全方案等。在该层上引入了线程池的概念,为通过安全认证接入的客户端提供线程。同样在该层上可…...
