当前位置: 首页 > news >正文

Flink主要有两种基础类型的状态:operator state。

Flink主要有两种基础类型的状态:keyed state 和operator state。
Operator State
对于Operator State(或者non-keyed state),每个operator state绑定到一个并行operator实例上。在Flink中,Kafka Connector是一个使用Operator State的很好的例子。每个并行Kafka消费者实例维护一个主题分区和偏移的map作为它的Operator State。
当并行度被修改时,Operator State接口支持在并行operator实例上重新分配状态。进行这种重新分配可以有不同的方案。
Raw and Managed State
Keyed State 和 Operator State 有两种形式: managed和raw。
Managed State表示数据结构由Flink runtime控制,例如内部哈希表或者RocksDB。例如,“ValueState”,“ListState”等等。Flink的runtime层会编码State并将其写入checkpoint中。
Raw State是操作算子保存在它的数据结构中的state。当进行checkpoint时,它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构,并且只能看到raw字节。
所有的数据流函数都可以使用managed state,但是raw state接口只可以在操作算子的实现类中使用。推荐使用managed state(而不是raw state),因为使用managed state,当并行度变化时,Flink可以自动的重新分布状态,也可以做更好的内存管理。
注意 如果你的managed state需要自定义序列化逻辑,请参见managed state的自定义序列化以确保未来的兼容性。Flink默认的序列化不需要特殊处理。

managed non-keyed state
可以通过实现CheckpointedFunction或者ListCheckpointed接口,来使用managed non-keyed状态。

1.CheckpointedFunction
CheckpointedFunction接口通过不同的重新分配方案提供对non-keyed状态的访问。它需要实现两种方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
每当必须执行checkpoint时,都会调用snapshotState()。对应的initializeState()在每次初始化用户定义的函数时调用,可以是在函数第一次初始化时调用,也可以是在函数实际从较早的checkpoint恢复时调用。因此,initializeState()不仅是初始化不同类型状态的地方,也是状态恢复逻辑实现地方。

目前,支持List样式的管理操作状态。状态是一个可序列化对象的列表,彼此独立,因此在重新扫描时能够进行重新分区。换句话说,这些对象是可以重新分区no-keyed状态的最佳粒度。根据状态访问方法的不同,定义了以下重分区方案:

Even-split redistribution:每个操作算子返回一个状态元素列表。逻辑上串联起所有的列表就是状态元素完整列表。在恢复/重新分区时,该列表会均分成算子实例个数个子列表。每个操作算子实例获取一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,如果并行度为1,则操作算子的检查点状态包含元素element1和element2。当并行度增加到2时,element1可能会出现在算子实例0中,而element2会出现在算子实例1中。
Union redistribution: 每个操作算子返回一个状态元素列表。整个状态在逻辑上是串联起所有列表。在恢复/重新分发时,每个操作算子都获得状态元素的完整列表。
下面是一个有状态的SinkFunction,在讲数据元素写入外部存储之前使用CheckpointedFunction来缓存元素。主要是用来验证event-split充分布list状态。

下面的例子是一个有状态的SinkFunction,该sink会在数据发送到外部存储之前缓存数据元素。该例子是机遇均分重分布来实现的:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {

private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();
}@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}
}@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);}
}@Override
public void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}
}

}
initializeState方法以FunctionInitializationContext作为参数。用于初始化non-keyed状态“containers”。这是ListState类型的容器,其中non-keyed状态对象将在checkpoint上存储。
留意状态是如何初始化的,类似于keyed状态,使用一个StateDescriptor,其中包含状态名和关于状态持有的值的类型的信息:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
“buffered-elements”,
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

状态访问方法的命名约定包含其重分区模式及其状态结构。例如,要在还原时使用具有union重分区方案的list state,使用getUnionListState(descriptor)访问状态。如果方法名不包含重分区模式,例如getListState(descriptor),它仅仅意味着将使用均分重分区模式(Even-split redistribution)。
在初始化container之后,我们使用上下文的isrestore()方法检查失败后是否正在恢复。如果是true,即正在恢复,则执行恢复逻辑。
如修改后的BufferingSink代码所示,状态初始化期间恢复的数据保存在一个ListState变量中,以备将来在snapshotState()中使用。在那里,ListState将清除前一个检查点包含的所有对象,然后被我们想要检查的新选项填满。
另外,keyed状态也可以在initializeState()方法中初始化。可以使用FunctionInitializationContext来完成。
2.ListCheckpointed
ListCheckpointed接口是CheckpointedFunction的一个有限制的变体,它只支持列表样式的状态,在恢复时使用均分重分区方案。它还需要实现两种方法:
List snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List state) throws Exception;
在snapshotState()上,操作应该向检查点返回一个对象列表,而restoreState()必须在恢复时处理这个列表。如果状态不可重分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)。

