当前位置: 首页 > news >正文

从Flink的Kafka消费者看算子联合列表状态的使用

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}

相关文章:

从Flink的Kafka消费者看算子联合列表状态的使用

背景 算子的联合列表状态是平时使用的比较少的一种状态&#xff0c;本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 算子联合列表状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…...

CSS3 按钮

创建 CSS3 按钮可以通过组合样式属性和伪类来实现 <!DOCTYPE html> <html> <head><link rel"stylesheet" type"text/css" href"styles.css"> </head> <body><button class"basic-button">…...

STM32 BootLoader设置

编写bootloader程序&#xff1a; 直接复制下面代码到自己程序中。 typedef void (*iapfun)(void); //定义一个函数类型的参数. iapfun jump2app; //设置栈顶地址 //addr:栈顶地址 __asm void MSR_MSP(u32 addr) {MSR MSP, r0 //set Main Stack valueBX r14 }//跳转到…...

django REST framework-使用与不使用的区别?

首先&#xff0c;来回顾一下传统的基于模板引擎的 django 开发工作流&#xff1a; 绑定 URL 和视图函数。当用户访问某个 URL 时&#xff0c;调用绑定的视图函数进行处理。 编写视图函数的逻辑。视图中通常涉及数据库的操作。 在视图中渲染 HTML 模板&#xff0c;返回 HTTP 响应…...

获取URL中的参数

获取URL中的参数 function getUrlParam(name) {var reg new RegExp("(^|&)" name "([^&]*)(&|$)");var r window.location.search.substr(1).match(reg);if (r ! null)return unescape(r[2]);return null; } 这个正则表达式就是一个URL路…...

一起学数据结构(9)——二叉树的链式存储及相关功能实现

目录 1. 二叉树的链式存储&#xff1a; 2. 二叉树的前序遍历&#xff1a; 3. 二叉树的中序遍历&#xff1a; 4. 二叉树的后序遍历&#xff1a; 5. 统计二叉树的结点总数 6.统计二叉树的叶子结点数&#xff1a; 7. 统计二叉树第层的结点数量&#xff1a; 8. 二叉树的销毁…...

vue 后端返回二进制流-前端通过blob对象下载文件-图片

前言 在实际开发中我们经常会遇见下载文件的场景&#xff0c;比如下载合同&#xff0c;下载文件 下载文件有2种方式&#xff0c;一种是后端返回二进制流&#xff0c;前端通过blob对象接受根据不同类型下载 还有一种把地址直接在浏览器新窗口打开浏览器打开pdf可以预览和下载&…...

vue el-dialog封装成子组件(组件化)

前言 实际开发过程中我们经常听见组件化开发&#xff0c;但在实际开发过程中&#xff08;没有人审查时&#xff09;怎么方便来 我们有时是因为时间不够&#xff0c;所以把所有代码写在一个页面。当业务逻辑复杂时可能会有1k多行 虽然不能要求自己写出高效复用性高的组件&…...

爬虫教程 一 requests包的使用

request 简介 requests 是一个常用的 HTTP 请求库&#xff0c;可以方便地向网站发送 HTTP 请求&#xff0c;并获取响应结果。 response.text 和response.content的区别 response.text 类型&#xff1a;str解码类型&#xff1a; requests模块自动根据HTTP 头部对响应的编码作…...

Aria2NG连接aria2-pro提示认证失败的处理办法

本文档适用于已经安装了aria2-pro和AriaNg的小伙伴~ 第一次登录管理端会提示”认证失败“ 这是因为aria设置了密码&#xff0c;需要在设置中配置上密码即可 配置完密码重新加载就可以正常使用啦 下载速度明显比以前快了很多 下载参考文档 Docker安装下载神器aria2并使用过程记…...

MYSQL 连接

高频 SQL 50 题&#xff08;基础版&#xff09; - 学习计划 - 力扣&#xff08;LeetCode&#xff09;全球极客挚爱的技术成长平台 1378. 使用唯一标识码替换员工ID SELECT COALESCE(unique_id, NULL) AS unique_id,name FROM Employees LEFT JOIN EmployeeUNI ON Employees.…...

SeaTunnel 换maven源,解决插件下载慢

