当前位置: 首页 > news >正文

Flink状态State | 大数据技术

简单说两句

✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:小叮当撩代码

🔎GZH哆啦A梦撩代码

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

Flink状态

image-20240602200907231

Flink中的State

image-20240602192616430

State概念

在 Flink 中,状态是流处理程序中非常重要的一部分,它允许你保存和访问数据,以实现复杂的计算逻辑。

可以简单理解为: 历史计算结果

Flink中的算子任务的State分类通常分为两类

1️⃣ 有状态

有状态需要考虑历史的数据,相同的输入可能会得到不同的输出

比如:sum/reduce/maxBy, 对单词按照key分组聚合,进来一个(hello,1),得到(hello,1), 再进来一个(hello,1), 得到的结果为(hello,2)

2️⃣ 无状态

无状态简单说就是不需要考虑历史的数据,相同的输入得到相同的结果

比如map、filter、flatmap算子都属于无状态,不需要依赖其他数据

Flink默认已经支持了无状态和有状态计算!

状态分类

Flink中有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)

Managed State是由Flink管理的,Flink帮忙存储、恢复和优化

Raw State是开发者自己管理的,需要自己序列化

❇️通常情况下,我们采用托管状态来实现我们的需求!!!

托管状态

​ Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以Flink 能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效

​ 很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

🎨所以:我们又可以将托管状态分为两类:算子状态按键分区状态

键控状态Keyed State

详细内容可以瞅瞅官网:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/fault-tolerance/state/

Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。

需要注意的是键控状态只能在 KeyedStream 上进行使用,可以通过 stream.keyBy(…) 来得到 KeyedStream 。

img

Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。

ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。

ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。

AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。

FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。

MapState:维护 Map 类型的状态。


Code实操

例子1

使用KeyState中的ValueState来模拟实现maxBy

代码清单


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tiancx*/
public class StateMaxByDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//加载数据DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("北京", 1),Tuple2.of("上海", 2),Tuple2.of("广州", 3),Tuple2.of("北京", 4),Tuple2.of("上海", 5),Tuple2.of("广州", 6),Tuple2.of("北京", 3)).keyBy(t -> t.f0);source.map(new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {//定义状态,用于存储最大值ValueState<Integer> maxValueState = null;//进行初始化@Overridepublic void open(Configuration parameters) throws Exception {//创建状态描述器ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("maxValueState", Integer.class);maxValueState = getRuntimeContext().getState(descriptor);}@Overridepublic Tuple3<String, Integer, Integer> map(Tuple2<String, Integer> value) throws Exception {//获取当前值Integer currentVal = value.f1;Integer currentMax = maxValueState.value();if (currentMax == null || currentVal > currentMax) {maxValueState.update(currentVal);}return Tuple3.of(value.f0, value.f1, maxValueState.value());}}).print();env.execute();}
}

运行看结果

5c1eb573f51d5a9cec2032e503b0dee3

例子2

如果一个人的体温超过阈值38度,超过3次及以上,则输出: 姓名 [温度1,温度2,温度3]

代码清单


import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.List;/*** @author tiancx*/
public class StateDemo01 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> source = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(" ");return Tuple2.of(split[0], Integer.parseInt(split[1]));}}).keyBy(t -> t.f0);source.flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, List<Integer>>>() {ListState<Integer> listState = null;//存放超过38度的次数ValueState<Integer> valueState = null;@Overridepublic void open(Configuration parameters) throws Exception {ListStateDescriptor<Integer> listStateDescriptor = new ListStateDescriptor<Integer>("listState", Integer.class);ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("valueState", Integer.class);listState = getRuntimeContext().getListState(listStateDescriptor);valueState = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, List<Integer>>> out) throws Exception {System.out.println("进入flatMap");Integer val = value.f1;if (valueState.value() == null) {valueState.update(0);}if (val > 38) {listState.add(val);valueState.update(valueState.value() + 1);}if (valueState.value() >= 3) {List<Integer> list = (List<Integer>) listState.get();out.collect(Tuple2.of(value.f0, list));listState.clear();valueState.clear();}}}).print();env.execute();}
}

