当前位置: 首页 > news >正文

【Apache Flink】实现有状态函数

文章目录

  • 在RuntimeContext 中声明键值分区状态
  • 通过ListCheckPonitend 接口实现算子列表状态
  • 使用CheckpointedFunction接口
  • 接收检查点完成通知
  • 参考文档

在RuntimeContext 中声明键值分区状态

Flink为键值分区状态(Keyed State)提供了几种不同的原语(数据类型)。这是因为不同的算法和操作可能需要管理不同类型的状态。其中一些原语包括:

  1. ValueState: 这种状态类型用于存储单个的,可能更新的值。常见的用途包括存储计数器或聚合。

  2. ListState: 这种状态用于存储一组元素(通常是元素的长列表)。借助此状态,可以简单地追加元素和迭代所有元素。

  3. ReducingStateAggregatingState<IN, OUT>: 这两种状态都用于合并元素,通常在窗口操作中使用。

    • ReducingState:将添加的元素与现有元素通过reduce函数进行合并,最后只会保留一个元素,即合并的结果。

    • AggregatingState:与ReducingState类似,但是其可以存储转换后的聚合结果,而不是输入元素。

  4. MapState<UK, UV>: 这种状态类型存储一个key-value映射。

要使用某一类型的 keyed state,需要提供一个 StateDescriptor,用于声明状态的名称和类型。然后可以通过 RuntimeContext 获取状态。

这些状态类型都是接口,并将存储后端(Flink提供了内存和RocksDB两种用于存储状态的后端)的具体实现细节隔离出来,因此用户可以不用关心状态是如何存储和访问的。

Flink 的键控状态使我们能够通过简单的API调用,就能够很自然地处理键控数据流,我们只需要关心特定键的当前事件和状态,Flink 框架会自动地处理状态的分布式存储和故障恢复等

我们需要了解在 Flink 中,RuntimeContext 提供访问在运行期间的任务 (比如 Map、Reduce 或 Filter function) 可以访问的上下文信息,例如任务的并行度,任务名称,任务 ID,输入和输出信号等。此外,RuntimeContext 还为用户代码提供了生成和维护分布式累加器和键值状态的方法。

在 Apache Flink 中,键值状态(Keyed State)是一种类型的状态,它是以 key 为中心的。每一个 key 都可以对应一个状态。我们可以在 Flink 算子的open()方法中通过 RuntimeContext 获取和初始化它。

举个例子,假设我们正在构建一个实时的网络游戏分析系统,我们可能关注每位玩家的实时得分,这个得分基于他们在游戏中执行的动作(例如完成一项任务,击败一个敌人等)。在这个场景中,每个玩家的ID就是一个 "键",同时他们的游戏得分就是与键关联的 "状态"。当玩家在游戏中执行动作时,我们需要调整他们的分数状态

然后,我们的 Flink 代码可以定义一个 RichMapFunction 来维护每个玩家的分数状态:

public class PlayerScoreFunction extends RichMapFunction<GameEvent, Tuple2<String, Long>> {// 定义键控状态private transient ValueState<Long> scoreState;@Overridepublic void open(Configuration params) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("playerScore", // 状态的名称TypeInformation.of(new TypeHint<Long>() {}),0L); // 默认值scoreState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple2<String, Long> map(GameEvent gameEvent) throws Exception {// update the statelong currentScore = scoreState.value();currentScore += gameEvent.getScore();scoreState.update(currentScore);// return the updated scorereturn new Tuple2<>(gameEvent.getPlayerId(), currentScore);}
}

在这个例子中,PlayerScoreFunction 接收 GameEvent 流,这是玩家在游戏中的各种动作生成的事件。我们将玩家的 ID 作为键来处理这个流。通过 getRuntimeContext().getState(descriptor) 我们获得了状态。然后我们在每次新的 GameEvent 到来时,根据事件中的分数增量用 scoreState.update(currentScore) 更新状态,然后将更新后的得分以及玩家的 ID 一起输出给下一个算子,例如,连接到实时的游戏分数仪表盘,将每个玩家的最新得分显示给观众看。

通过ListCheckPonitend 接口实现算子列表状态