有状态的源函数(Stateful Source Functions)
与其他操作符相比,有状态源需要更多的关注。为了更新状态和输出集合的原子性(用于故障/恢复上的精确一次语义),用户需要从源上下文获取一个锁。

public static class CounterSource
extends RichParallelSourceFunction
implements ListCheckpointed {

/**  current offset for exactly once semantics */
private Long offset;/** flag for job cancellation */
private volatile boolean isRunning = true;@Override
public void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}
}@Override
public void cancel() {isRunning = false;
}@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {return Collections.singletonList(offset);
}@Override
public void restoreState(List<Long> state) {for (Long s : state)offset = s;
}

}
当Flink完全确认检查点时,一些操作可能需要这些信息来与外部世界进行通信。在本例中,请参见org.apache.flink.runtime.state.CheckpointListener接口。

相关文章:

Flink主要有两种基础类型的状态:operator state。

Flink主要有两种基础类型的状态&#xff1a;keyed state 和operator state。 Operator State 对于Operator State(或者non-keyed state)&#xff0c;每个operator state绑定到一个并行operator实例上。在Flink中&#xff0c;Kafka Connector是一个使用Operator State的很好的例…...

【vue2】使用vue-admin-template动态添加路由的思路/addRoutes的使用

&#x1f609;博主&#xff1a;初映CY的前说(前端领域) ,&#x1f4d2;本文核心&#xff1a;用原生js实现省市区联动 【前言】在通用的后台管理项目的开发中&#xff0c;不仅仅是会涉及到对表单数据等的增删改查操作还会涉及到一些关于权限管理的问题。我们将基于一个RBAC的思维…...

Python语言中的注释方法应用

Python语言中的注释方法 在Python编程中&#xff0c;与其他编程语言一样&#xff0c;有良好的注释部分&#xff0c;会让你的程序在后续的改进或优化中&#xff0c;变得便利。同时&#xff0c;给自己培养了良好的编程习惯。 在Python语言中&#xff0c;有两种注释方法。 1.单行…...

Google浏览器翻译无法正常使用解决

1.查找可用服务器地址 按WinR键打开运行→输入cmd回车&#xff0c;打开命令提示符→输入ping google.cn 回车。记录一下下图红框里的ip地址&#xff0c;一会要用到 最近自己ping出来的ip可能不能用了&#xff0c;可以尝试用下面的ip 142.251.163.90 142.250.113.90 142.251.…...

ETCD(三)操作指令

1. put put #将给定的key写入到存储 --ignore-lease[false] #使用当前租约更新key --ignore-value[false] #使用当前值更新key --lease"0" # 要附加到key的租约ID&#xff08;十六进制&#xff09; --prev-kv[false] # 返回修改前的上一个键值对2. get get #获取给…...

小白学Pytorch系列--Torch.optim API Base class(1)

小白学Pytorch系列–Torch.optim API Base class(1) torch.optim是一个实现各种优化算法的包。大多数常用的方法都已得到支持&#xff0c;而且接口足够通用&#xff0c;因此将来还可以轻松集成更复杂的方法。 如何使用优化器 使用手torch.optim您必须构造一个优化器对象&…...

flac格式如何转mp3,3招帮你搞定

flac格式如何转mp3&#xff0c;3招帮你搞定的方法来啦。当你的音频是flac格式是不是很头疼&#xff0c;又不知道怎么转mp3 。然后网上搜索出很多方法又不知道从哪个下手&#xff0c;是不是很疑惑&#xff1f;那今天就来看看小编推荐的方法吧&#xff0c;一定让你眼前一亮&#…...

Redis入门到入土(day01)