输入

image-20240602100424957

运行结果

image-20240602100441746

算子状态OperatorState

​ 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。

​ 算 子 状 态 也 支 持 不 同 的 结 构 类 型 , 主 要 有 三 种 : ListState 、 UnionListState 和BroadcastState。


code实操

例子1:

在 map 算子中计算数据的个数

代码清单


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author tiancx*/
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);stream.map(new MyCountMapFunction()).print();env.execute();}public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {private Long count = 0L;private ListState<Long> listState;@Overridepublic Long map(String value) throws Exception {return ++count;}/*** 本地变量持久化:将 本地变量拷贝到算子状态中,开启checkpoint 时才会调用 snapshotState 方法** @param context the context for drawing a snapshot of the operator* @throws Exception*/@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println("MyCountMapFunction.snapshotState");listState.clear();listState.add(count);}/*** 初始化本地变量:程序启动和恢复时,从状态中把数据添加到本地变量,每个子任务调用一次** @param context the context for initializing the operator* @throws Exception*/@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println("MyCountMapFunction.initializeState");//从上下文初始化状态listState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("listState", Types.LONG()));//从算子状态中把数据拷贝到本地变量if (context.isRestored()) {for (Long aLong : listState.get()) {count += aLong;}}}}
}

输入

image-20240602110341359

运行结果

image-20240602110403448

【都看到这了,点点赞点点关注呗,爱你们】😚😚

蓝白色微信公众号大学生校园清新简单纸飞机动态引导关注简洁新媒体分享中文动态引导关注

💬

✨ 正在努力的小叮当~
💖 超级爱分享,分享各种有趣干货!
👩‍💻 提供:模拟面试 | 简历诊断 | 独家简历模板
🌈 感谢关注,关注了你就是我的超级粉丝啦!
🔒 以下内容仅对你可见~

作者:小叮当撩代码CSDN后端领域新星创作者 |阿里云专家博主

CSDN个人主页:小叮当撩代码

🔎GZH哆啦A梦撩代码

🎉欢迎关注🔎点赞👍收藏⭐️留言📝

相关文章:

Flink状态State | 大数据技术

⭐简单说两句⭐ ✨ 正在努力的小叮当~ &#x1f496; 超级爱分享&#xff0c;分享各种有趣干货&#xff01; &#x1f469;‍&#x1f4bb; 提供&#xff1a;模拟面试 | 简历诊断 | 独家简历模板 &#x1f308; 感谢关注&#xff0c;关注了你就是我的超级粉丝啦&#xff01; &a…...

go语言方法之方法值和方法表达式

我们经常选择一个方法&#xff0c;并且在同一个表达式里执行&#xff0c;比如常见的p.Distance()形式&#xff0c;实际上 将其分成两步来执行也是可能的。p.Distance叫作“选择器”&#xff0c;选择器会返回一个方法"值"->一 个将方法(Point.Distance)绑定到特定接…...

TDMQ CKafka 版弹性存储能力重磅上线!

导语 自 2024年5月起&#xff0c;TDMQ CKafka 专业版支持弹性存储能力&#xff0c;这种产品形态下&#xff0c;存储可按需使用、按量付费&#xff0c;一方面降低消费即删除、存储使用波动大场景下的存储成本&#xff0c;另一方面存储空间理论上无穷大。 TDMQ CKafka 版产品能…...

24、Linux网络端口

Linux网络端口 1、查看网络接口信息ifconfig ens33 eth0 文件 ifconfig 当前设备正在工作的网卡&#xff0c;启动的设备。 ifconfig -a 查看所有的网络设备。 ifconfig ens33 查看指定网卡设备。 ifconfig ens33 up/down 对指定网卡设备进行开关 基于物理网卡设备虚拟的…...

Mysql全文搜索和LIKE搜索有什么区别

