【Apache Flink】实现有状态函数
文章目录
- 在RuntimeContext 中声明键值分区状态
- 通过ListCheckPonitend 接口实现算子列表状态
- 使用CheckpointedFunction接口
- 接收检查点完成通知
- 参考文档
在RuntimeContext 中声明键值分区状态
Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括:
-
ValueState: 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。
-
ListState: 这种状态用于存储一组元素(通常是元素的长列表)。借助此状态,可以简单地追加元素和迭代所有元素。
-
ReducingState 和 AggregatingState<IN, OUT>: 这两种状态都用于合并元素,通常在窗口操作中使用。
-
ReducingState:将添加的元素与现有元素通过reduce函数进行合并,最后只会保留一个元素,即合并的结果。
-
AggregatingState:与ReducingState类似,但是其可以存储转换后的聚合结果,而不是输入元素。
-
-
MapState<UK, UV>: 这种状态类型存储一个key-value映射。
要使用某一类型的 keyed state,需要提供一个 StateDescriptor,用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。
这些状态类型都是接口,并将存储后端(Flink提供了内存和RocksDB两种用于存储状态的后端)的具体实现细节隔离出来,因此用户可以不用关心状态是如何存储和访问的。
Flink 的键控状态使我们能够通过简单的API调用,就能够很自然地处理键控数据流,我们只需要关心特定键的当前事件和状态,Flink 框架会自动地处理状态的分布式存储和故障恢复等
我们需要了解在 Flink 中,RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息,例如任务的并行度,任务名称,任务 ID,输入和输出信号等。此外,RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。
在 Apache Flink 中,键值状态(Keyed State)是一种类型的状态,它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。
举个例子,假设我们正在构建一个实时的网络游戏分析系统,我们可能关注每位玩家的实时得分,这个得分基于他们在游戏中执行的动作(例如完成一项任务,击败一个敌人等)。在这个场景中,每个玩家的ID就是一个 "键",同时他们的游戏得分就是与键关联的 "状态"。当玩家在游戏中执行动作时,我们需要调整他们的分数状态。
然后,我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态:
public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {// 定义键控状态private transient ValueState<Long> scoreState;@Overridepublic void open(Configuration params) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("playerScore", // 状态的名称TypeInformation.of(new TypeHint<Long>() {}),0L); // 默认值scoreState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {// update the statelong currentScore = scoreState.value();currentScore += gameEvent.getScore();scoreState.update(currentScore);// return the updated scorereturn new Tuple2<>(gameEvent.getPlayerId(), currentScore);}
}
在这个例子中,
PlayerScoreFunction接收GameEvent流,这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过getRuntimeContext().getState(descriptor)我们获得了状态。然后我们在每次新的GameEvent到来时,根据事件中的分数增量用scoreState.update(currentScore)更新状态,然后将更新后的得分以及玩家的 ID 一起输出给下一个算子,例如,连接到实时的游戏分数仪表盘,将每个玩家的最新得分显示给观众看。
。
通过ListCheckPonitend 接口实现算子列表状态
算子状态(Operator State)在流处理系统(比如 Apache Flink)中,是一种特殊类型的状态,针对的是整个算子,而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。
算子状态的维护主要包括以下步骤:
-
定义算子状态:首先,我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字,并定义它存储的数据类型。
-
读取和写入算子状态:一旦定义了算子状态,我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。
-
保持状态一致:为了保持状态的一致性,我们需要定期将算子状态进行快照(Snapshot)并保存到远程存储系统中。在系统中断后,我们可以从最新的快照恢复算子状态。
-
状态恢复:在系统中断后,我们可以使用保存的快照恢复算子状态,恢复流处理的执行。
维护算子状态的方法可能会根据具体的流处理系统有所不同,但基本原理是相同的。这四步是维护算子状态的基本过程。
在 Flink 中,ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值,也就是说,所有的数据都会添加到该状态中。在故障恢复时,这些元素按添加的顺序重放。我们从 CheckpointedFunction 或 ListCheckpointed 接口的抽象类型继承,然后实现 snapshotState 和 restoreState 方法,以完成状态恢复。
具体来说,如果我们想使用 ListCheckpointed 接口实现算子列表状态,可以参考以下的代码:
我们每次接收到未序列化的 String 类型的数值,就把它转成 Integer 类型存储在一个列表(List)中。在每个 Checkpoint 操作当中,通过
snapshotState方法进行状态的快照并返回。当故障发生后,Flink 会调用restoreState方法将状态恢复回来。
如果算子是并行的,Flink 会为每一个子任务调用 restoreState 方法,并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时,Flink 将提取快照并将其分发到每个子任务。
public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {private List<Integer> bufferElements;public ListStateFunction(){this.bufferElements = new ArrayList<>();}@Overridepublic Integer map(String value) throws Exception {int parsedValue = Integer.parseInt(value);bufferElements.add(parsedValue);return bufferElements.size();}// 每次 checkpoint 时,将缓存的元素进行快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp) {return this.bufferElements;}// 从存储中恢复状态@Overridepublic void restoreState(List<Integer> state) {this.bufferElements.addAll(state);}
}
使用
ListCheckpointed还是CheckpointedFunction取决于特定的需求和上下文,两者在功能上是相似的,但CheckpointedFunction提供了更多的灵活性,可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。
使用CheckpointedFunction接口
Apache Flink提供了一个特殊的接口CheckpointedFunction,可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点(checkpoint)操作时触发,允许访问和编辑操作员状态。
h使用CheckpointedFunction的例子:
public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {private transient ValueState<Long> counter;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));counter = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(Long value) throws Exception {Long currentCount = counter.value();Long newCount = currentCount == null ? 1L : currentCount + 1;counter.update(newCount);return newCount;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {counter.clear();}
}
此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件(例如,开始和恢复)时调用并初始化状态变量。然后在map()方法中,状态被更新。snapshotState()在checkpoint操作时触发,这里我们仅清空状态,无任何持久化操作。
在操作和维护算子状态时,我们需要考虑状态的一致性和恢复,以处理可能的故障和中断。实际中可能会对
snapshotState()方法更复杂的逻辑,比如将状态存储至远端。
接收检查点完成通知
在Apache Flink中,当所有任务成功从接头位置创建检查点后,作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后,所有任务都会得到一个新的检查点的完成通知。
如果要接收这样的通知并对其做出反应,可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例:
函数使用ListState进行状态管理,每个接收到的元素都会被添加到状态中。并且,我们实现了notifyCheckpointComplete(long checkpointId)函数,以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。
触发的
notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前,具体的实现要根据你的检查点配置和故障恢复能力进行规划。
public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {private transient ListState<Long> checkpointedState;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("state", Long.class);checkpointedState = getRuntimeContext().getListState(descriptor);}@Overridepublic Long map(Long value) throws Exception {checkpointedState.add(value);return value;}@Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {// 监听到检查点成功完成的通知,此处可以进行相关逻辑处理}
}
参考文档
- https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/
相关文章:
【Apache Flink】实现有状态函数
文章目录 在RuntimeContext 中声明键值分区状态通过ListCheckPonitend 接口实现算子列表状态使用CheckpointedFunction接口接收检查点完成通知参考文档 在RuntimeContext 中声明键值分区状态 Flink为键值分区状态(Keyed State)提供了几种不同的原语&…...
Android原生项目集成uniMPSDK(Uniapp)遇到的报错总结
uni小程s序SDK 集成到Android原生项目:老项目中用到的库较多,会出现几种冲突问题,总结如下: 报错1: Execution failed for task :app:processDebugManifest. > Manifest merger failed with multiple errors, see logs Andro…...
Linux redis 安装
1、解压 tar -zxvf redis-5.0.10.tar.gz 2、cd /data/redis-5.0.10 文件夹 3、make 等待make命令执行完成即可。 make命令报错:cc 未找到命令,系统中缺少gcc,执行命令安装 gcc: yum -y install gcc automake autocon…...
在Win11上部署ChatGLM3详细步骤
023年10月27日,智谱AI于2023中国计算机大会(CNCC)上,推出了全自研的第三代基座大模型ChatGLM3及相关系列产品,这也是智谱AI继推出千亿基座的对话模型ChatGLM和ChatGLM2之后的又一次重大突破。此次推出的ChatGLM3采用了…...
系列七、动态代理
一、概述 二、Jdk动态代理案例 2.1、Star /*** Author : 一叶浮萍归大海* Date: 2023/10/27 17:16* Description:*/ public interface Star {/*** 唱歌* param name 歌曲名字* return*/String sing(String name);/*** 跳舞*/void dance(); } 2.2、BigStar /*** Author : 一叶…...
Kafka集群搭建与SpringBoot项目集成
本篇文章的目的是帮助Kafka初学者快速搭建一个Kafka集群,以及怎么在SpringBoot项目中使用Kafka。 kafka集群环境包地址:百度网盘 请输入提取码 提取码:x9yn 一、Kafka集群搭建 1、准备环境 (1)准备三台…...
一个简单的注册的页面,如有错误请指正;(3.JavaScript)
这段代码是一个JavaScript函数,实现了用户登录和上传图片的功能,并包含了一些辅助函数。让我一一解释: 1. login():这个函数用于登录操作。首先,通过$(#name).val()来获取ID为name的元素的值,同理获取其他…...
selenium (自动化概念 测试环境配置)
什么是自动化测试 自动化测试介绍 自动化测试指软件测试的自动化,在预设状态下运行应用程序或者系统. 预设条件包括正常和异常,最后评估运行结果。 自动化测试,就是将人为驱动的测试行为转化为机器执行的过程。 【机器 代替 人工】 自动化…...
Mybatis-Plus(企业实际开发应用)
一、Mybatis-Plus简介 MyBatis-Plus是MyBatis框架的一个增强工具,可以简化持久层代码开发MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具,在 MyBatis 的基础上只做增强不做改变,为简化开发、提高效率而生。 官网&a…...
Spring Web MVC入门
一:了解Spring Web MVC (1)关于Java开发 🌟Java开发大多数场景是业务开发 比如说京东的业务就是电商卖货、今日头条的业务就推送新闻;快手的业务就是短视频推荐 (2)Spring Web MVC的简单理解 💗Spring Web MVC:如何使…...
【C++】mapset的底层结构 -- AVL树(高度平衡二叉搜索树)
前面我们对 map / multimap / set / multiset 进行了简单的介绍,可以发现,这几个容器有个共同点是:其底层都是按照二叉搜索树来实现的。 但是二叉搜索树有其自身的缺陷,假如往树中插入的元素有序或者接近有序,二叉搜索…...
吴恩达《机器学习》1-4:无监督学习
一、无监督学习 无监督学习就像你拿到一堆未分类的东西,没有标签告诉你它们是什么,然后你的任务是自己找出它们之间的关系或者分成不同的组,而不依赖于任何人给你关于这些东西的指导。 以聚类为例,无监督学习算法可以将数据点分成…...
一个简单的注册页面,如有错误请指正(2.css)
这段CSS代码定义了页面的样式,让我逐个解释其功能: 1. * {}:通配符选择器,用于将页面中的所有元素设置统一的样式。这里将margins和paddings设置为0,以去除默认的边距。 2. div img {}:选择页面中所有div…...
【Unity精华一记】特殊文件夹
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...
Node.js中的单线程服务器
为了解决多线程服务器在高并发的I/O密集型应用中的不足,同时避免早期简单单线程服务器的性能障碍,Node.js采用了基于"事件循环"的非阻塞式单线程模型,实现了如下两个目标: (1)保证每个请求都可以…...
如何删除数组中的某个元素?
如何删除数组中的某个元素? 例:给你一个数组 nums 和一个值 val,你需要移除所有数值等于 val 的元素,并返回移除后数组的新长度。 三种方法 1.元素前移(时间复杂度:O(N^2),空间复杂度&#x…...
Apache ActiveMQ RCE漏洞复现(CNVD-2023-69477)
0x01 产品简介 ActiveMQ是一个开源的消息代理和集成模式服务器,它支持Java消息服务(JMS) API。它是Apache Software Foundation下的一个项目,用于实现消息中间件,帮助不同的应用程序或系统之间进行通信。 0x02 漏洞概述 Apache ActiveMQ 中存…...
【BUG】Nginx转发失败解决方案
最近在做项目的时候出现了一个问题,琢磨了好久,来浅浅记录一下。 这个项目后端使用的是gateway网关和nacos实现动态的路由,前端使用nginx来管理前端资源,大体流程:浏览器发起请求,经过nginx代理,…...
综合OA管理系统源码 OA系统源码
综合OA管理系统源码 OA系统源码 功能介绍: 编号:LQ10 一:系统管理 系统配置,功能模块,功能节点,权限角色,操作日志,备份数据,还原数据 二:基础数据 审批…...
9-MySQL提高数据管理效率(分库分表实践)
MySQL提高数据管理效率(分库分表实践) 在当今的互联网时代,随着业务规模的不断扩大,数据量也呈现出爆炸性的增长。如何有效地管理和存储这些数据,以及提高数据库的性能和可扩展性,成为了一个迫切需要解决的…...
测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...
【C++】纯虚函数类外可以写实现吗?
1. 答案 先说答案,可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...
从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...
