当前位置: 首页 > news >正文

Flink主要有两种基础类型的状态:operator state。

Flink主要有两种基础类型的状态:keyed state 和operator state。
Operator State
对于Operator State(或者non-keyed state),每个operator state绑定到一个并行operator实例上。在Flink中,Kafka Connector是一个使用Operator State的很好的例子。每个并行Kafka消费者实例维护一个主题分区和偏移的map作为它的Operator State。
当并行度被修改时,Operator State接口支持在并行operator实例上重新分配状态。进行这种重新分配可以有不同的方案。
Raw and Managed State
Keyed State 和 Operator State 有两种形式: managed和raw。
Managed State表示数据结构由Flink runtime控制,例如内部哈希表或者RocksDB。例如,“ValueState”,“ListState”等等。Flink的runtime层会编码State并将其写入checkpoint中。
Raw State是操作算子保存在它的数据结构中的state。当进行checkpoint时,它只写入字节序列到checkpoint中。Flink并不知道状态的数据结构,并且只能看到raw字节。
所有的数据流函数都可以使用managed state,但是raw state接口只可以在操作算子的实现类中使用。推荐使用managed state(而不是raw state),因为使用managed state,当并行度变化时,Flink可以自动的重新分布状态,也可以做更好的内存管理。
注意 如果你的managed state需要自定义序列化逻辑,请参见managed state的自定义序列化以确保未来的兼容性。Flink默认的序列化不需要特殊处理。

managed non-keyed state
可以通过实现CheckpointedFunction或者ListCheckpointed接口,来使用managed non-keyed状态。

1.CheckpointedFunction
CheckpointedFunction接口通过不同的重新分配方案提供对non-keyed状态的访问。它需要实现两种方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;
每当必须执行checkpoint时,都会调用snapshotState()。对应的initializeState()在每次初始化用户定义的函数时调用,可以是在函数第一次初始化时调用,也可以是在函数实际从较早的checkpoint恢复时调用。因此,initializeState()不仅是初始化不同类型状态的地方,也是状态恢复逻辑实现地方。

目前,支持List样式的管理操作状态。状态是一个可序列化对象的列表,彼此独立,因此在重新扫描时能够进行重新分区。换句话说,这些对象是可以重新分区no-keyed状态的最佳粒度。根据状态访问方法的不同,定义了以下重分区方案:

Even-split redistribution:每个操作算子返回一个状态元素列表。逻辑上串联起所有的列表就是状态元素完整列表。在恢复/重新分区时,该列表会均分成算子实例个数个子列表。每个操作算子实例获取一个子列表,该子列表可以是空的,也可以包含一个或多个元素。例如,如果并行度为1,则操作算子的检查点状态包含元素element1和element2。当并行度增加到2时,element1可能会出现在算子实例0中,而element2会出现在算子实例1中。
Union redistribution: 每个操作算子返回一个状态元素列表。整个状态在逻辑上是串联起所有列表。在恢复/重新分发时,每个操作算子都获得状态元素的完整列表。
下面是一个有状态的SinkFunction,在讲数据元素写入外部存储之前使用CheckpointedFunction来缓存元素。主要是用来验证event-split充分布list状态。

下面的例子是一个有状态的SinkFunction,该sink会在数据发送到外部存储之前缓存数据元素。该例子是机遇均分重分布来实现的:
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {

private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();
}@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}
}@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);}
}@Override
public void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}
}

}
initializeState方法以FunctionInitializationContext作为参数。用于初始化non-keyed状态“containers”。这是ListState类型的容器,其中non-keyed状态对象将在checkpoint上存储。
留意状态是如何初始化的,类似于keyed状态,使用一个StateDescriptor,其中包含状态名和关于状态持有的值的类型的信息:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
“buffered-elements”,
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

状态访问方法的命名约定包含其重分区模式及其状态结构。例如,要在还原时使用具有union重分区方案的list state,使用getUnionListState(descriptor)访问状态。如果方法名不包含重分区模式,例如getListState(descriptor),它仅仅意味着将使用均分重分区模式(Even-split redistribution)。
在初始化container之后,我们使用上下文的isrestore()方法检查失败后是否正在恢复。如果是true,即正在恢复,则执行恢复逻辑。
如修改后的BufferingSink代码所示,状态初始化期间恢复的数据保存在一个ListState变量中,以备将来在snapshotState()中使用。在那里,ListState将清除前一个检查点包含的所有对象,然后被我们想要检查的新选项填满。
另外,keyed状态也可以在initializeState()方法中初始化。可以使用FunctionInitializationContext来完成。
2.ListCheckpointed
ListCheckpointed接口是CheckpointedFunction的一个有限制的变体,它只支持列表样式的状态,在恢复时使用均分重分区方案。它还需要实现两种方法:
List snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List state) throws Exception;
在snapshotState()上,操作应该向检查点返回一个对象列表,而restoreState()必须在恢复时处理这个列表。如果状态不可重分区,则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)。