全文搜索和LIKE的区别 性能&#xff1a;在大数据集上&#xff0c;全文搜索通常比LIKE查询更快&#xff0c;因为它使用了专门的索引结构。 功能&#xff1a;全文搜索提供了更丰富的查询功能&#xff0c;如多个关键词的搜索、自然语言搜索、布尔搜索等。而LIKE通常只支持简单的…...

elementplu父级页面怎么使用封装子组件原组件的方法

一、使用原因&#xff1a; 封装了el-table&#xff0c;表格中有多选&#xff0c;父级要根据指定状态&#xff0c;让其选择不上&#xff0c;需要用到elementplus中table原方法toggleRowSelection 附加小知识点&#xff1a;&#xff08;el-tree刷新树后之前选中的保持高亮setCurr…...

el-date-picker选择开始日期的近半年

<el-date-pickerv-model"form[val.key]":type"val.datePickerType || daterange":clearable"val.clearable && true"range-separator"~"start-placeholder"开始日期"end-placeholder"结束日期"style&q…...

C++

封装一个矩形类(Rect)&#xff0c;拥有私有属性:宽度(width)、高度(height)&#xff0c; 定义公有成员函数: 初始化函数:void init(int w, int h) 更改宽度的函数:set_w(int w) 更改高度的函数:set_h(int h) 输出该矩形的周长和面积函数:void show()...

nginx源码阅读理解 [持续更新,建议关注]

文章目录 前述一、nginx 进程模型基本流程二、源码里的小点1.对字符串操作都进行了原生实现2.配置文件解析也是原生实现待续 前述 通过对 nginx 的了解和代码简单阅读&#xff0c;发现这个C代码的中间件确实存在过人之处&#xff0c;使用场景特别多&#xff0c;插件模块很丰富…...

笔试训练2

牛客.单词搜索 刚开始我就想是搜索&#xff0c;但是不清楚bfs还是dfs更好&#xff0c;我尝试了bfs但是队列存东西&#xff0c;没有我想象的那么好写&#xff0c;所以我决定试试dfs import java.util.*;public class Solution {static int m 0;static int n 0;static int […...

构建坚不可摧的Web安全防线:深入剖析二阶注入与全面防御策略

引言 在数字化时代&#xff0c;数据安全是企业和个人最为关注的问题之一。网络攻击手段层出不穷&#xff0c;其中SQL注入攻击尤为狡猾&#xff0c;它允许攻击者通过Web应用的漏洞对数据库进行非法操作。更隐蔽的是二阶注入攻击&#xff0c;它不仅威胁当前操作&#xff0c;还能…...

(4) qml动态元素

文章目录 概述注意 动画元素变化的策略Animation on 变化behavior on⽤standalone animation注意 缓冲曲线&#xff08;Easing Curves&#xff09;动画分组 概述 这⼀章介绍如何控制属性值的变化&#xff0c;通过动画的⽅式在⼀段时间内来改变属性值。这项技术是建⽴⼀个现代化…...

深度神经网络——什么是梯度下降?

如果对神经网络的训练有所了解&#xff0c;那么很可能已经听说过“梯度下降”这一术语。梯度下降是提升神经网络性能、降低其误差率的主要技术手段。然而&#xff0c;对于机器学习新手来说&#xff0c;梯度下降的概念可能稍显晦涩。本文旨在帮助您直观理解梯度下降的工作原理。…...

基本元器件 - 二极管

目录 二极管的主要参数 二极管的分类 整流二极管 快恢复二极管&#xff08;FRD&#xff09; 稳压&#xff08;齐纳&#xff09;二级管 瞬态电压抑制器&#xff08;TVS&#xff09; 开关二极管 肖特基二极管&#xff08;SBD&#xff09; 正偏与反偏 常用封装 伏安特性…...

【设计模式】单例模式(创建型)⭐⭐⭐

