当前位置: 首页 > news >正文

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.

相关文章:

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…...

c语言:模拟实现qsort函数

qsort函数的功能&#xff1a; qsort相较于冒泡排序法&#xff0c;不仅效率更快&#xff0c;而且能够比较不同类型的元素&#xff0c;如&#xff1a;浮点数&#xff0c;结构体等等。这里我们来模拟下qsort是如何实现这一功能的&#xff0c;方便我们对指针数组有一个更深层次的理…...

从0开始学习数据结构 C语言实现 1.前篇及二分查找算法

一、前篇 1、什么是数据结构&#xff1f; 数据结构是带有结构特性的数据元素的集合&#xff0c;它研究的是数据的逻辑结构和数据的物理结构以及它们之间的相互关系 2、时间复杂度与空间复杂度 大O符号是用于描述函数渐进行为的数学符号 常用函数的增长表 阶乘O(n!) > 指数…...

VSCode 使用CMakePreset找不到cl.exe编译器的问题

在用vscode开发c项目的时候&#xff0c;使用预先配置的CMakePresets.json可以把一些特定的cmake选项固定下来&#xff0c;在配置时直接使用 "cmake --config --preset presetname"就可以进行配置&#xff0c;免去在命令行输入过多的配置参数。 但是在vscode中&#…...

【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程

个人主页点击直达&#xff1a;小白不是程序媛 Linux专栏&#xff1a;Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R&#xff08;running状态&#xff09;运行状态 S&#xff08;sl…...

深信服AC流量管理技术

拓扑图 一.保证通道针对修仙部&#xff0c;访问网站&#xff0c;邮件&#xff0c;DNS&#xff0c;IM&#xff0c;办工 OA&#xff0c;微博论坛网上银行等常见应用保证带宽最低 50%&#xff0c;最高 100% 1. 先新建线路带宽 2.新增流量管理通道&#xff08;保证关键应用&#x…...

二元关系及关系代数中的象集、除运算

二元关系及关系代数中的象集、除运算 数学上&#xff0c;二元关系用于讨论两个数学对象的联系。诸如算术中的「大于」及「等于」&#xff0c;几何学中的"相似"&#xff0c;或集合论中的"为...之元素"或"为...之子集"。二元关系有时会简称关系&a…...

[PHP]关联和操作MySQL数据库然后将数据库部署到ECS

在Mac电脑上使用VS Code进行PHP开发并关联操作MySQL数据库&#xff0c;然后将数据库部署到ECS。 1.安装PHP和MySQL 确保你的Mac上已经安装了PHP和MySQL。你可以使用Homebrew来安装它们&#xff1a; $ brew install php $ brew install mysql 安装mysql完成后记住这一句: …...

23.11.19日总结

经过昨天的中期答辩&#xff0c;其实可以看出来项目进度太慢了&#xff0c;现在是第十周&#xff0c;预计第十四周是终级答辩&#xff0c;在这段时间要把项目写完。 前端要加上一个未登录的拦截器&#xff0c;后端加上全局的异常处理。对于饿了么项目的商品建表&#xff0c;之前…...

系列一、JVM概述

一、概述 1.1、Java发展中的重大事件 1.2、虚拟机 vs Java虚拟机 1.2.1、虚拟机 1.2.2、Java虚拟机 1.2.3、Java虚拟机的作用 Java虚拟机是二进制字节码的运行环境&#xff0c;负责装载字节码到其内部&#xff0c;解释/编译为对应平台上的机器指令指令。每一条Java指令&#…...

milvus数据管理-压缩数据

Milvus 默认支持自动数据压缩。您可以 配置 Milvus 以启用或禁用 压缩 和自动压缩。 如果自动压缩被禁用&#xff0c;您仍然可以手动压缩数据。 1.手动压缩数据 压缩请求是异步处理的&#xff0c;因为它们通常需要花费很长时间。 from pymilvus import Collection collection…...

SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)

最近找个springboot脚手架重新熟悉一下springboot相关框架的东西&#xff0c;结果发现好像项目还不能直接像数据库GUI工具一样填几个SSH参数就可以了&#xff0c;于是就给他再整一下看看如何解决 linux开放3306&#xff08;可修改&#xff09;端口直接访问 此方法较为方便&am…...

【Rust】快速教程——闭包与生命周期

前言 你怎么向天生的瞎子说清颜色&#xff1f;怎么用手势向天生的聋子描述声音&#xff1f; 鲜花就在眼前&#xff0c;雷鸣就在头顶&#xff0c;对他们来说却都毫无意义 眼睛看不到&#xff0c;鼻子可以嗅闻花香&#xff0c;耳朵听不见&#xff0c;手指可以触碰窗纸的震动。 犯…...

redis高级案列case

案列一 双写一致性 案例二 双锁策略 package com.redis.redis01.service;import com.redis.redis01.bean.RedisBs; import com.redis.redis01.mapper.RedisBsMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; imp…...

Vue3+Vite实现工程化,attribute属性渲染v-bind指令

想要渲染一个元素的attribute&#xff0c;应该使用v-bind指令 由于插值表达式不能直接放在标签的属性中&#xff0c;所有要渲染元素的属性就应该使用v-bindv-bind可以用于渲染任何元素的属性&#xff0c;语法为 v-bind:属性名数据名&#xff0c;可以简写为 :属性名数据名 <…...

下一代搜索引擎会什么?