算子状态(Operator State)在流处理系统(比如 Apache Flink)中,是一种特殊类型的状态,针对的是整个算子,而不是特定的键值。它存储的是某一特定算子的所有记录的全局信息。

算子状态的维护主要包括以下步骤:

  1. 定义算子状态:首先,我们需要在处理函数中定义一个或多个算子状态。我们可以指定算子状态的名字,并定义它存储的数据类型。

  2. 读取和写入算子状态:一旦定义了算子状态,我们就可以在流处理函数中对它进行读取和写入。读取算子状态通常在需要根据状态信息做出处理决策时进行。写入算子状态通常在我们需要更新状态信息时进行。

  3. 保持状态一致:为了保持状态的一致性,我们需要定期将算子状态进行快照(Snapshot)并保存到远程存储系统中。在系统中断后,我们可以从最新的快照恢复算子状态。

  4. 状态恢复:在系统中断后,我们可以使用保存的快照恢复算子状态,恢复流处理的执行。

维护算子状态的方法可能会根据具体的流处理系统有所不同,但基本原理是相同的。这四步是维护算子状态的基本过程。

在 Flink 中,ListState 是 CheckpointedState 的一种。ListState 可以为每一条数据保存不止一个值,也就是说,所有的数据都会添加到该状态中。在故障恢复时,这些元素按添加的顺序重放。我们从 CheckpointedFunctionListCheckpointed 接口的抽象类型继承,然后实现 snapshotStaterestoreState 方法,以完成状态恢复。

具体来说,如果我们想使用 ListCheckpointed 接口实现算子列表状态,可以参考以下的代码:

我们每次接收到未序列化的 String 类型的数值,就把它转成 Integer 类型存储在一个列表(List)中。在每个 Checkpoint 操作当中,通过 snapshotState 方法进行状态的快照并返回。当故障发生后,Flink 会调用 restoreState 方法将状态恢复回来。

如果算子是并行的,Flink 会为每一个子任务调用 restoreState 方法,并在算子的每个子任务中创建一个新的列表状态实例。在故障后进行状态恢复时,Flink 将提取快照并将其分发到每个子任务。

public class ListStateFunction extends RichMapFunction<String, Integer> implements ListCheckpointed<Integer> {private List<Integer> bufferElements;public ListStateFunction(){this.bufferElements = new ArrayList<>();}@Overridepublic Integer map(String value) throws Exception {int parsedValue = Integer.parseInt(value);bufferElements.add(parsedValue);return bufferElements.size();}// 每次 checkpoint 时,将缓存的元素进行快照@Overridepublic List<Integer> snapshotState(long checkpointId, long timestamp)  {return this.bufferElements;}// 从存储中恢复状态@Overridepublic void restoreState(List<Integer> state) {this.bufferElements.addAll(state);}
}

使用 ListCheckpointed 还是 CheckpointedFunction 取决于特定的需求和上下文,两者在功能上是相似的,但 CheckpointedFunction 提供了更多的灵活性,可以让你自己决定如何存储和恢复状态以及存储于哪种类型的状态后端。

使用CheckpointedFunction接口

Apache Flink提供了一个特殊的接口CheckpointedFunction,可以在自定义函数中使用它来操作和管理算子状态。这个接口会在检查点(checkpoint)操作时触发,允许访问和编辑操作员状态。

h使用CheckpointedFunction的例子:

public class CountWithCheckpoint implements CheckpointedFunction, MapFunction<Long, Long> {private transient ValueState<Long> counter;@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", TypeInformation.of(new TypeHint<Long>() {}));counter = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(Long value) throws Exception {Long currentCount = counter.value();Long newCount = currentCount == null ? 1L : currentCount + 1;counter.update(newCount);return newCount;}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {counter.clear();}
}

此示例创建一个计数但在每个检查点清空的函数。initializeState()方法会在各种生命周期事件(例如,开始和恢复)时调用并初始化状态变量。然后在map()方法中,状态被更新。snapshotState()在checkpoint操作时触发,这里我们仅清空状态,无任何持久化操作。

在操作和维护算子状态时,我们需要考虑状态的一致性和恢复,以处理可能的故障和中断。实际中可能会对snapshotState()方法更复杂的逻辑,比如将状态存储至远端。