NoSQL概述 为什么用NoSQL 1、单机MySQL的美好年代 在90年代&#xff0c;一个网站的访问量一般不大&#xff0c;用单个数据库完全可以轻松应付&#xff01; 在那个时候&#xff0c;更多的都是静态网页&#xff0c;动态交互类型的网站不多。 上述架构下&#xff0c;我们来看看…...

JVM垃圾回收GC 详解(java1.8)

目录 垃圾判断算法&#xff08;你是不是垃圾&#xff1f;&#xff09; 引用计数法 可达性算法 对象的引用 强引用 软引用 弱引用 虚引用 对象的自我救赎 垃圾回收算法--分代 标记清除算法 复制算法 标记整理法 垃圾处理器 垃圾判断算法&#xff08;你是不是垃圾&…...

Mybatis-Plus -03 Mybatis-Plus实现CRUD

Mybatis-Plus实现CRUD 1 Insert增加2 ID生成策略3 Delete删除4 逻辑删除5 Update修改6 Select查询 Mybatis-Plus实现CRUD 通用 CRUD 封装**BaseMapper (opens new window)**接口&#xff0c;为 Mybatis-Plus 启动时自动解析实体表关系映射转换为 Mybatis 内部对象注入容器参数 …...

综合能源系统中基于电转气和碳捕集系统的热电联产建模与优化研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

“智慧赋能 强链塑链”|工程物资供应链管理中的数字化应用

工程项目中的供应链管理至关重要 工程建设行业是国民经济的重要支柱之一&#xff0c;虽然在总产值上持续保持增长态势&#xff0c;但近年来行业的利润总额增速已连续多年呈现下降趋势。究其原因&#xff0c;可以大体从两个方面来看&#xff1a;一是行业盈利能力出现下降&#x…...

通过docker发布项目

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言例如&#xff1a;docker项目的发布方式 [docker发布的参考链接](https://www.cnblogs.com/emperorking/articles/11244253.html) 一、docker是什么&#xff1f;…...

为什么Spring和IDEA不推荐使用@Autowired注解?

在Spring开发中&#xff0c;Autowired注解是一个常用的依赖注入方式。但是&#xff0c;你可能会惊奇地发现&#xff0c;Spring和IDEA都不推荐使用Autowired注解。关于这个问题&#xff0c;其实答案相对统一&#xff0c;实际上用大白话说起来也容易理解。 官方答案 首先&#…...

windows下运行dpdk下的helloworld

打开“本地安全策略”管理单元,在搜索框输入secpol。 打开本地策略->用户权限分配->锁定内存页->添加用户或组->高级->立即查找 输入电脑用户名,选择并添加。点击确定后,重启电脑。 安装内核驱动,下载地址https://download.csdn.net/download/qq_36314864…...

【AI理论学习】深入理解Prompt Learning和Prompt Tuning

深入理解Prompt Learning和Prompt Tuning 背景Prompt Learning简介1. Prompt是什么&#xff1f;2. 为什么要使用Prompt&#xff1f;3. Prompt Learning的形式&#xff08;举例&#xff09;4. 有哪些Pre-training language model&#xff1f;5. 常见的Prompt Learning的方法 Pro…...

从Authy中导出账户和secret

本文转载于我的博客从Authy中导出账户和secret 前言 因为最近买了CanoKey&#xff0c;所以多算试一下CanoKey的TOTP功能&#xff0c;但是之前一直用的Authy并且它默认不支持导出功能 在网上找了一些文档&#xff0c;终于在github上找到了一个有效且简单的方法 目前网上大部分…...

图像锐度评分算法,方差,点锐度法,差分法,梯度法

图像锐度评分算法&#xff0c;方差&#xff0c;点锐度法&#xff0c;差分法&#xff0c;梯度法 图像锐度评分是用来描述图像清晰度的一个指标。常见的图像锐度评分算法包括方差法、点锐度法、差分法和梯度法等。 方差法&#xff1a;该方法是通过计算图像像素值的方差来评估图像…...

查询练习:连接查询

准备用于测试连接查询的数据&#xff1a; CREATE DATABASE testJoin;CREATE TABLE person (id INT,name VARCHAR(20),cardId INT );CREATE TABLE card (id INT,name VARCHAR(20) );INSERT INTO card VALUES (1, 饭卡), (2, 建行卡), (3, 农行卡), (4, 工商卡), (5, 邮政卡); S…...

