当前位置: 首页 > news >正文

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.

相关文章:

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…...

c语言:模拟实现qsort函数

qsort函数的功能&#xff1a; qsort相较于冒泡排序法&#xff0c;不仅效率更快&#xff0c;而且能够比较不同类型的元素&#xff0c;如&#xff1a;浮点数&#xff0c;结构体等等。这里我们来模拟下qsort是如何实现这一功能的&#xff0c;方便我们对指针数组有一个更深层次的理…...

从0开始学习数据结构 C语言实现 1.前篇及二分查找算法

一、前篇 1、什么是数据结构&#xff1f; 数据结构是带有结构特性的数据元素的集合&#xff0c;它研究的是数据的逻辑结构和数据的物理结构以及它们之间的相互关系 2、时间复杂度与空间复杂度 大O符号是用于描述函数渐进行为的数学符号 常用函数的增长表 阶乘O(n!) > 指数…...

VSCode 使用CMakePreset找不到cl.exe编译器的问题

在用vscode开发c项目的时候&#xff0c;使用预先配置的CMakePresets.json可以把一些特定的cmake选项固定下来&#xff0c;在配置时直接使用 "cmake --config --preset presetname"就可以进行配置&#xff0c;免去在命令行输入过多的配置参数。 但是在vscode中&#…...

【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程

个人主页点击直达&#xff1a;小白不是程序媛 Linux专栏&#xff1a;Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R&#xff08;running状态&#xff09;运行状态 S&#xff08;sl…...

深信服AC流量管理技术

拓扑图 一.保证通道针对修仙部&#xff0c;访问网站&#xff0c;邮件&#xff0c;DNS&#xff0c;IM&#xff0c;办工 OA&#xff0c;微博论坛网上银行等常见应用保证带宽最低 50%&#xff0c;最高 100% 1. 先新建线路带宽 2.新增流量管理通道&#xff08;保证关键应用&#x…...

二元关系及关系代数中的象集、除运算

二元关系及关系代数中的象集、除运算 数学上&#xff0c;二元关系用于讨论两个数学对象的联系。诸如算术中的「大于」及「等于」&#xff0c;几何学中的"相似"&#xff0c;或集合论中的"为...之元素"或"为...之子集"。二元关系有时会简称关系&a…...

[PHP]关联和操作MySQL数据库然后将数据库部署到ECS

在Mac电脑上使用VS Code进行PHP开发并关联操作MySQL数据库&#xff0c;然后将数据库部署到ECS。 1.安装PHP和MySQL 确保你的Mac上已经安装了PHP和MySQL。你可以使用Homebrew来安装它们&#xff1a; $ brew install php $ brew install mysql 安装mysql完成后记住这一句: …...

23.11.19日总结

经过昨天的中期答辩&#xff0c;其实可以看出来项目进度太慢了&#xff0c;现在是第十周&#xff0c;预计第十四周是终级答辩&#xff0c;在这段时间要把项目写完。 前端要加上一个未登录的拦截器&#xff0c;后端加上全局的异常处理。对于饿了么项目的商品建表&#xff0c;之前…...

系列一、JVM概述

一、概述 1.1、Java发展中的重大事件 1.2、虚拟机 vs Java虚拟机 1.2.1、虚拟机 1.2.2、Java虚拟机 1.2.3、Java虚拟机的作用 Java虚拟机是二进制字节码的运行环境&#xff0c;负责装载字节码到其内部&#xff0c;解释/编译为对应平台上的机器指令指令。每一条Java指令&#…...

milvus数据管理-压缩数据

Milvus 默认支持自动数据压缩。您可以 配置 Milvus 以启用或禁用 压缩 和自动压缩。 如果自动压缩被禁用&#xff0c;您仍然可以手动压缩数据。 1.手动压缩数据 压缩请求是异步处理的&#xff0c;因为它们通常需要花费很长时间。 from pymilvus import Collection collection…...

SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)