1.概念 1.1 什么是单例模式 单例模式属于创建型模式&#xff0c;一个单例类在任何情况下都只存在一个实例&#xff0c; 构造方法必须是私有的、由自己创建一个静态变量存储实例&#xff0c;对外提供一 个静态公有方法获取实例。 1.2 优点与缺点 优点:是内存中只有一个实例&…...

《深入浅出C语言:从基础到指针的全面指南》

1. 简介 C语言是一种通用的编程语言&#xff0c;广泛应用于系统编程、嵌入式系统和高性能应用程序。它由Dennis Ritchie在1972年开发&#xff0c;并且至今仍然非常流行。C语言以其高效、灵活和强大的功能著称&#xff0c;是许多现代编程语言的基础。 2. 基本语法 2.1 Hello, …...

Typescript高级: 深入实践Record类型

概述 Record 类型是TS中其众多强大特性之一它为我们提供了创建键值对映射的强大能力极大地增强了代码的灵活性与类型安全性 应用示例 1 &#xff09;用于配置场景 在复杂的项目中&#xff0c;配置文件往往包含多个模块的不同设置使用 Record 可以确保配置的键名正确且值类型…...

重构与优化-对象间特性搬移重构(2)

在软件开发过程中,重构是改进代码结构和设计、不改变其外在行为的过程。对象之间的特性搬移(Moving Features Between Objects)是重构的一种重要类型,它涉及到将属性、方法或其他特性从一个对象转移到另一个对象,以优化代码结构、提高可维护性和遵循设计原则。以下是几种典…...

网络流量监控与DNS流量分析

目录 一、网络流量监控的基础知识 什么是网络流量监控&#xff1f; 网络流量监控的重要性 实用案例&#xff1a;如何通过网络流量监控优化带宽利用 二、DNS流量分析的核心要点 什么是DNS流量分析&#xff1f; DNS流量分析的优势 实用技巧&#xff1a;如何通过DNS流量分…...

【数据分析】打造完美数据分析环境:Python开发环境搭建全攻略

打造完美数据分析环境&#xff1a;Python开发环境搭建全攻略 在数据分析的世界中&#xff0c;搭建一个稳定且高效的Python开发环境是至关重要的。本文将介绍三种主要的环境搭建方式&#xff1a;使用pip、Anaconda和Miniconda。 1. 使用pip从清华镜像安装Python包 pip是Pytho…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框&#xff0c;很难让人不联想到SQL注入&#xff0c;但提示都说了不是SQL注入&#xff0c;所以就不往这方面想了 ​ 先查看一下网页源码&#xff0c;发现一段JavaScript代码&#xff0c;有一个关键类ctfs…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

《C++ 模板》

目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板&#xff0c;就像一个模具&#xff0c;里面可以将不同类型的材料做成一个形状&#xff0c;其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式&#xff1a;templa…...

破解路内监管盲区:免布线低位视频桩重塑停车管理新标准

城市路内停车管理常因行道树遮挡、高位设备盲区等问题&#xff0c;导致车牌识别率低、逃费率高&#xff0c;传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法&#xff0c;正成为破局关键。该设备安装于车位侧方0.5-0.7米高度&#xff0c;直接规避树枝遮…...

MyBatis中关于缓存的理解

MyBatis缓存 MyBatis系统当中默认定义两级缓存&#xff1a;一级缓存、二级缓存 默认情况下&#xff0c;只有一级缓存开启&#xff08;sqlSession级别的缓存&#xff09;二级缓存需要手动开启配置&#xff0c;需要局域namespace级别的缓存 一级缓存&#xff08;本地缓存&#…...

小木的算法日记-多叉树的递归/层序遍历

&#x1f332; 从二叉树到森林&#xff1a;一文彻底搞懂多叉树遍历的艺术 &#x1f680; 引言 你好&#xff0c;未来的算法大神&#xff01; 在数据结构的世界里&#xff0c;“树”无疑是最核心、最迷人的概念之一。我们中的大多数人都是从 二叉树 开始入门的&#xff0c;它…...