当前位置: 首页 > news >正文

Flink之OperatorState

在Flink中状态主要分为三种:

  • Operator State(算子状态)
  • Keyed State(键控状态)
  • Broadcast State(广播状态)

这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

  • 数据源
    这里选用Socket作为Source输入,便于测试
    ➜  ~ nc -lk 8888
    a
    b
    c
    k
    k
    k
    
  • 状态算子代码
    /**
    * @Description TODO 自定义状态MapFunc
    **/
    // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
    class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{private ListState<String> strListState;/*** @Param o* @return String* @Description TODO map方法的正常处理逻辑**/@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中strListState.add(s);Iterable<String> strings = strListState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** @Param functionSnapshotContext* @return void* @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());}/*** @Param functionInitializationContext* @return void* @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState = operatorStateStore.getListState(stateDescriptor);}
    }
    
    要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
  • 业务代码
      public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将自定义的StateOperator传入SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute("Operator State");}
    }
    

具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.

相关文章:

Flink之OperatorState

在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…...

Python集成学习和随机森林算法

大家好&#xff0c;机器学习模型已经成为多个行业决策过程中的重要组成部分&#xff0c;然而在处理嘈杂或多样化的数据集时&#xff0c;它们往往会遇到困难&#xff0c;这就是集成学习&#xff08;Ensemble Learning&#xff09;发挥作用的地方。 本文将揭示集成学习的奥秘&am…...

代码随想录算法训练营第二十四天| 77 组合

目录 77 组合 暴力 减枝优化 77 组合 暴力 class Solution {List<List<Integer>>res new ArrayList<>();LinkedList<Integer>newList new LinkedList<>();public List<List<Integer>> combine(int n, int k) {dfs(n,k,1);r…...

el-dialog element-ui弹窗

bulkImport.vue 自定义组件 <template> <el-dialog :visible"modalVisible" title"批量导入" centered close"$emit(close)" :fullscreen"true"> <span>弹窗内容</span> <span slot"foot…...

计算机网络的发展

目录 一、计算机网络发展的四个阶段 1、第一阶段&#xff1a;面向终端的计算机网络&#xff08;20世纪50年代&#xff09; 2、第二阶段&#xff1a;计算机—计算机网络&#xff08;20世纪60年代&#xff09; 3、第三阶段&#xff1a;开放式标准化网络&#xff08;20世纪70年…...

官宣!Wayland正式支持基于IntelliJ的IDE

对于基于IntelliJ IDE的Linux用户来说&#xff0c;一项令人期待的进步即将到来 – 对 Wayland 显示服务器协议的支持。 这项更新将带来许多好处&#xff0c;包括解决古老的分数缩放问题以及在与适用于 Linux 的 Windows 子系统 (WSLg)&#xff08;在底层运行 Wayland 服务器&am…...

大模型在数据分析场景下的能力评测|进阶篇

做数据分析&#xff0c;什么大模型比较合适&#xff1f; 如何调优大模型&#xff0c;来更好地做数据计算和洞察分析&#xff1f; 如何降低整体成本&#xff0c;同时保障分析体验&#xff1f;10月25日&#xff0c;我们发布了数据分析场景下的大模型能力评测框架&#xff08;点击…...

服务注册发现 springcloud netflix eureka

文章目录 前言角色&#xff08;三个&#xff09; 工程说明基础运行环境工程目录说明启动顺序&#xff08;建议&#xff09;&#xff1a;运行效果注册与发现中心服务消费者&#xff1a; 代码说明服务注册中心&#xff08;Register Service&#xff09;服务提供者&#xff08;Pro…...

Spring cloud负载均衡@LoadBalanced LoadBalancerClient

LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon&#xff0c;直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件&#xff0c;我们讨论Spring负载均衡以Spring Cloud2020之后版本为主&#xff0c;学习Spring Cloud LoadBalance&#xff0c;暂不讨论Ribbon…...

6.运行mysql容器-理解容器数据卷

运行mysql容器-理解容器数据卷 1.什么是容器数据卷2.如何使用容器数据卷2.1 数据卷挂载命令2.2 容器数据卷的继承2.3 数据卷的读写权限2.4 容器数据卷的小实验&#xff08;加深理解&#xff09;2.4.1 启动挂载数据卷的centos容器2.4.2 启动后&#xff0c;在宿主机的data目录下会…...

golang学习笔记——查找质数

查找质数 编写一个程序来查找小于 20 的所有质数。 质数是大于 1 的任意数字&#xff0c;只能被它自己和 1 整除。 “整除”表示经过除法运算后没有余数。 与大多数编程语言一样&#xff0c;Go 还提供了一种方法来检查除法运算是否产生余数。 我们可以使用模数 %&#xff08;百…...

C++ 基础二

文章目录 四、流程控制语句4.1 选择结构4.1.1 if语句 4.1.2 三目运算符4.1.3 switch语句注意事项 4.1.4 if和switch的区别【CHAT】4.2 循环结构4.2.1 while循环语句4.2.2 do...while循环语句 4.2.3 for循环语句九九乘法表 4.3 跳转语句4.3.1 break语句4.3.2 continue语句4.3.3 …...

鼎盛合 | 宠物智能投食机方案设计开发

养宠物是一件治愈并解压的事情&#xff0c;与动物的相处中能够释放压力&#xff0c;并在与宠物的互动中小可爱们往往能带给你一种治愈的力量&#xff0c;所以养宠物成为了人们尤为热衷的事情。我们生活中随处可见主人与宠物相处的温馨画面&#xff0c;但养宠物也有些问题在困扰…...

ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION packages field missing or empty

vue执行 pnpm install命令时&#xff0c;报 ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION  packages field missing or empty错&#xff0c;在网上查询了很久&#xff0c;也没有传出来结果&#xff0c;最后发现是pnpm的版本不对引起的。 我先执行的是npm install -g pnpm&…...

ubuntu 23.04从源码编译安装rocm运行tensorflow-rocm

因为ubuntu22.04的RDP不支持声音转发&#xff0c;所以下载了ubuntu23.04.但官方的rocm二进制包最高只支持ubuntu22.04&#xff0c;不支持ubuntu 23.04&#xff0c;只能自己从源码编译虽然有网友告诉我可以用docker运行rocm。但是我已经研究了好几天&#xff0c;沉没成本太多&am…...

echarts 图表文字大小自适应 字体大小自适应

将文字大小自适应方法挂载到全局 //main.js Vue.prototype.fontSize function(res) {// 获取视口宽度const clientWidth window.innerWidth ||document.documentElement.clientWidth ||document.body.clientWidth;if (!clientWidth) return; // 如果获取不到视口宽度&#xf…...

【项目】云备份系统基础功能实现

目录 一.项目介绍1.云备份认识2.服务端程序负责功能与功能模块划分3.客户端程序负责功能与功能模块划分4.开发环境 二.环境搭建1.gcc升级7.3版本2.安装jsoncpp库3.下载bundle数据压缩库4.下载httplib库 三.第三方库认识1.json(1)json认识(2)jsoncpp认识(3)json实现序列化(4)jso…...

【Shell脚本13】Shell 文件包含

Shell 文件包含 和其他语言一样&#xff0c;Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 Shell 文件包含的语法格式如下&#xff1a; . filename # 注意点号(.)和文件名中间有一空格或source filename实例 创建两个 shell 脚本文件…...

2023.11.15 关于 Spring Boot 配置文件

目录 引言 Spring Boot 配置文件 properties 配置文件说明 基本语法 读取配置文件 优点 缺点 yml 配置文件说明 基本语法 读取配置文件 yml 配置不同数据数据类型及 null 字符串 加单双引号的区别 yml 配置 列表&#xff08;List&#xff09; 和 映射&#xff08;…...

2023年第九届数维杯国际大学生数学建模挑战赛A题

2023年第九届数维杯国际大学生数学建模挑战赛正在火热进行&#xff0c;小云学长又在第一时间给大家带来最全最完整的思路代码解析&#xff01;&#xff01;&#xff01; A题思路解析如下&#xff1a; 完整版解题过程及代码&#xff0c;稍后继续给大家分享~ 更多题目完整解析点…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?

Golang 面试经典题&#xff1a;map 的 key 可以是什么类型&#xff1f;哪些不可以&#xff1f; 在 Golang 的面试中&#xff0c;map 类型的使用是一个常见的考点&#xff0c;其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架&#xff0c;允许使用简单的变成模型跨计算机对大型集群进行分布式处理&#xff08;1.海量的数据存储 2.海量数据的计算&#xff09;Hadoop核心组件 hdfs&#xff08;分布式文件存储系统&#xff09;&a…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

《Offer来了:Java面试核心知识点精讲》大纲

文章目录 一、《Offer来了:Java面试核心知识点精讲》的典型大纲框架Java基础并发编程JVM原理数据库与缓存分布式架构系统设计二、《Offer来了:Java面试核心知识点精讲(原理篇)》技术文章大纲核心主题:Java基础原理与面试高频考点Java虚拟机(JVM)原理Java并发编程原理Jav…...

WEB3全栈开发——面试专业技能点P4数据库

一、mysql2 原生驱动及其连接机制 概念介绍 mysql2 是 Node.js 环境中广泛使用的 MySQL 客户端库&#xff0c;基于 mysql 库改进而来&#xff0c;具有更好的性能、Promise 支持、流式查询、二进制数据处理能力等。 主要特点&#xff1a; 支持 Promise / async-await&#xf…...

Windows 下端口占用排查与释放全攻略

Windows 下端口占用排查与释放全攻略​ 在开发和运维过程中&#xff0c;经常会遇到端口被占用的问题&#xff08;如 8080、3306 等常用端口&#xff09;。本文将详细介绍如何通过命令行和图形化界面快速定位并释放被占用的端口&#xff0c;帮助你高效解决此类问题。​ 一、准…...