最近找个springboot脚手架重新熟悉一下springboot相关框架的东西&#xff0c;结果发现好像项目还不能直接像数据库GUI工具一样填几个SSH参数就可以了&#xff0c;于是就给他再整一下看看如何解决 linux开放3306&#xff08;可修改&#xff09;端口直接访问 此方法较为方便&am…...

【Rust】快速教程——闭包与生命周期

前言 你怎么向天生的瞎子说清颜色&#xff1f;怎么用手势向天生的聋子描述声音&#xff1f; 鲜花就在眼前&#xff0c;雷鸣就在头顶&#xff0c;对他们来说却都毫无意义 眼睛看不到&#xff0c;鼻子可以嗅闻花香&#xff0c;耳朵听不见&#xff0c;手指可以触碰窗纸的震动。 犯…...

redis高级案列case

案列一 双写一致性 案例二 双锁策略 package com.redis.redis01.service;import com.redis.redis01.bean.RedisBs; import com.redis.redis01.mapper.RedisBsMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; imp…...

Vue3+Vite实现工程化,attribute属性渲染v-bind指令

想要渲染一个元素的attribute&#xff0c;应该使用v-bind指令 由于插值表达式不能直接放在标签的属性中&#xff0c;所有要渲染元素的属性就应该使用v-bindv-bind可以用于渲染任何元素的属性&#xff0c;语法为 v-bind:属性名数据名&#xff0c;可以简写为 :属性名数据名 <…...

下一代搜索引擎会什么?

现在是北京时间2023年11月18日。聊一聊搜索。 说到搜索&#xff0c;大家首先想到的肯定是谷歌&#xff0c;百度。我把这些定义成上一个时代的搜索引擎。ChatGPT已经火热了有一年的时间了&#xff0c;大家都认为Ai搜索是下一代的搜索。但是AI搜索&#xff0c;需要的是很大算力&a…...

WPF中如何在MVVM模式下关闭窗口

完全来源于十月的寒流&#xff0c;感谢大佬讲解 使用Behaviors <Window x:Class"Test_03.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:b"http://schemas.microsoft.com/xaml/behaviors"xmlns:x&quo…...

【数据结构&C++】二叉平衡搜索树-AVL树(25)

前言 大家好吖&#xff0c;欢迎来到 YY 滴C系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 目录 一.AVL树的概念二.AVL树节点的定义(代码…...

Python算法——树的最大深度和最小深度

Python中的树的最大深度和最小深度算法详解 树的最大深度和最小深度是树结构中的两个关键指标&#xff0c;它们分别表示树的从根节点到最深叶子节点的最大路径长度和最小路径长度。在本文中&#xff0c;我们将深入讨论如何计算树的最大深度和最小深度&#xff0c;并提供Python…...

46.全排列-py

46.全排列 class Solution(object):def permute(self, nums):""":type nums: List[int]:rtype: List[List[int]]"""# 结果数组0ans[]nlen(nums)# 判断是否使用state_[False]*n# 临时状态数组dp_[]def dfs (index):# 终止条件if indexn:ans.appe…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径

目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险

C#入门系列【类的基本概念】&#xff1a;开启编程世界的奇妙冒险 嘿&#xff0c;各位编程小白探险家&#xff01;欢迎来到 C# 的奇幻大陆&#xff01;今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类&#xff01;别害怕&#xff0c;跟着我&#xff0c;保准让你轻松搞…...

在树莓派上添加音频输入设备的几种方法

在树莓派上添加音频输入设备可以通过以下步骤完成&#xff0c;具体方法取决于设备类型&#xff08;如USB麦克风、3.5mm接口麦克风或HDMI音频输入&#xff09;。以下是详细指南&#xff1a; 1. 连接音频输入设备 USB麦克风/声卡&#xff1a;直接插入树莓派的USB接口。3.5mm麦克…...

macOS 终端智能代理检测

&#x1f9e0; 终端智能代理检测&#xff1a;自动判断是否需要设置代理访问 GitHub 在开发中&#xff0c;使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新&#xff0c;例如&#xff1a; fatal: unable to access https://github.com/ohmyzsh/oh…...