接收检查点完成通知

在Apache Flink中,当所有任务成功从接头位置创建检查点后,作业管理器将坐标控制条以通知所有任务检查点的成功完成。然后,所有任务都会得到一个新的检查点的完成通知。

如果要接收这样的通知并对其做出反应,可以让你的RichFunction实现CheckpointListener接口。以下是一个基本示例:

函数使用ListState进行状态管理,每个接收到的元素都会被添加到状态中。并且,我们实现了notifyCheckpointComplete(long checkpointId)函数,以便在每次成功完成检查点后接收到通知。这个函数里你可以进行一些操作如清除状态、更新外部系统等。

触发的notifyCheckpointComplete方法是在下一次checkpoint发生在Task周的快照操作之前,具体的实现要根据你的检查点配置和故障恢复能力进行规划。

public class MyFunction extends RichMapFunction<Long, Long> implements CheckpointListener {private transient ListState<Long> checkpointedState;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("state", Long.class);checkpointedState = getRuntimeContext().getListState(descriptor);}@Overridepublic Long map(Long value) throws Exception {checkpointedState.add(value);return value;}@Overridepublic void notifyCheckpointComplete(long checkpointId) throws Exception {// 监听到检查点成功完成的通知,此处可以进行相关逻辑处理}
}

参考文档

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/

相关文章:

【Apache Flink】实现有状态函数

文章目录 在RuntimeContext 中声明键值分区状态通过ListCheckPonitend 接口实现算子列表状态使用CheckpointedFunction接口接收检查点完成通知参考文档 在RuntimeContext 中声明键值分区状态 Flink为键值分区状态&#xff08;Keyed State&#xff09;提供了几种不同的原语&…...

Android原生项目集成uniMPSDK(Uniapp)遇到的报错总结

uni小程s序SDK 集成到Android原生项目:老项目中用到的库较多&#xff0c;会出现几种冲突问题&#xff0c;总结如下&#xff1a; 报错1&#xff1a; Execution failed for task :app:processDebugManifest. > Manifest merger failed with multiple errors, see logs Andro…...

Linux redis 安装

1、解压 tar -zxvf redis-5.0.10.tar.gz 2、cd /data/redis-5.0.10 文件夹 3、make 等待make命令执行完成即可。 make命令报错&#xff1a;cc 未找到命令&#xff0c;系统中缺少gcc&#xff0c;执行命令安装 gcc&#xff1a; yum -y install gcc automake autocon…...

在Win11上部署ChatGLM3详细步骤

023年10月27日&#xff0c;智谱AI于2023中国计算机大会&#xff08;CNCC&#xff09;上&#xff0c;推出了全自研的第三代基座大模型ChatGLM3及相关系列产品&#xff0c;这也是智谱AI继推出千亿基座的对话模型ChatGLM和ChatGLM2之后的又一次重大突破。此次推出的ChatGLM3采用了…...

系列七、动态代理

