当前位置: 首页 > news >正文

Flink之OperatorState

在Flink中状态主要分为三种:

  • Operator State(算子状态)
  • Keyed State(键控状态)
  • Broadcast State(广播状态)

这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解

  • 数据源
    这里选用Socket作为Source输入,便于测试
    ➜  ~ nc -lk 8888
    a
    b
    c
    k
    k
    k
    
  • 状态算子代码
    /**
    * @Description TODO 自定义状态MapFunc
    **/
    // 状态算子必须要实现对应的算子接口和CheckpointFunction接口
    class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{private ListState<String> strListState;/*** @Param o* @return String* @Description TODO map方法的正常处理逻辑**/@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中strListState.add(s);Iterable<String> strings = strListState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** @Param functionSnapshotContext* @return void* @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());}/*** @Param functionInitializationContext* @return void* @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState = operatorStateStore.getListState(stateDescriptor);}
    }
    
    要注意代码注释中的内容,getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件.
  • 业务代码
      public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将自定义的StateOperator传入SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute("Operator State");}
    }
    

具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.

相关文章:

Flink之OperatorState

在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…...

Python集成学习和随机森林算法

大家好&#xff0c;机器学习模型已经成为多个行业决策过程中的重要组成部分&#xff0c;然而在处理嘈杂或多样化的数据集时&#xff0c;它们往往会遇到困难&#xff0c;这就是集成学习&#xff08;Ensemble Learning&#xff09;发挥作用的地方。 本文将揭示集成学习的奥秘&am…...

代码随想录算法训练营第二十四天| 77 组合

目录 77 组合 暴力 减枝优化 77 组合 暴力 class Solution {List<List<Integer>>res new ArrayList<>();LinkedList<Integer>newList new LinkedList<>();public List<List<Integer>> combine(int n, int k) {dfs(n,k,1);r…...

el-dialog element-ui弹窗

bulkImport.vue 自定义组件 <template> <el-dialog :visible"modalVisible" title"批量导入" centered close"$emit(close)" :fullscreen"true"> <span>弹窗内容</span> <span slot"foot…...

计算机网络的发展

目录 一、计算机网络发展的四个阶段 1、第一阶段&#xff1a;面向终端的计算机网络&#xff08;20世纪50年代&#xff09; 2、第二阶段&#xff1a;计算机—计算机网络&#xff08;20世纪60年代&#xff09; 3、第三阶段&#xff1a;开放式标准化网络&#xff08;20世纪70年…...

官宣!Wayland正式支持基于IntelliJ的IDE

对于基于IntelliJ IDE的Linux用户来说&#xff0c;一项令人期待的进步即将到来 – 对 Wayland 显示服务器协议的支持。 这项更新将带来许多好处&#xff0c;包括解决古老的分数缩放问题以及在与适用于 Linux 的 Windows 子系统 (WSLg)&#xff08;在底层运行 Wayland 服务器&am…...

大模型在数据分析场景下的能力评测|进阶篇

做数据分析&#xff0c;什么大模型比较合适&#xff1f; 如何调优大模型&#xff0c;来更好地做数据计算和洞察分析&#xff1f; 如何降低整体成本&#xff0c;同时保障分析体验&#xff1f;10月25日&#xff0c;我们发布了数据分析场景下的大模型能力评测框架&#xff08;点击…...

服务注册发现 springcloud netflix eureka

文章目录 前言角色&#xff08;三个&#xff09; 工程说明基础运行环境工程目录说明启动顺序&#xff08;建议&#xff09;&#xff1a;运行效果注册与发现中心服务消费者&#xff1a; 代码说明服务注册中心&#xff08;Register Service&#xff09;服务提供者&#xff08;Pro…...

Spring cloud负载均衡@LoadBalanced LoadBalancerClient

LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon&#xff0c;直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件&#xff0c;我们讨论Spring负载均衡以Spring Cloud2020之后版本为主&#xff0c;学习Spring Cloud LoadBalance&#xff0c;暂不讨论Ribbon…...

6.运行mysql容器-理解容器数据卷

运行mysql容器-理解容器数据卷 1.什么是容器数据卷2.如何使用容器数据卷2.1 数据卷挂载命令2.2 容器数据卷的继承2.3 数据卷的读写权限2.4 容器数据卷的小实验&#xff08;加深理解&#xff09;2.4.1 启动挂载数据卷的centos容器2.4.2 启动后&#xff0c;在宿主机的data目录下会…...

golang学习笔记——查找质数

查找质数 编写一个程序来查找小于 20 的所有质数。 质数是大于 1 的任意数字&#xff0c;只能被它自己和 1 整除。 “整除”表示经过除法运算后没有余数。 与大多数编程语言一样&#xff0c;Go 还提供了一种方法来检查除法运算是否产生余数。 我们可以使用模数 %&#xff08;百…...

C++ 基础二

文章目录 四、流程控制语句4.1 选择结构4.1.1 if语句 4.1.2 三目运算符4.1.3 switch语句注意事项 4.1.4 if和switch的区别【CHAT】4.2 循环结构4.2.1 while循环语句4.2.2 do...while循环语句 4.2.3 for循环语句九九乘法表 4.3 跳转语句4.3.1 break语句4.3.2 continue语句4.3.3 …...

鼎盛合 | 宠物智能投食机方案设计开发

养宠物是一件治愈并解压的事情&#xff0c;与动物的相处中能够释放压力&#xff0c;并在与宠物的互动中小可爱们往往能带给你一种治愈的力量&#xff0c;所以养宠物成为了人们尤为热衷的事情。我们生活中随处可见主人与宠物相处的温馨画面&#xff0c;但养宠物也有些问题在困扰…...

ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION packages field missing or empty

vue执行 pnpm install命令时&#xff0c;报 ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION  packages field missing or empty错&#xff0c;在网上查询了很久&#xff0c;也没有传出来结果&#xff0c;最后发现是pnpm的版本不对引起的。 我先执行的是npm install -g pnpm&…...

ubuntu 23.04从源码编译安装rocm运行tensorflow-rocm

因为ubuntu22.04的RDP不支持声音转发&#xff0c;所以下载了ubuntu23.04.但官方的rocm二进制包最高只支持ubuntu22.04&#xff0c;不支持ubuntu 23.04&#xff0c;只能自己从源码编译虽然有网友告诉我可以用docker运行rocm。但是我已经研究了好几天&#xff0c;沉没成本太多&am…...

echarts 图表文字大小自适应 字体大小自适应

将文字大小自适应方法挂载到全局 //main.js Vue.prototype.fontSize function(res) {// 获取视口宽度const clientWidth window.innerWidth ||document.documentElement.clientWidth ||document.body.clientWidth;if (!clientWidth) return; // 如果获取不到视口宽度&#xf…...

【项目】云备份系统基础功能实现

目录 一.项目介绍1.云备份认识2.服务端程序负责功能与功能模块划分3.客户端程序负责功能与功能模块划分4.开发环境 二.环境搭建1.gcc升级7.3版本2.安装jsoncpp库3.下载bundle数据压缩库4.下载httplib库 三.第三方库认识1.json(1)json认识(2)jsoncpp认识(3)json实现序列化(4)jso…...

【Shell脚本13】Shell 文件包含

Shell 文件包含 和其他语言一样&#xff0c;Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 Shell 文件包含的语法格式如下&#xff1a; . filename # 注意点号(.)和文件名中间有一空格或source filename实例 创建两个 shell 脚本文件…...

2023.11.15 关于 Spring Boot 配置文件

目录 引言 Spring Boot 配置文件 properties 配置文件说明 基本语法 读取配置文件 优点 缺点 yml 配置文件说明 基本语法 读取配置文件 yml 配置不同数据数据类型及 null 字符串 加单双引号的区别 yml 配置 列表&#xff08;List&#xff09; 和 映射&#xff08;…...

2023年第九届数维杯国际大学生数学建模挑战赛A题

2023年第九届数维杯国际大学生数学建模挑战赛正在火热进行&#xff0c;小云学长又在第一时间给大家带来最全最完整的思路代码解析&#xff01;&#xff01;&#xff01; A题思路解析如下&#xff1a; 完整版解题过程及代码&#xff0c;稍后继续给大家分享~ 更多题目完整解析点…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具

作者&#xff1a;来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗&#xff1f;了解下一期 Elasticsearch Engineer 培训的时间吧&#xff01; Elasticsearch 拥有众多新功能&#xff0c;助你为自己…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)

本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...

stm32wle5 lpuart DMA数据不接收

配置波特率9600时&#xff0c;需要使用外部低速晶振...