Flink之OperatorState
在Flink中状态主要分为三种:
- Operator State(算子状态)
- Keyed State(键控状态)
- Broadcast State(广播状态)
这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解
- 数据源
这里选用Socket作为Source输入,便于测试➜ ~ nc -lk 8888 a b c k k k - 状态算子代码
要注意代码注释中的内容,/** * @Description TODO 自定义状态MapFunc **/ // 状态算子必须要实现对应的算子接口和CheckpointFunction接口 class StateMapFunc implements MapFunction<String, String>, CheckpointedFunction{private ListState<String> strListState;/*** @Param o* @return String* @Description TODO map方法的正常处理逻辑**/@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中strListState.add(s);Iterable<String> strings = strListState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}/*** @Param functionSnapshotContext* @return void* @Description TODO 系统对状态数据做快照(持久化)会调用此方法, 用户使用此方法在持久化前对状态数据可以做一些操控**/@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {System.out.println("快照生成, checkpointId: " + functionSnapshotContext.getCheckpointId());}/*** @Param functionInitializationContext* @return void* @Description TODO 算子任务在启动前会调用此方法,未用户状态数据进行初始化**/@Overridepublic void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {// 获取算子状态存储器OperatorStateStore operatorStateStore = functionInitializationContext.getOperatorStateStore();/*** ListStateDescriptor状态描述* 参数1:一个自定义名称* 参数2:存储的数据类型**/ListStateDescriptor<String> stateDescriptor = new ListStateDescriptor<>("demo", String.class);/*** 算子状态存储器, 只提供ListSate的形式(和Java中的List不是一回事)来存储状态数据* getListSate方法,会在Task失败后,task自动重启时,会帮助用户加载最近一次的快照数据,如果是job重启则不会加载**/strListState = operatorStateStore.getListState(stateDescriptor);} }getListState只作用于Task的自动重启,如果是整个Job重启时不生效的,如果是想Job重启后从重启前的State获取数据需要在Job提交时就指定checkpoint镜像文件. - 业务代码
public class FlinkOperatorState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度1env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将自定义的StateOperator传入SingleOutputStreamOperator<String> map = socketSource.map(new StateMapFunc());// 打印结果map.print();env.execute("Operator State");} }
具体的代码模板和API的介绍大概就这些内容,具体实践要根据业务逻辑而定.
相关文章:
Flink之OperatorState
在Flink中状态主要分为三种: Operator State(算子状态)Keyed State(键控状态)Broadcast State(广播状态) 这里简单介绍一下Operator State的使用,说到使用State就必然要使用到Flink的容错机制也就是Checkpoint.具体内容见代码注解 数据源 这里选用Socket作为Source输入,便于…...
Python集成学习和随机森林算法
大家好,机器学习模型已经成为多个行业决策过程中的重要组成部分,然而在处理嘈杂或多样化的数据集时,它们往往会遇到困难,这就是集成学习(Ensemble Learning)发挥作用的地方。 本文将揭示集成学习的奥秘&am…...
代码随想录算法训练营第二十四天| 77 组合
目录 77 组合 暴力 减枝优化 77 组合 暴力 class Solution {List<List<Integer>>res new ArrayList<>();LinkedList<Integer>newList new LinkedList<>();public List<List<Integer>> combine(int n, int k) {dfs(n,k,1);r…...
el-dialog element-ui弹窗
bulkImport.vue 自定义组件 <template> <el-dialog :visible"modalVisible" title"批量导入" centered close"$emit(close)" :fullscreen"true"> <span>弹窗内容</span> <span slot"foot…...
计算机网络的发展
目录 一、计算机网络发展的四个阶段 1、第一阶段:面向终端的计算机网络(20世纪50年代) 2、第二阶段:计算机—计算机网络(20世纪60年代) 3、第三阶段:开放式标准化网络(20世纪70年…...
官宣!Wayland正式支持基于IntelliJ的IDE
对于基于IntelliJ IDE的Linux用户来说,一项令人期待的进步即将到来 – 对 Wayland 显示服务器协议的支持。 这项更新将带来许多好处,包括解决古老的分数缩放问题以及在与适用于 Linux 的 Windows 子系统 (WSLg)(在底层运行 Wayland 服务器&am…...
大模型在数据分析场景下的能力评测|进阶篇
做数据分析,什么大模型比较合适? 如何调优大模型,来更好地做数据计算和洞察分析? 如何降低整体成本,同时保障分析体验?10月25日,我们发布了数据分析场景下的大模型能力评测框架(点击…...
服务注册发现 springcloud netflix eureka
文章目录 前言角色(三个) 工程说明基础运行环境工程目录说明启动顺序(建议):运行效果注册与发现中心服务消费者: 代码说明服务注册中心(Register Service)服务提供者(Pro…...
Spring cloud负载均衡@LoadBalanced LoadBalancerClient
LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon,直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件,我们讨论Spring负载均衡以Spring Cloud2020之后版本为主,学习Spring Cloud LoadBalance,暂不讨论Ribbon…...
6.运行mysql容器-理解容器数据卷
运行mysql容器-理解容器数据卷 1.什么是容器数据卷2.如何使用容器数据卷2.1 数据卷挂载命令2.2 容器数据卷的继承2.3 数据卷的读写权限2.4 容器数据卷的小实验(加深理解)2.4.1 启动挂载数据卷的centos容器2.4.2 启动后,在宿主机的data目录下会…...
golang学习笔记——查找质数
查找质数 编写一个程序来查找小于 20 的所有质数。 质数是大于 1 的任意数字,只能被它自己和 1 整除。 “整除”表示经过除法运算后没有余数。 与大多数编程语言一样,Go 还提供了一种方法来检查除法运算是否产生余数。 我们可以使用模数 %(百…...
C++ 基础二
文章目录 四、流程控制语句4.1 选择结构4.1.1 if语句 4.1.2 三目运算符4.1.3 switch语句注意事项 4.1.4 if和switch的区别【CHAT】4.2 循环结构4.2.1 while循环语句4.2.2 do...while循环语句 4.2.3 for循环语句九九乘法表 4.3 跳转语句4.3.1 break语句4.3.2 continue语句4.3.3 …...
鼎盛合 | 宠物智能投食机方案设计开发
养宠物是一件治愈并解压的事情,与动物的相处中能够释放压力,并在与宠物的互动中小可爱们往往能带给你一种治愈的力量,所以养宠物成为了人们尤为热衷的事情。我们生活中随处可见主人与宠物相处的温馨画面,但养宠物也有些问题在困扰…...
ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION packages field missing or empty
vue执行 pnpm install命令时,报 ERR_PNPM_INVALID_WORKSPACE_CONFIGURATION packages field missing or empty错,在网上查询了很久,也没有传出来结果,最后发现是pnpm的版本不对引起的。 我先执行的是npm install -g pnpm&…...
ubuntu 23.04从源码编译安装rocm运行tensorflow-rocm
因为ubuntu22.04的RDP不支持声音转发,所以下载了ubuntu23.04.但官方的rocm二进制包最高只支持ubuntu22.04,不支持ubuntu 23.04,只能自己从源码编译虽然有网友告诉我可以用docker运行rocm。但是我已经研究了好几天,沉没成本太多&am…...
echarts 图表文字大小自适应 字体大小自适应
将文字大小自适应方法挂载到全局 //main.js Vue.prototype.fontSize function(res) {// 获取视口宽度const clientWidth window.innerWidth ||document.documentElement.clientWidth ||document.body.clientWidth;if (!clientWidth) return; // 如果获取不到视口宽度…...
【项目】云备份系统基础功能实现
目录 一.项目介绍1.云备份认识2.服务端程序负责功能与功能模块划分3.客户端程序负责功能与功能模块划分4.开发环境 二.环境搭建1.gcc升级7.3版本2.安装jsoncpp库3.下载bundle数据压缩库4.下载httplib库 三.第三方库认识1.json(1)json认识(2)jsoncpp认识(3)json实现序列化(4)jso…...
【Shell脚本13】Shell 文件包含
Shell 文件包含 和其他语言一样,Shell 也可以包含外部脚本。这样可以很方便的封装一些公用的代码作为一个独立的文件。 Shell 文件包含的语法格式如下: . filename # 注意点号(.)和文件名中间有一空格或source filename实例 创建两个 shell 脚本文件…...
2023.11.15 关于 Spring Boot 配置文件
目录 引言 Spring Boot 配置文件 properties 配置文件说明 基本语法 读取配置文件 优点 缺点 yml 配置文件说明 基本语法 读取配置文件 yml 配置不同数据数据类型及 null 字符串 加单双引号的区别 yml 配置 列表(List) 和 映射(…...
2023年第九届数维杯国际大学生数学建模挑战赛A题
2023年第九届数维杯国际大学生数学建模挑战赛正在火热进行,小云学长又在第一时间给大家带来最全最完整的思路代码解析!!! A题思路解析如下: 完整版解题过程及代码,稍后继续给大家分享~ 更多题目完整解析点…...
保姆级教程:在Ubuntu 22.04上从源码编译COLMAP 3.9(含6个常见Bug解决方案)
在Ubuntu 22.04上从源码编译COLMAP 3.9的终极避坑指南三维重建技术正在重塑数字世界的构建方式,而COLMAP作为开源领域的标杆工具,其强大的多视图几何算法让学术研究和工业应用都受益匪浅。但当你第一次尝试在Ubuntu系统上编译这个工具时,可能…...
在Ubuntu 18.04上,用RoadRunner 2022b画的地图如何导入UE4.24给CARLA 0.9.10用?保姆级避坑指南
在Ubuntu 18.04上将RoadRunner 2022b地图导入UE4.24并适配CARLA 0.9.10的完整指南对于自动驾驶仿真开发者而言,构建一个稳定可靠的地图工作流至关重要。本文将详细介绍如何在Ubuntu 18.04系统中,将RoadRunner 2022b创建的地图无缝导入Unreal Engine 4.24…...
别再让auditd拖慢你的麒麟系统!手把手教你排查并关闭这个审计服务
麒麟系统性能优化实战:auditd服务深度排查与替代方案 在麒麟系统的日常运维中,auditd这个默默运行的后台服务常常成为系统性能的"隐形杀手"。许多开发者突然发现系统响应变慢、内存占用飙升时,往往不会第一时间联想到这个看似无害的…...
HRN三维人脸UV对齐:Blender与Unity跨平台精准映射指南
1. 这不是“贴图导入”,而是三维人脸数据流的精准对齐很多人第一次看到“3D Face HRN”这个词,下意识会以为是某种新出的美颜插件,或者Unity Asset Store里点几下就能拖进场景的预制体。我去年在给一家医疗仿真团队做面部肌肉运动模拟时也这么…...
CANN NPU 功耗优化:推理服务的能效比提升实战
功耗直接影响部署成本和设备寿命。同样的推理任务,功耗优化后能省 30% 电费,设备温度降低 10C。本文讲解 NPU 功耗的来源、动态调频策略、算子级功耗控制,以及在 CANN 上实现绿色推理的实战方法。一、NPU 功耗从哪来 1.1 功耗的三个来源 计算…...
原来训大模型,就像开一家小餐馆!
你是不是一直觉得,训练大语言模型是 OpenAI、百度这种大厂才能干的事?要几万张显卡,要花几个亿,普通人想都不敢想? 错了!我用自己开发机上的 8 张 H20 显卡,花了点时间,从零开始训了…...
关于自指系统与算术障碍的跨领域猜想:一项探索性研究(世毫九实验室学术完善报告)
关于自指系统与算术障碍的跨领域猜想:一项探索性研究(世毫九实验室学术完善报告) 作者:方见华 单位:世毫九实验室 核心摘要 本报告针对世毫九实验室原创的探索性跨领域论文《关于自指系统与算术障碍的跨领域猜想&#…...
认知殖民与范式陷阱:当代人工智能发展路径的文明危机研究
认知殖民与范式陷阱:当代人工智能发展路径的文明危机研究摘要本文从文明安全与认知主权视角出发,系统批判了当前以Transformer架构、Scaling Law和大语言模型为核心的人工智能技术范式。研究指出,该范式不仅是技术路径的选择,更是…...
Unity离线语音识别插件:解决无网/隐私/延迟三大痛点
1. 这不是“又一个语音识别SDK”——它解决的是Unity开发者真正卡脖子的三个痛点我在2022年做一款医疗陪护类AR应用时,被语音识别拖垮过整整三个月。当时用的是某云厂商的在线SDK,结果在医院内网环境下,每次识别都要等2.3秒以上,患…...
AI技术落地情报简报:面向执行层的模型选型与Prompt工程实战
1. 这不是一份普通 newsletter:它是一张AI领域的动态认知地图“This AI newsletter is all you need #61”——光看标题,你可能以为这又是一份泛泛而谈的AI资讯合集。但作为连续追踪该系列超过18个月、亲手拆解过其中52期原始内容、并用其指导过7个真实产…...
