当前位置: 首页 > news >正文

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.

相关文章:

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…...

c语言:模拟实现qsort函数

qsort函数的功能&#xff1a; qsort相较于冒泡排序法&#xff0c;不仅效率更快&#xff0c;而且能够比较不同类型的元素&#xff0c;如&#xff1a;浮点数&#xff0c;结构体等等。这里我们来模拟下qsort是如何实现这一功能的&#xff0c;方便我们对指针数组有一个更深层次的理…...

从0开始学习数据结构 C语言实现 1.前篇及二分查找算法

一、前篇 1、什么是数据结构&#xff1f; 数据结构是带有结构特性的数据元素的集合&#xff0c;它研究的是数据的逻辑结构和数据的物理结构以及它们之间的相互关系 2、时间复杂度与空间复杂度 大O符号是用于描述函数渐进行为的数学符号 常用函数的增长表 阶乘O(n!) > 指数…...

VSCode 使用CMakePreset找不到cl.exe编译器的问题

在用vscode开发c项目的时候&#xff0c;使用预先配置的CMakePresets.json可以把一些特定的cmake选项固定下来&#xff0c;在配置时直接使用 "cmake --config --preset presetname"就可以进行配置&#xff0c;免去在命令行输入过多的配置参数。 但是在vscode中&#…...

【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程

个人主页点击直达&#xff1a;小白不是程序媛 Linux专栏&#xff1a;Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R&#xff08;running状态&#xff09;运行状态 S&#xff08;sl…...

深信服AC流量管理技术

拓扑图 一.保证通道针对修仙部&#xff0c;访问网站&#xff0c;邮件&#xff0c;DNS&#xff0c;IM&#xff0c;办工 OA&#xff0c;微博论坛网上银行等常见应用保证带宽最低 50%&#xff0c;最高 100% 1. 先新建线路带宽 2.新增流量管理通道&#xff08;保证关键应用&#x…...

二元关系及关系代数中的象集、除运算

二元关系及关系代数中的象集、除运算 数学上&#xff0c;二元关系用于讨论两个数学对象的联系。诸如算术中的「大于」及「等于」&#xff0c;几何学中的"相似"&#xff0c;或集合论中的"为...之元素"或"为...之子集"。二元关系有时会简称关系&a…...

[PHP]关联和操作MySQL数据库然后将数据库部署到ECS

在Mac电脑上使用VS Code进行PHP开发并关联操作MySQL数据库&#xff0c;然后将数据库部署到ECS。 1.安装PHP和MySQL 确保你的Mac上已经安装了PHP和MySQL。你可以使用Homebrew来安装它们&#xff1a; $ brew install php $ brew install mysql 安装mysql完成后记住这一句: …...

23.11.19日总结

经过昨天的中期答辩&#xff0c;其实可以看出来项目进度太慢了&#xff0c;现在是第十周&#xff0c;预计第十四周是终级答辩&#xff0c;在这段时间要把项目写完。 前端要加上一个未登录的拦截器&#xff0c;后端加上全局的异常处理。对于饿了么项目的商品建表&#xff0c;之前…...

系列一、JVM概述

一、概述 1.1、Java发展中的重大事件 1.2、虚拟机 vs Java虚拟机 1.2.1、虚拟机 1.2.2、Java虚拟机 1.2.3、Java虚拟机的作用 Java虚拟机是二进制字节码的运行环境&#xff0c;负责装载字节码到其内部&#xff0c;解释/编译为对应平台上的机器指令指令。每一条Java指令&#…...

milvus数据管理-压缩数据

Milvus 默认支持自动数据压缩。您可以 配置 Milvus 以启用或禁用 压缩 和自动压缩。 如果自动压缩被禁用&#xff0c;您仍然可以手动压缩数据。 1.手动压缩数据 压缩请求是异步处理的&#xff0c;因为它们通常需要花费很长时间。 from pymilvus import Collection collection…...

SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)

最近找个springboot脚手架重新熟悉一下springboot相关框架的东西&#xff0c;结果发现好像项目还不能直接像数据库GUI工具一样填几个SSH参数就可以了&#xff0c;于是就给他再整一下看看如何解决 linux开放3306&#xff08;可修改&#xff09;端口直接访问 此方法较为方便&am…...

【Rust】快速教程——闭包与生命周期

前言 你怎么向天生的瞎子说清颜色&#xff1f;怎么用手势向天生的聋子描述声音&#xff1f; 鲜花就在眼前&#xff0c;雷鸣就在头顶&#xff0c;对他们来说却都毫无意义 眼睛看不到&#xff0c;鼻子可以嗅闻花香&#xff0c;耳朵听不见&#xff0c;手指可以触碰窗纸的震动。 犯…...

redis高级案列case

案列一 双写一致性 案例二 双锁策略 package com.redis.redis01.service;import com.redis.redis01.bean.RedisBs; import com.redis.redis01.mapper.RedisBsMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; imp…...

Vue3+Vite实现工程化,attribute属性渲染v-bind指令

想要渲染一个元素的attribute&#xff0c;应该使用v-bind指令 由于插值表达式不能直接放在标签的属性中&#xff0c;所有要渲染元素的属性就应该使用v-bindv-bind可以用于渲染任何元素的属性&#xff0c;语法为 v-bind:属性名数据名&#xff0c;可以简写为 :属性名数据名 <…...

下一代搜索引擎会什么?

现在是北京时间2023年11月18日。聊一聊搜索。 说到搜索&#xff0c;大家首先想到的肯定是谷歌&#xff0c;百度。我把这些定义成上一个时代的搜索引擎。ChatGPT已经火热了有一年的时间了&#xff0c;大家都认为Ai搜索是下一代的搜索。但是AI搜索&#xff0c;需要的是很大算力&a…...

WPF中如何在MVVM模式下关闭窗口

完全来源于十月的寒流&#xff0c;感谢大佬讲解 使用Behaviors <Window x:Class"Test_03.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:b"http://schemas.microsoft.com/xaml/behaviors"xmlns:x&quo…...

【数据结构&C++】二叉平衡搜索树-AVL树(25)

前言 大家好吖&#xff0c;欢迎来到 YY 滴C系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 目录 一.AVL树的概念二.AVL树节点的定义(代码…...

Python算法——树的最大深度和最小深度

Python中的树的最大深度和最小深度算法详解 树的最大深度和最小深度是树结构中的两个关键指标&#xff0c;它们分别表示树的从根节点到最深叶子节点的最大路径长度和最小路径长度。在本文中&#xff0c;我们将深入讨论如何计算树的最大深度和最小深度&#xff0c;并提供Python…...

46.全排列-py

46.全排列 class Solution(object):def permute(self, nums):""":type nums: List[int]:rtype: List[List[int]]"""# 结果数组0ans[]nlen(nums)# 判断是否使用state_[False]*n# 临时状态数组dp_[]def dfs (index):# 终止条件if indexn:ans.appe…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

前端倒计时误差!

提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...