SeaTunnel 是使用的mvnw命令,可以先执行一下install-plugin.sh然后终止 理论上应该可以直接执行mvnw,他就会去安装下载maven,目录就是下面的目录 然后去服务器目录修改 setting.xml文件,设置镜像源即可 /root/.m2/wrapper/dists/apache-maven-3.8.4-bin/52ccbt68d252mdldqsfsn…...

安卓14通过“冻结”缓存应用程序腾出CPU,提高性能和内存效率

本月早些时候&#xff0c;我们听说更新到安卓14似乎提高了谷歌Pixel 7和Pixel 6的效率——提高了电池寿命&#xff0c;并在这个过程中减少了热量的产生。现在看来&#xff0c;安卓14的增效功能细节已经公布。 安卓侦探Mishaal Rahman在X&#xff08;前身为Twitter&#xff09;…...

jupyter崩溃OOM,out of memory,jupyter代码写不进去,保存不了。

最近写一个比较长的数据处理代码&#xff0c;有快千行&#xff0c;然后经常代码没有写入&#xff0c;然后直接网页崩溃&#xff0c;给我干蒙了。我已经是jupyter版本的问题&#xff0c;弄了半天&#xff0c;弄完&#xff0c;还是有这个问题。然后就查了一下&#xff0c;发现是j…...

一文带你快速掌握爬虫开发中的一些高级调试技巧

文章目录 1. 写在前面2. Reply XHR&#xff08;重新发起请求&#xff09;3. copy as fecth&#xff08;修改参数请求&#xff09;4. copy()复制变量5. Web网页全屏截图6. 控制台安装使用npm7. 控制台中引用上次执行结果8. 控制台表展示对象数组 1. 写在前面 做过爬虫开发的人都…...

6.(vue3.x+vite)路由传参query与params区别

前端技术社区总目录(订阅之前请先查看该博客) 效果截图 一:路由传参有两种方式:params与query params与query区别 1:param,路由带“/”,query带“?” 2:query传过来的参数会显示到地址栏中 而params传过来的参数可以显示参数或隐藏参数到地址栏中(vue-router 4.1.4不…...

C++string的使用

CSDN的uu们&#xff0c;大家好。这里是C入门的第十六讲。 座右铭&#xff1a;前路坎坷&#xff0c;披荆斩棘&#xff0c;扶摇直上。 博客主页&#xff1a; 姬如祎 收录专栏&#xff1a;C专题 目录 1.构造函数 1.1 string() 1.2 string(const char* s) 1.3 string(const …...

闲着也是闲着,自己写歌东西玩一玩,碰碰脑子,简单快乐一点,双人出数的小游戏,后续还带补充

主旨就是每个人出一个数&#xff0c;目前限制两人&#xff0c;之后考虑多人&#xff0c;然后对其取差值&#xff0c;获取到一个结果&#xff0c;比对结果的奇偶数&#xff0c;还可以看下两人出同一个数的概率&#xff0c;反正概率上是一个比较稳定的。 当然自己想玩的活也可以做…...

牛客网 -- WY28 跳石板

题目链接&#xff1a; 跳石板_牛客题霸_牛客网 (nowcoder.com) 解题步骤&#xff1a; 参考代码&#xff1a; void get_approximate(vector<int>& v,int n) {//求约数&#xff0c;从2到sqrt(n)即可&#xff0c;原因看图解//这里一定要等于sqrt(n)&#xff0c;例如16…...

[正式学习java③]——字符串在内存中的存储方式、为什么字符串不可变、字符串的拼接原理,键盘录入的小细节。

一、字符串 1.字符串在内存中的存储方式 &#x1f525;在java中&#xff0c;内存中有两个地方可以存储字符串&#xff0c;一个是字符串池&#xff0c;一个是堆内存&#xff0c;串池中的字符串不会重复&#xff0c;而堆中的字符串每次都会开辟一块新的空间&#xff0c;因为维护…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

【python异步多线程】异步多线程爬虫代码示例

claude生成的python多线程、异步代码示例&#xff0c;模拟20个网页的爬取&#xff0c;每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程&#xff1a;允许程序同时执行多个任务&#xff0c;提高IO密集型任务&#xff08;如网络请求&#xff09;的效率…...

如何更改默认 Crontab 编辑器 ?

在 Linux 领域中&#xff0c;crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用&#xff0c;用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益&#xff0c;允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

C++实现分布式网络通信框架RPC(2)——rpc发布端

有了上篇文章的项目的基本知识的了解&#xff0c;现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...