【mmdeploy】【TODO】使用mmdeploy将mmdetection模型转tensorrt

mmdetection转换 文章目录 mmdetection转换mmdetection 自带转换ONNX——无法测试使用mmdeploy(0.6.0)使用mmdeploy转onnx使用mmdeploy直接转tensorRT调试记录 先上结论&#xff1a;作者最后是转tensorrt的小图才成功的&#xff0c;大图一直不行。文章仅作者自我记录使用&#…...

德赛西威上海车展重磅发布Smart Solution 2.0,有哪些革新点?

4月18日&#xff0c;全球瞩目的第二十届上海车展盛大启幕&#xff0c;作为国际领先的移动出行科技公司&#xff0c;德赛西威携智慧出行黑科技产品矩阵亮相&#xff0c;并以“智出行 共创享”为主题&#xff0c;重磅发布最新迭代的智慧出行解决方案——Smart Solution 2.0。 从…...

戴尔服务器是否需要开启cpupower.service

戴尔并不会默认开启cpupower.service&#xff0c;这取决于具体的操作系统和配置。cpupower.service是一个Linux系统服务&#xff0c;用于管理CPU的功耗和性能调节&#xff0c;可以通过调整CPU的频率和电源管理策略来降低能耗和温度。在某些情况下&#xff0c;开启cpupower.serv…...

day02_第一个Java程序

在开发第一个Java程序之前&#xff0c;我们必须对计算机的一些基础知识进行了解。 常用DOS命令 Java语言的初学者&#xff0c;学习一些DOS命令&#xff0c;会非常有帮助。DOS是一个早期的操作系统&#xff0c;现在已经被Windows系统取代&#xff0c;对于我们开发人员&#xf…...

【华为OD机试真题 】1011 - 第K个排列 (JAVA C++ Python JS) | 机试题+算法思路+考点+代码解析

文章目录 一、题目🔸题目描述🔸输入输出🔸样例1🔸样例2二、代码参考🔸C++代码🔸Java代码🔸Python代码🔸JS代码作者:KJ.JK🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🍂个人博客首页: KJ.JK 💖系列专栏:...

基于php的校园校园兼职网站的设计与实现

摘要 近年来&#xff0c;信息技术在大学校园中得到了广泛的应用&#xff0c;主要体现在两个方面&#xff1a;一是学校管理系统&#xff0c;包括教务管理、行政管理和分校管理&#xff0c;是我国大学管理和信息传递的主要渠道。二是学生生活服务平台。而随着大学生毕业人数的年…...

django部署

1. 配置服务器 安装django&#xff0c;python等服务–尽量和你的自己的配置相同&#xff0c;一摸一样避免出现问题 2.django项目迁移 sudo scp /home/tarena/django/mysitel root88.77.66.55:/home/root/xxx #然后输入密码3&#xff0c;用uWSGI 替代python manage.py runse…...

OpenCV 图像处理学习手册:1~5

原文&#xff1a;Learning Image Processing with OpenCV 协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【ApacheCN 计算机视觉 译文集】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 当别人说你没有底线的时候&#xff0c;…...

深度学习 - 43.SeNET、Bilinear Interaction 实现特征交叉 By Keras

目录 一.引言 二.SENET Layer 1.简介 2.Keras 实现 2.1 Init Function 2.2 Build Function 2.3 Call Function 2.4 Test Main Function 2.5 完整代码 三.BiLinear Intercation Layer 1.简介 2.Keras 实现 2.1 Init Function 2.2 Build Function 2.3 Call Functi…...

Ceph入门到精通-Cephadm安装Ceph(v17.2.5 Quincy)全网最全版本

Deploy Ceph&#xff08;v17.2.5 Quincy&#xff09; cluster to use Cephadm - DevOps - dbaselife Install cephadm Cephadm creates a new Ceph cluster by “bootstrapping” on a single host, expanding the cluster to encompass any additional hosts, and then depl…...

BIOS与POST自检

一、什么是BIOS BIOS是英文"BasicInput-Output System"&#xff0c;中文名称就是"基本输入输出系统"&#xff0c;是集成在主板上的一个ROM芯片&#xff0c;意思是只读存储器基本输入输出系统。顾名思义&#xff0c;它保存着计算机最重要的基本输入输出的程…...