当前位置: 首页 > news >正文

Flink之OperatorState

在Flink中状态主要分为三种:

  • Operator State(算子状态)
  • Keyed State(键控状态)
  • Broadcast State(广播状态)

这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

  • 数据源
    这里选用Socket作为Source输入,便于测试
    ➜  ~ nc -lk 8888
    a
    b
    c
    k
    k
    k
    
  • 状态算子代码
    /**
    * @Description TODO 自定义状态MapFunc
    **/
    // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
    class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{private ListState<String> strListState;/*** @Param o* @return String* @Description TODO map方法的正常处理逻辑**/@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中strListState.add(s);Iterable<String> strings = strListState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** @Param functionSnapshotContext* @return void* @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());}/*** @Param functionInitializationContext* @return void* @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState = operatorStateStore.getListState(stateDescriptor);}
    }
    
    要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
  • 业务代码
      public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将自定义的StateOperator传入SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute("Operator State");}
    }
    

具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.

相关文章:

Flink之OperatorState

在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…...

Python集成学习和随机森林算法

大家好&#xff0c;机器学习模型已经成为多个行业决策过程中的重要组成部分&#xff0c;然而在处理嘈杂或多样化的数据集时&#xff0c;它们往往会遇到困难&#xff0c;这就是集成学习&#xff08;Ensemble Learning&#xff09;发挥作用的地方。 本文将揭示集成学习的奥秘&am…...

代码随想录算法训练营第二十四天| 77 组合

目录 77 组合 暴力 减枝优化 77 组合 暴力 class Solution {List<List<Integer>>res new ArrayList<>();LinkedList<Integer>newList new LinkedList<>();public List<List<Integer>> combine(int n, int k) {dfs(n,k,1);r…...

el-dialog element-ui弹窗

bulkImport.vue 自定义组件 <template> <el-dialog :visible"modalVisible" title"批量导入" centered close"$emit(close)" :fullscreen"true"> <span>弹窗内容</span> <span slot"foot…...

计算机网络的发展

目录 一、计算机网络发展的四个阶段 1、第一阶段&#xff1a;面向终端的计算机网络&#xff08;20世纪50年代&#xff09; 2、第二阶段&#xff1a;计算机—计算机网络&#xff08;20世纪60年代&#xff09; 3、第三阶段&#xff1a;开放式标准化网络&#xff08;20世纪70年…...

官宣!Wayland正式支持基于IntelliJ的IDE

对于基于IntelliJ IDE的Linux用户来说&#xff0c;一项令人期待的进步即将到来 – 对 Wayland 显示服务器协议的支持。 这项更新将带来许多好处&#xff0c;包括解决古老的分数缩放问题以及在与适用于 Linux 的 Windows 子系统 (WSLg)&#xff08;在底层运行 Wayland 服务器&am…...

大模型在数据分析场景下的能力评测|进阶篇

做数据分析&#xff0c;什么大模型比较合适&#xff1f; 如何调优大模型&#xff0c;来更好地做数据计算和洞察分析&#xff1f; 如何降低整体成本&#xff0c;同时保障分析体验&#xff1f;10月25日&#xff0c;我们发布了数据分析场景下的大模型能力评测框架&#xff08;点击…...

服务注册发现 springcloud netflix eureka

文章目录 前言角色&#xff08;三个&#xff09; 工程说明基础运行环境工程目录说明启动顺序&#xff08;建议&#xff09;&#xff1a;运行效果注册与发现中心服务消费者&#xff1a; 代码说明服务注册中心&#xff08;Register Service&#xff09;服务提供者&#xff08;Pro…...

Spring cloud负载均衡@LoadBalanced LoadBalancerClient

LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon&#xff0c;直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件&#xff0c;我们讨论Spring负载均衡以Spring Cloud2020之后版本为主&#xff0c;学习Spring Cloud LoadBalance&#xff0c;暂不讨论Ribbon…...

6.运行mysql容器-理解容器数据卷

运行mysql容器-理解容器数据卷 1.什么是容器数据卷2.如何使用容器数据卷2.1 数据卷挂载命令2.2 容器数据卷的继承2.3 数据卷的读写权限2.4 容器数据卷的小实验&#xff08;加深理解&#xff09;2.4.1 启动挂载数据卷的centos容器2.4.2 启动后&#xff0c;在宿主机的data目录下会…...

golang学习笔记——查找质数

查找质数 编写一个程序来查找小于 20 的所有质数。 质数是大于 1 的任意数字&#xff0c;只能被它自己和 1 整除。 “整除”表示经过除法运算后没有余数。 与大多数编程语言一样&#xff0c;Go 还提供了一种方法来检查除法运算是否产生余数。 我们可以使用模数 %&#xff08;百…...

C++ 基础二

文章目录 四、流程控制语句4.1 选择结构4.1.1 if语句 4.1.2 三目运算符4.1.3 switch语句注意事项 4.1.4 if和switch的区别【CHAT】4.2 循环结构4.2.1 while循环语句4.2.2 do...while循环语句 4.2.3 for循环语句九九乘法表 4.3 跳转语句4.3.1 break语句4.3.2 continue语句4.3.3 …...

鼎盛合 | 宠物智能投食机方案设计开发

养宠物是一件治愈并解压的事情&#xff0c;与动物的相处中能够释放压力&#xff0c;并在与宠物的互动中小可爱们往往能带给你一种治愈的力量&#xff0c;所以养宠物成为了人们尤为热衷的事情。我们生活中随处可见主人与宠物相处的温馨画面&#xff0c;但养宠物也有些问题在困扰…...

ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION packages field missing or empty

vue执行 pnpm install命令时&#xff0c;报 ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION  packages field missing or empty错&#xff0c;在网上查询了很久&#xff0c;也没有传出来结果&#xff0c;最后发现是pnpm的版本不对引起的。 我先执行的是npm install -g pnpm&…...

ubuntu 23.04从源码编译安装rocm运行tensorflow-rocm

因为ubuntu22.04的RDP不支持声音转发&#xff0c;所以下载了ubuntu23.04.但官方的rocm二进制包最高只支持ubuntu22.04&#xff0c;不支持ubuntu 23.04&#xff0c;只能自己从源码编译虽然有网友告诉我可以用docker运行rocm。但是我已经研究了好几天&#xff0c;沉没成本太多&am…...

echarts 图表文字大小自适应 字体大小自适应

将文字大小自适应方法挂载到全局 //main.js Vue.prototype.fontSize function(res) {// 获取视口宽度const clientWidth window.innerWidth ||document.documentElement.clientWidth ||document.body.clientWidth;if (!clientWidth) return; // 如果获取不到视口宽度&#xf…...

【项目】云备份系统基础功能实现

目录 一.项目介绍1.云备份认识2.服务端程序负责功能与功能模块划分3.客户端程序负责功能与功能模块划分4.开发环境 二.环境搭建1.gcc升级7.3版本2.安装jsoncpp库3.下载bundle数据压缩库4.下载httplib库 三.第三方库认识1.json(1)json认识(2)jsoncpp认识(3)json实现序列化(4)jso…...

【Shell脚本13】Shell 文件包含

Shell 文件包含 和其他语言一样&#xff0c;Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 Shell 文件包含的语法格式如下&#xff1a; . filename # 注意点号(.)和文件名中间有一空格或source filename实例 创建两个 shell 脚本文件…...

2023.11.15 关于 Spring Boot 配置文件

目录 引言 Spring Boot 配置文件 properties 配置文件说明 基本语法 读取配置文件 优点 缺点 yml 配置文件说明 基本语法 读取配置文件 yml 配置不同数据数据类型及 null 字符串 加单双引号的区别 yml 配置 列表&#xff08;List&#xff09; 和 映射&#xff08;…...

2023年第九届数维杯国际大学生数学建模挑战赛A题

2023年第九届数维杯国际大学生数学建模挑战赛正在火热进行&#xff0c;小云学长又在第一时间给大家带来最全最完整的思路代码解析&#xff01;&#xff01;&#xff01; A题思路解析如下&#xff1a; 完整版解题过程及代码&#xff0c;稍后继续给大家分享~ 更多题目完整解析点…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

Copilot for Xcode (iOS的 AI辅助编程)

Copilot for Xcode 简介Copilot下载与安装 体验环境要求下载最新的安装包安装登录系统权限设置 AI辅助编程生成注释代码补全简单需求代码生成辅助编程行间代码生成注释联想 代码生成 总结 简介 尝试使用了Copilot&#xff0c;它能根据上下文补全代码&#xff0c;快速生成常用…...

项目进度管理软件是什么?项目进度管理软件有哪些核心功能?

无论是建筑施工、软件开发&#xff0c;还是市场营销活动&#xff0c;项目往往涉及多个团队、大量资源和严格的时间表。如果没有一个系统化的工具来跟踪和管理这些元素&#xff0c;项目很容易陷入混乱&#xff0c;导致进度延误、成本超支&#xff0c;甚至失败。 项目进度管理软…...

JUC并发编程(二)Monitor/自旋/轻量级/锁膨胀/wait/notify/锁消除

目录 一 基础 1 概念 2 卖票问题 3 转账问题 二 锁机制与优化策略 0 Monitor 1 轻量级锁 2 锁膨胀 3 自旋 4 偏向锁 5 锁消除 6 wait /notify 7 sleep与wait的对比 8 join原理 一 基础 1 概念 临界区 一段代码块内如果存在对共享资源的多线程读写操作&#xf…...

【靶场】XXE-Lab xxe漏洞

前言 学习xxe漏洞,搭了个XXE-Lab的靶场 一、搭建靶场 现在需要登录,不知道用户名密码,先随便试试抓包 二、判断是否存在xxe漏洞 1.首先登录抓包 看到xml数据解析,由此判断和xxe漏洞有关,但还不确定xxe漏洞是否存在。 2.尝试xxe 漏洞 判断是否存在xxe漏洞 A.send to …...