有状态的源函数(Stateful Source Functions)
与其他操作符相比,有状态源需要更多的关注。为了更新状态和输出集合的原子性(用于故障/恢复上的精确一次语义),用户需要从源上下文获取一个锁。

public static class CounterSource
extends RichParallelSourceFunction
implements ListCheckpointed {

/**  current offset for exactly once semantics */
private Long offset;/** flag for job cancellation */
private volatile boolean isRunning = true;@Override
public void run(SourceContext<Long> ctx) {final Object lock = ctx.getCheckpointLock();while (isRunning) {// output and state update are atomicsynchronized (lock) {ctx.collect(offset);offset += 1;}}
}@Override
public void cancel() {isRunning = false;
}@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {return Collections.singletonList(offset);
}@Override
public void restoreState(List<Long> state) {for (Long s : state)offset = s;
}

}
当Flink完全确认检查点时,一些操作可能需要这些信息来与外部世界进行通信。在本例中,请参见org.apache.flink.runtime.state.CheckpointListener接口。

相关文章:

Flink主要有两种基础类型的状态:operator state。

Flink主要有两种基础类型的状态&#xff1a;keyed state 和operator state。 Operator State 对于Operator State(或者non-keyed state)&#xff0c;每个operator state绑定到一个并行operator实例上。在Flink中&#xff0c;Kafka Connector是一个使用Operator State的很好的例…...

【vue2】使用vue-admin-template动态添加路由的思路/addRoutes的使用

&#x1f609;博主&#xff1a;初映CY的前说(前端领域) ,&#x1f4d2;本文核心&#xff1a;用原生js实现省市区联动 【前言】在通用的后台管理项目的开发中&#xff0c;不仅仅是会涉及到对表单数据等的增删改查操作还会涉及到一些关于权限管理的问题。我们将基于一个RBAC的思维…...

Python语言中的注释方法应用

Python语言中的注释方法 在Python编程中&#xff0c;与其他编程语言一样&#xff0c;有良好的注释部分&#xff0c;会让你的程序在后续的改进或优化中&#xff0c;变得便利。同时&#xff0c;给自己培养了良好的编程习惯。 在Python语言中&#xff0c;有两种注释方法。 1.单行…...

Google浏览器翻译无法正常使用解决

1.查找可用服务器地址 按WinR键打开运行→输入cmd回车&#xff0c;打开命令提示符→输入ping google.cn 回车。记录一下下图红框里的ip地址&#xff0c;一会要用到 最近自己ping出来的ip可能不能用了&#xff0c;可以尝试用下面的ip 142.251.163.90 142.250.113.90 142.251.…...

ETCD(三)操作指令

1. put put #将给定的key写入到存储 --ignore-lease[false] #使用当前租约更新key --ignore-value[false] #使用当前值更新key --lease"0" # 要附加到key的租约ID&#xff08;十六进制&#xff09; --prev-kv[false] # 返回修改前的上一个键值对2. get get #获取给…...

小白学Pytorch系列--Torch.optim API Base class(1)

小白学Pytorch系列–Torch.optim API Base class(1) torch.optim是一个实现各种优化算法的包。大多数常用的方法都已得到支持&#xff0c;而且接口足够通用&#xff0c;因此将来还可以轻松集成更复杂的方法。 如何使用优化器 使用手torch.optim您必须构造一个优化器对象&…...

flac格式如何转mp3,3招帮你搞定

flac格式如何转mp3&#xff0c;3招帮你搞定的方法来啦。当你的音频是flac格式是不是很头疼&#xff0c;又不知道怎么转mp3 。然后网上搜索出很多方法又不知道从哪个下手&#xff0c;是不是很疑惑&#xff1f;那今天就来看看小编推荐的方法吧&#xff0c;一定让你眼前一亮&#…...

Redis入门到入土(day01)

NoSQL概述 为什么用NoSQL 1、单机MySQL的美好年代 在90年代&#xff0c;一个网站的访问量一般不大&#xff0c;用单个数据库完全可以轻松应付&#xff01; 在那个时候&#xff0c;更多的都是静态网页&#xff0c;动态交互类型的网站不多。 上述架构下&#xff0c;我们来看看…...

JVM垃圾回收GC 详解(java1.8)

目录 垃圾判断算法&#xff08;你是不是垃圾&#xff1f;&#xff09; 引用计数法 可达性算法 对象的引用 强引用 软引用 弱引用 虚引用 对象的自我救赎 垃圾回收算法--分代 标记清除算法 复制算法 标记整理法 垃圾处理器 垃圾判断算法&#xff08;你是不是垃圾&…...

Mybatis-Plus -03 Mybatis-Plus实现CRUD

Mybatis-Plus实现CRUD 1 Insert增加2 ID生成策略3 Delete删除4 逻辑删除5 Update修改6 Select查询 Mybatis-Plus实现CRUD 通用 CRUD 封装**BaseMapper (opens new window)**接口&#xff0c;为 Mybatis-Plus 启动时自动解析实体表关系映射转换为 Mybatis 内部对象注入容器参数 …...

综合能源系统中基于电转气和碳捕集系统的热电联产建模与优化研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

“智慧赋能 强链塑链”|工程物资供应链管理中的数字化应用

工程项目中的供应链管理至关重要 工程建设行业是国民经济的重要支柱之一&#xff0c;虽然在总产值上持续保持增长态势&#xff0c;但近年来行业的利润总额增速已连续多年呈现下降趋势。究其原因&#xff0c;可以大体从两个方面来看&#xff1a;一是行业盈利能力出现下降&#x…...

通过docker发布项目

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言例如&#xff1a;docker项目的发布方式 [docker发布的参考链接](https://www.cnblogs.com/emperorking/articles/11244253.html) 一、docker是什么&#xff1f;…...

为什么Spring和IDEA不推荐使用@Autowired注解?

在Spring开发中&#xff0c;Autowired注解是一个常用的依赖注入方式。但是&#xff0c;你可能会惊奇地发现&#xff0c;Spring和IDEA都不推荐使用Autowired注解。关于这个问题&#xff0c;其实答案相对统一&#xff0c;实际上用大白话说起来也容易理解。 官方答案 首先&#…...

windows下运行dpdk下的helloworld

打开“本地安全策略”管理单元,在搜索框输入secpol。 打开本地策略->用户权限分配->锁定内存页->添加用户或组->高级->立即查找 输入电脑用户名,选择并添加。点击确定后,重启电脑。 安装内核驱动,下载地址https://download.csdn.net/download/qq_36314864…...

【AI理论学习】深入理解Prompt Learning和Prompt Tuning

深入理解Prompt Learning和Prompt Tuning 背景Prompt Learning简介1. Prompt是什么&#xff1f;2. 为什么要使用Prompt&#xff1f;3. Prompt Learning的形式&#xff08;举例&#xff09;4. 有哪些Pre-training language model&#xff1f;5. 常见的Prompt Learning的方法 Pro…...

从Authy中导出账户和secret

本文转载于我的博客从Authy中导出账户和secret 前言 因为最近买了CanoKey&#xff0c;所以多算试一下CanoKey的TOTP功能&#xff0c;但是之前一直用的Authy并且它默认不支持导出功能 在网上找了一些文档&#xff0c;终于在github上找到了一个有效且简单的方法 目前网上大部分…...

图像锐度评分算法,方差,点锐度法,差分法,梯度法

图像锐度评分算法&#xff0c;方差&#xff0c;点锐度法&#xff0c;差分法&#xff0c;梯度法 图像锐度评分是用来描述图像清晰度的一个指标。常见的图像锐度评分算法包括方差法、点锐度法、差分法和梯度法等。 方差法&#xff1a;该方法是通过计算图像像素值的方差来评估图像…...

查询练习:连接查询

准备用于测试连接查询的数据&#xff1a; CREATE DATABASE testJoin;CREATE TABLE person (id INT,name VARCHAR(20),cardId INT );CREATE TABLE card (id INT,name VARCHAR(20) );INSERT INTO card VALUES (1, 饭卡), (2, 建行卡), (3, 农行卡), (4, 工商卡), (5, 邮政卡); S…...

【mmdeploy】【TODO】使用mmdeploy将mmdetection模型转tensorrt

mmdetection转换 文章目录 mmdetection转换mmdetection 自带转换ONNX——无法测试使用mmdeploy(0.6.0)使用mmdeploy转onnx使用mmdeploy直接转tensorRT调试记录 先上结论&#xff1a;作者最后是转tensorrt的小图才成功的&#xff0c;大图一直不行。文章仅作者自我记录使用&#…...

当plc编程遇见ai助手:用快马智能分析需求并生成优化控制方案

作为一名工业自动化领域的工程师&#xff0c;我最近尝试用AI辅助完成PLC编程工作&#xff0c;发现InsCode(快马)平台的智能对话功能特别适合处理复杂控制逻辑的开发。这种"人类描述需求AI分析生成"的协作模式&#xff0c;让传统PLC开发效率提升了至少三倍。 需求分析…...

零基础友好:借助快马生成的指导项目轻松完成anaconda安装与初体验

最近在学Python数据分析&#xff0c;被各种环境配置搞得头大。朋友推荐用Anaconda管理环境&#xff0c;但光是安装就卡了半天。后来在InsCode(快马)平台发现了个神器项目&#xff0c;像有个老师手把手教操作&#xff0c;分享下我的学习过程&#xff1a; 为什么选择Anaconda 刚开…...

OpenClaw多任务队列:gemma-3-12b-it并行处理技巧与实践

OpenClaw多任务队列&#xff1a;gemma-3-12b-it并行处理技巧与实践 1. 为什么需要多任务队列 去年冬天&#xff0c;我正尝试用OpenClaw自动化处理一批市场调研报告。当同时提交5个分析任务时&#xff0c;发现系统要么卡死&#xff0c;要么任务相互覆盖。这种经历让我意识到—…...

为什么高端芯片都爱用Flip Chip?对比Wire Bonding的5大优势详解

为什么高端芯片都爱用Flip Chip&#xff1f;对比Wire Bonding的5大优势详解 在芯片封装领域&#xff0c;Flip Chip&#xff08;倒装芯片&#xff09;技术正逐渐成为高端应用的标配。想象一下&#xff0c;当你手持最新款智能手机&#xff0c;流畅运行着复杂的AI应用时&#xff0…...

QWEN-AUDIO与其他AI工具共存:如何合理分配GPU资源?

QWEN-AUDIO与其他AI工具共存&#xff1a;如何合理分配GPU资源&#xff1f; 1. 多AI工具共存的挑战与解决方案 在当前的AI应用场景中&#xff0c;单一GPU服务器往往需要同时运行多个AI模型。QWEN-AUDIO作为一款高性能语音合成系统&#xff0c;如何与其他视觉、语言模型和谐共存…...

Qwen3-ASR-1.7B车载场景应用:驾驶语音助手开发

Qwen3-ASR-1.7B车载场景应用&#xff1a;驾驶语音助手开发 1. 引言 开车时操作导航、切歌、调音量&#xff0c;这些看似简单的操作却暗藏风险。低头一秒&#xff0c;车辆就能开出几十米&#xff0c;事故往往就发生在这瞬间。传统的触屏操作不仅分心&#xff0c;还让驾驶变得不…...

协议解析CPU飙升85%?从Wireshark抓包到JFR火焰图的全链路诊断闭环,立即生效!

第一章&#xff1a;协议解析CPU飙升85%&#xff1f;从Wireshark抓包到JFR火焰图的全链路诊断闭环&#xff0c;立即生效&#xff01;当线上服务突发CPU使用率飙升至85%以上&#xff0c;且无明显GC压力或线程阻塞时&#xff0c;协议层异常解析往往是隐藏元凶。我们曾在线上Java服…...

OpenClaw学习助手:用gemma-3-12b-it自动整理课程笔记与习题

OpenClaw学习助手&#xff1a;用gemma-3-12b-it自动整理课程笔记与习题 1. 为什么需要AI学习助手&#xff1f; 作为一名经常需要消化大量课程资料的技术从业者&#xff0c;我长期被三个问题困扰&#xff1a;PDF讲义信息碎片化难以形成体系、课堂重点难以快速提炼、错题整理耗…...

Pixel Couplet Gen快速部署:微信小程序端调用像素春联API的跨域与性能优化

Pixel Couplet Gen快速部署&#xff1a;微信小程序端调用像素春联API的跨域与性能优化 1. 项目背景与核心价值 Pixel Couplet Gen是一款基于ModelScope大模型驱动的创新春联生成器&#xff0c;将传统春节文化与现代像素艺术完美融合。不同于传统春联生成工具&#xff0c;该项…...

假芯片识别与防范:工程师实战指南

1. 假芯片泛滥&#xff1a;半导体行业的隐秘危机最近在调试一块电路板时&#xff0c;我发现一个奇怪的现象&#xff1a;明明使用的是同型号的MCU&#xff0c;但部分板子的功耗异常偏高。经过一周的排查&#xff0c;最终发现问题出在芯片上——我们采购到了一批"套牌"…...