一、概述 二、Jdk动态代理案例 2.1、Star /*** Author : 一叶浮萍归大海* Date: 2023/10/27 17:16* Description:*/ public interface Star {/*** 唱歌* param name 歌曲名字* return*/String sing(String name);/*** 跳舞*/void dance(); } 2.2、BigStar /*** Author : 一叶…...

Kafka集群搭建与SpringBoot项目集成

本篇文章的目的是帮助Kafka初学者快速搭建一个Kafka集群&#xff0c;以及怎么在SpringBoot项目中使用Kafka。 kafka集群环境包地址&#xff1a;百度网盘 请输入提取码 提取码&#xff1a;x9yn 一、Kafka集群搭建 1、准备环境 &#xff08;1&#xff09;准备三台…...

一个简单的注册的页面,如有错误请指正;(3.JavaScript)

这段代码是一个JavaScript函数&#xff0c;实现了用户登录和上传图片的功能&#xff0c;并包含了一些辅助函数。让我一一解释&#xff1a; 1. login()&#xff1a;这个函数用于登录操作。首先&#xff0c;通过$(#name).val()来获取ID为name的元素的值&#xff0c;同理获取其他…...

selenium (自动化概念 测试环境配置)

什么是自动化测试 自动化测试介绍 自动化测试指软件测试的自动化&#xff0c;在预设状态下运行应用程序或者系统. 预设条件包括正常和异常&#xff0c;最后评估运行结果。   自动化测试&#xff0c;就是将人为驱动的测试行为转化为机器执行的过程。 【机器 代替 人工】 自动化…...

Mybatis-Plus(企业实际开发应用)

一、Mybatis-Plus简介 MyBatis-Plus是MyBatis框架的一个增强工具&#xff0c;可以简化持久层代码开发MyBatis-Plus&#xff08;简称 MP&#xff09;是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff0c;为简化开发、提高效率而生。 官网&a…...

Spring Web MVC入门

一&#xff1a;了解Spring Web MVC (1)关于Java开发 &#x1f31f;Java开发大多数场景是业务开发 比如说京东的业务就是电商卖货、今日头条的业务就推送新闻&#xff1b;快手的业务就是短视频推荐 (2)Spring Web MVC的简单理解 &#x1f497;Spring Web MVC&#xff1a;如何使…...

【C++】mapset的底层结构 -- AVL树(高度平衡二叉搜索树)

前面我们对 map / multimap / set / multiset 进行了简单的介绍&#xff0c;可以发现&#xff0c;这几个容器有个共同点是&#xff1a;其底层都是按照二叉搜索树来实现的。 但是二叉搜索树有其自身的缺陷&#xff0c;假如往树中插入的元素有序或者接近有序&#xff0c;二叉搜索…...

吴恩达《机器学习》1-4:无监督学习

一、无监督学习 无监督学习就像你拿到一堆未分类的东西&#xff0c;没有标签告诉你它们是什么&#xff0c;然后你的任务是自己找出它们之间的关系或者分成不同的组&#xff0c;而不依赖于任何人给你关于这些东西的指导。 以聚类为例&#xff0c;无监督学习算法可以将数据点分成…...

一个简单的注册页面,如有错误请指正(2.css)

这段CSS代码定义了页面的样式&#xff0c;让我逐个解释其功能&#xff1a; 1. * {}&#xff1a;通配符选择器&#xff0c;用于将页面中的所有元素设置统一的样式。这里将margins和paddings设置为0&#xff0c;以去除默认的边距。 2. div img {}&#xff1a;选择页面中所有div…...

【Unity精华一记】特殊文件夹

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…...

Node.js中的单线程服务器

为了解决多线程服务器在高并发的I/O密集型应用中的不足&#xff0c;同时避免早期简单单线程服务器的性能障碍&#xff0c;Node.js采用了基于"事件循环"的非阻塞式单线程模型&#xff0c;实现了如下两个目标&#xff1a; &#xff08;1&#xff09;保证每个请求都可以…...

如何删除数组中的某个元素?

如何删除数组中的某个元素&#xff1f; 例&#xff1a;给你一个数组 nums 和一个值 val&#xff0c;你需要移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 三种方法 1.元素前移&#xff08;时间复杂度&#xff1a;O(N^2)&#xff0c;空间复杂度&#x…...

Apache ActiveMQ RCE漏洞复现(CNVD-2023-69477)

0x01 产品简介 ActiveMQ是一个开源的消息代理和集成模式服务器&#xff0c;它支持Java消息服务(JMS) API。它是Apache Software Foundation下的一个项目&#xff0c;用于实现消息中间件&#xff0c;帮助不同的应用程序或系统之间进行通信。 0x02 漏洞概述 Apache ActiveMQ 中存…...

【BUG】Nginx转发失败解决方案

最近在做项目的时候出现了一个问题&#xff0c;琢磨了好久&#xff0c;来浅浅记录一下。 这个项目后端使用的是gateway网关和nacos实现动态的路由&#xff0c;前端使用nginx来管理前端资源&#xff0c;大体流程&#xff1a;浏览器发起请求&#xff0c;经过nginx代理&#xff0c…...

综合OA管理系统源码 OA系统源码

综合OA管理系统源码 OA系统源码 功能介绍&#xff1a; 编号&#xff1a;LQ10 一&#xff1a;系统管理 系统配置&#xff0c;功能模块&#xff0c;功能节点&#xff0c;权限角色&#xff0c;操作日志&#xff0c;备份数据&#xff0c;还原数据 二&#xff1a;基础数据 审批…...

9-MySQL提高数据管理效率(分库分表实践)

MySQL提高数据管理效率&#xff08;分库分表实践&#xff09; 在当今的互联网时代&#xff0c;随着业务规模的不断扩大&#xff0c;数据量也呈现出爆炸性的增长。如何有效地管理和存储这些数据&#xff0c;以及提高数据库的性能和可扩展性&#xff0c;成为了一个迫切需要解决的…...

经典卷积神经网络 - NIN

网络中的网络&#xff0c;NIN。 AlexNet和VGG都是先由卷积层构成的模块充分抽取空间特征&#xff0c;再由全连接层构成的模块来输出分类结果。但是其中的全连接层的参数量过于巨大&#xff0c;因此NiN提出用1*1卷积代替全连接层&#xff0c;串联多个由卷积层和“全连接”层构成…...

leetcode_2558 从数量最多的堆取走礼物

1. 题意 给定一个数组&#xff0c;每次从中取走最大的数&#xff0c;返回开根号向下取整送入堆中&#xff0c;最后计算总和。 从数量最多的堆取走礼物 2. 题解 直接用堆模拟即可 2.1 我的代码 用了额外的空间O( n ) priority_queue会自动调用make_heap() 、pop_heap() c…...

01. 嵌入式与人工智能是如何结合的?

CPU是Arm A57的 GPU是128cuda核 一.小车跟踪的需求和设计方法 比如有一个小车跟踪的项目。 需求是&#xff1a;小车识别出罪犯&#xff0c;然后去跟踪他。方法&#xff1a;摄像头采集到人之后传入到开发板&#xff0c;内部做一下识别&#xff0c;然后控制小车去跟随。在人工智…...

vue3.0运行npm run dev 报错Cannot find module node:url

vue3.0运行npm run dev 报错Cannot find module 问题背景 近期用vue3.0写项目&#xff0c;npm init vuelatest —> npm install 都正常,npm run dev的时候报错如下&#xff1a; failed to load config from F:\code\testVue\vue-demo\vite.config.js error when starting…...

26. 删除排序数组中的重复项、Leetcode的Python实现

博客主页&#xff1a;&#x1f3c6;看看是李XX还是李歘歘 &#x1f3c6; &#x1f33a;每天分享一些包括但不限于计算机基础、算法等相关的知识点&#x1f33a; &#x1f497;点关注不迷路&#xff0c;总有一些&#x1f4d6;知识点&#x1f4d6;是你想要的&#x1f497; ⛽️今…...

荣耀推送服务消息分类标准

前言 为了提升终端用户的推送体验、营造良好可持续的通知生态&#xff0c;荣耀推送服务将对推送消息进行分类管理。 消息分类 定义 荣耀推送服务将根据应用类型、消息内容和消息发送场景&#xff0c;将推送消息分成服务通讯和资讯营销两大类别。 服务通讯类&#xff0c;包…...

[数据结构]-二叉搜索树

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正。 目录 一、二叉搜…...

力扣每日一题79:单词搜索

题目描述&#xff1a; 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻”单元格…...

ChatGPT如何应对用户提出的道德伦理困境?

ChatGPT在应对用户提出的道德伦理困境时&#xff0c;需要考虑众多复杂的因素。道德伦理问题涉及到价值观、原则、社会和文化背景&#xff0c;以及众多伦理理论。ChatGPT的设计和应用需要权衡各种考虑因素&#xff0c;以确保它不仅提供有用的信息&#xff0c;而且遵循伦理标准。…...

SpringBoot运行流程源码分析------阶段三(Spring Boot外化配置源码解析)

Spring Boot外化配置源码解析 外化配置简介 Spring Boot设计了非常特殊的加载指定属性文件&#xff08;PropertySouce&#xff09;的顺序&#xff0c;允许属性值合理的覆盖&#xff0c;属性值会以下面的优先级进行配置。home目录下的Devtool全局设置属性&#xff08;~/.sprin…...