现在是北京时间2023年11月18日。聊一聊搜索。 说到搜索&#xff0c;大家首先想到的肯定是谷歌&#xff0c;百度。我把这些定义成上一个时代的搜索引擎。ChatGPT已经火热了有一年的时间了&#xff0c;大家都认为Ai搜索是下一代的搜索。但是AI搜索&#xff0c;需要的是很大算力&a…...

WPF中如何在MVVM模式下关闭窗口

完全来源于十月的寒流&#xff0c;感谢大佬讲解 使用Behaviors <Window x:Class"Test_03.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:b"http://schemas.microsoft.com/xaml/behaviors"xmlns:x&quo…...

【数据结构&C++】二叉平衡搜索树-AVL树(25)

前言 大家好吖&#xff0c;欢迎来到 YY 滴C系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 目录 一.AVL树的概念二.AVL树节点的定义(代码…...

Python算法——树的最大深度和最小深度

Python中的树的最大深度和最小深度算法详解 树的最大深度和最小深度是树结构中的两个关键指标&#xff0c;它们分别表示树的从根节点到最深叶子节点的最大路径长度和最小路径长度。在本文中&#xff0c;我们将深入讨论如何计算树的最大深度和最小深度&#xff0c;并提供Python…...

46.全排列-py

46.全排列 class Solution(object):def permute(self, nums):""":type nums: List[int]:rtype: List[List[int]]"""# 结果数组0ans[]nlen(nums)# 判断是否使用state_[False]*n# 临时状态数组dp_[]def dfs (index):# 终止条件if indexn:ans.appe…...

Android性能优化深度解析:从理论到实践

在Android开发领域,性能优化是确保应用流畅运行和用户体验的关键。作为一名安卓开发工程师,掌握性能优化技术不仅能提升应用质量,还能在面试和实际工作中脱颖而出。本文将以性能优化为核心领域,深入探讨其理论、工具和实践方法,并提供代码示例和常见面试问题及答案。文章内…...

ARM ETE跟踪单元与单次比较器控制技术解析

1. ARM ETE跟踪单元的核心机制解析在嵌入式系统调试领域&#xff0c;ARM的嵌入式跟踪扩展(Embedded Trace Extension, ETE)提供了一套完整的指令执行流监控方案。其核心组件跟踪单元(Trace Unit)通过地址比较器(Address Comparator)实现细粒度的执行监控&#xff0c;能够捕获特…...

紧急更新!OpenAI API v4.5对脑筋急转弯类输出新增隐式过滤机制——立即启用这7个绕过策略,保住你的创意产能

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;OpenAI API v4.5脑筋急转弯过滤机制的底层原理与影响评估 OpenAI API v4.5 引入的脑筋急转弯过滤机制并非独立模块&#xff0c;而是深度集成于请求预处理与响应后置校验双阶段的语义安全策略。其核心依赖于轻…...

ChatGPT移动端隐私红线报告(2024Q2):麦克风/剪贴板/位置数据采集路径全曝光,3步彻底锁死敏感权限

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;ChatGPT移动端隐私红线报告&#xff08;2024Q2&#xff09;核心发现与风险定级 高危数据外泄通道实证 本季度对iOS与Android平台主流ChatGPT客户端&#xff08;含官方App v6.12.1及第三方封装SDK集成应…...

DeepSeek流式响应提速73%的底层逻辑:从Token缓冲区到GPU显存调度的全链路拆解

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;DeepSeek流式响应提速73%的工程现象与性能基线 在真实生产环境中对 DeepSeek-R1 模型实施流式响应优化后&#xff0c;端到端首 token 延迟&#xff08;Time to First Token, TTFT&#xff09;从平均 482ms 降至…...

摒弃地毯式盲搜,智能定位指引科学救援方向 ——视频孪生无感定位驱动煤矿智能化抢险救援技术方案

摒弃地毯式盲搜&#xff0c;智能定位指引科学救援方向——视频孪生无感定位驱动煤矿智能化抢险救援技术方案一、方案引言煤矿井下灾害具备突发性强、环境复杂多变、次生风险叠加的特征&#xff0c;瓦斯冲击、顶板坍塌、透水淹井事故发生后&#xff0c;巷道结构损毁、通信供电中…...

Arknights-Mower:解放双手的明日方舟智能基建管理工具

Arknights-Mower&#xff1a;解放双手的明日方舟智能基建管理工具 【免费下载链接】arknights-mower 《明日方舟》长草助手 项目地址: https://gitcode.com/gh_mirrors/ar/arknights-mower 在《明日方舟》的日常游戏过程中&#xff0c;基建管理、资源刷取和日常任务占据…...

Zotero PDF Translate:打破语言壁垒的学术翻译神器

Zotero PDF Translate&#xff1a;打破语言壁垒的学术翻译神器 【免费下载链接】zotero-pdf-translate Translate PDF, EPub, webpage, metadata, annotations, notes to the target language. Support 20 translate services. 项目地址: https://gitcode.com/gh_mirrors/zo/…...

重新定义Android设备管理:告别命令行,拥抱可视化操作新时代

重新定义Android设备管理&#xff1a;告别命令行&#xff0c;拥抱可视化操作新时代 【免费下载链接】FastbootEnhance A user-friendly Fastboot ToolBox & Payload Dumper for Windows 项目地址: https://gitcode.com/gh_mirrors/fa/FastbootEnhance 你是否曾经面对…...

中小团队如何统一管理多个项目的AI模型调用与API密钥

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 中小团队如何统一管理多个项目的AI模型调用与API密钥 在中小型技术团队的日常开发中&#xff0c;多个项目并行是常态。这些项目可能…...