Flink之KeyedState
前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.
- 需求
将接收到的Socket数据源中的字符串进行拼接
在命令行开启socket命令:nc -lk 8888 - 业务代码
public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");} }API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度、上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.
相关文章:
Flink之KeyedState
前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…...
c语言:模拟实现qsort函数
qsort函数的功能: qsort相较于冒泡排序法,不仅效率更快,而且能够比较不同类型的元素,如:浮点数,结构体等等。这里我们来模拟下qsort是如何实现这一功能的,方便我们对指针数组有一个更深层次的理…...
从0开始学习数据结构 C语言实现 1.前篇及二分查找算法
一、前篇 1、什么是数据结构? 数据结构是带有结构特性的数据元素的集合,它研究的是数据的逻辑结构和数据的物理结构以及它们之间的相互关系 2、时间复杂度与空间复杂度 大O符号是用于描述函数渐进行为的数学符号 常用函数的增长表 阶乘O(n!) > 指数…...
VSCode 使用CMakePreset找不到cl.exe编译器的问题
在用vscode开发c项目的时候,使用预先配置的CMakePresets.json可以把一些特定的cmake选项固定下来,在配置时直接使用 "cmake --config --preset presetname"就可以进行配置,免去在命令行输入过多的配置参数。 但是在vscode中&#…...
【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程
个人主页点击直达:小白不是程序媛 Linux专栏:Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R(running状态)运行状态 S(sl…...
深信服AC流量管理技术
拓扑图 一.保证通道针对修仙部,访问网站,邮件,DNS,IM,办工 OA,微博论坛网上银行等常见应用保证带宽最低 50%,最高 100% 1. 先新建线路带宽 2.新增流量管理通道(保证关键应用&#x…...
二元关系及关系代数中的象集、除运算
二元关系及关系代数中的象集、除运算 数学上,二元关系用于讨论两个数学对象的联系。诸如算术中的「大于」及「等于」,几何学中的"相似",或集合论中的"为...之元素"或"为...之子集"。二元关系有时会简称关系&a…...
[PHP]关联和操作MySQL数据库然后将数据库部署到ECS
在Mac电脑上使用VS Code进行PHP开发并关联操作MySQL数据库,然后将数据库部署到ECS。 1.安装PHP和MySQL 确保你的Mac上已经安装了PHP和MySQL。你可以使用Homebrew来安装它们: $ brew install php $ brew install mysql 安装mysql完成后记住这一句: …...
23.11.19日总结
经过昨天的中期答辩,其实可以看出来项目进度太慢了,现在是第十周,预计第十四周是终级答辩,在这段时间要把项目写完。 前端要加上一个未登录的拦截器,后端加上全局的异常处理。对于饿了么项目的商品建表,之前…...
系列一、JVM概述
一、概述 1.1、Java发展中的重大事件 1.2、虚拟机 vs Java虚拟机 1.2.1、虚拟机 1.2.2、Java虚拟机 1.2.3、Java虚拟机的作用 Java虚拟机是二进制字节码的运行环境,负责装载字节码到其内部,解释/编译为对应平台上的机器指令指令。每一条Java指令&#…...
milvus数据管理-压缩数据
Milvus 默认支持自动数据压缩。您可以 配置 Milvus 以启用或禁用 压缩 和自动压缩。 如果自动压缩被禁用,您仍然可以手动压缩数据。 1.手动压缩数据 压缩请求是异步处理的,因为它们通常需要花费很长时间。 from pymilvus import Collection collection…...
SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)
最近找个springboot脚手架重新熟悉一下springboot相关框架的东西,结果发现好像项目还不能直接像数据库GUI工具一样填几个SSH参数就可以了,于是就给他再整一下看看如何解决 linux开放3306(可修改)端口直接访问 此方法较为方便&am…...
【Rust】快速教程——闭包与生命周期
前言 你怎么向天生的瞎子说清颜色?怎么用手势向天生的聋子描述声音? 鲜花就在眼前,雷鸣就在头顶,对他们来说却都毫无意义 眼睛看不到,鼻子可以嗅闻花香,耳朵听不见,手指可以触碰窗纸的震动。 犯…...
redis高级案列case
案列一 双写一致性 案例二 双锁策略 package com.redis.redis01.service;import com.redis.redis01.bean.RedisBs; import com.redis.redis01.mapper.RedisBsMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; imp…...
Vue3+Vite实现工程化,attribute属性渲染v-bind指令
想要渲染一个元素的attribute,应该使用v-bind指令 由于插值表达式不能直接放在标签的属性中,所有要渲染元素的属性就应该使用v-bindv-bind可以用于渲染任何元素的属性,语法为 v-bind:属性名数据名,可以简写为 :属性名数据名 <…...
下一代搜索引擎会什么?
现在是北京时间2023年11月18日。聊一聊搜索。 说到搜索,大家首先想到的肯定是谷歌,百度。我把这些定义成上一个时代的搜索引擎。ChatGPT已经火热了有一年的时间了,大家都认为Ai搜索是下一代的搜索。但是AI搜索,需要的是很大算力&a…...
WPF中如何在MVVM模式下关闭窗口
完全来源于十月的寒流,感谢大佬讲解 使用Behaviors <Window x:Class"Test_03.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:b"http://schemas.microsoft.com/xaml/behaviors"xmlns:x&quo…...
【数据结构&C++】二叉平衡搜索树-AVL树(25)
前言 大家好吖,欢迎来到 YY 滴C系列 ,热烈欢迎! 本章主要内容面向接触过C的老铁 主要内容含: 欢迎订阅 YY滴C专栏!更多干货持续更新!以下是传送门! 目录 一.AVL树的概念二.AVL树节点的定义(代码…...
Python算法——树的最大深度和最小深度
Python中的树的最大深度和最小深度算法详解 树的最大深度和最小深度是树结构中的两个关键指标,它们分别表示树的从根节点到最深叶子节点的最大路径长度和最小路径长度。在本文中,我们将深入讨论如何计算树的最大深度和最小深度,并提供Python…...
46.全排列-py
46.全排列 class Solution(object):def permute(self, nums):""":type nums: List[int]:rtype: List[List[int]]"""# 结果数组0ans[]nlen(nums)# 判断是否使用state_[False]*n# 临时状态数组dp_[]def dfs (index):# 终止条件if indexn:ans.appe…...
OpenClaw部署指南:2026年百度云部署OpenClaw、配置百炼API、集成Skill、接入微信/QQ/飞书/钉钉步骤
OpenClaw部署指南:2026年百度云部署OpenClaw、配置百炼API、集成Skill、接入微信/QQ/飞书/钉钉步骤。OpenClaw(原Clawdbot)作为2026年主流的AI自动化助理平台,可通过阿里云轻量服务器实现724小时稳定运行,并快速接入钉…...
Socket.IO-Client-Swift终极安全指南:TLS/SSL配置和证书认证详解
Socket.IO-Client-Swift终极安全指南:TLS/SSL配置和证书认证详解 【免费下载链接】socket.io-client-swift 项目地址: https://gitcode.com/gh_mirrors/so/socket.io-client-swift Socket.IO-Client-Swift是一款功能强大的Swift客户端库,用于与S…...
GLM-OCR GPU算力优化实践:vLLM推理加速+令牌下采样,吞吐提升2.3倍
GLM-OCR GPU算力优化实践:vLLM推理加速令牌下采样,吞吐提升2.3倍 1. 项目背景与优化需求 GLM-OCR是一个基于GLM-V编码器-解码器架构构建的多模态OCR模型,专门为复杂文档理解而设计。这个模型集成了在大规模图文数据上预训练的CogViT视觉编码…...
SpringBoot + Ollama + Qdrant + DeepSeek:从零构建企业级本地知识库问答系统
1. 为什么选择这套技术栈? 在企业内部搭建知识库问答系统时,技术选型需要平衡性能、成本和易用性。这套组合拳的巧妙之处在于:SpringBoot提供企业级开发框架,Ollama让大模型本地化运行成为可能,Qdrant解决向量检索的效…...
Windows Defender一键移除工具:终极完整指南,三步彻底关闭系统安全防护
Windows Defender一键移除工具:终极完整指南,三步彻底关闭系统安全防护 【免费下载链接】windows-defender-remover A tool which is uses to remove Windows Defender in Windows 8.x, Windows 10 (every version) and Windows 11. 项目地址: https:/…...
Epigenase m6A 甲基化酶活性/抑制比色法检测试剂盒:快速、灵敏、高通量适配
一、产品概述Epigenase m6A 甲基化酶活性/抑制比色法检测试剂盒,由Cytoskeleton推出,艾美捷代理,它是一套完整的优化缓冲液与试剂组合,专用于定量检测总 m6A 甲基化酶(甲基转移酶)的活性或抑制效果。该试剂…...
【GitLab npm Registry 非标准端口安装问题解决方案】
GitLab npm Registry 非标准端口安装问题解决方案 问题类型: npm/pnpm 客户端与 GitLab npm Registry 集成 影响范围: 使用非标准端口的 GitLab npm Registry 解决时间: 2026-04-03 文档版本: v1.0 一、问题背景 1.1 业务场景 团队需要将内部组件库发布到私有 npm registry,选…...
[AI/向量数据库/GUI] Attu : Milvus 的图形化与一体化管理工具
起因是我想在搞一些操作windows进程的事情时,老是需要右键以管理员身份运行,感觉很麻烦。就研究了一下怎么提权,顺手瞄了一眼Windows下用户态权限分配,然后也是感谢《深入解析Windows操作系统》这本书给我偷令牌的灵感吧ÿ…...
【数据结构与算法】第24篇:哈夫曼树与哈夫曼编码
一、基本概念1.1 带权路径长度在二叉树中:路径长度:从一个节点到另一个节点经过的边数带权路径长度(WPL):所有叶子节点的权重 路径长度 之和示例:text叶子节点:A(7), B(5), C(2), D(4)普通树:15/ \7 8/…...
[Android] 故宫陶瓷馆 v2.2.251126
[Android] 故宫陶瓷馆 v2.2.251126 链接:https://pan.xunlei.com/s/VOpHzrBozQgvaUJbdCkB20SMA1?pwdu338# 故宫陶瓷馆是故宫博物院官方出品的APP,以“时间轴”为核心骨架、全新技术手段打造的陶瓷馆,为你将展品带至手中、带至眼前。...
