当前位置: 首页 > news >正文

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.

相关文章:

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…...

c语言:模拟实现qsort函数

qsort函数的功能&#xff1a; qsort相较于冒泡排序法&#xff0c;不仅效率更快&#xff0c;而且能够比较不同类型的元素&#xff0c;如&#xff1a;浮点数&#xff0c;结构体等等。这里我们来模拟下qsort是如何实现这一功能的&#xff0c;方便我们对指针数组有一个更深层次的理…...

从0开始学习数据结构 C语言实现 1.前篇及二分查找算法

一、前篇 1、什么是数据结构&#xff1f; 数据结构是带有结构特性的数据元素的集合&#xff0c;它研究的是数据的逻辑结构和数据的物理结构以及它们之间的相互关系 2、时间复杂度与空间复杂度 大O符号是用于描述函数渐进行为的数学符号 常用函数的增长表 阶乘O(n!) > 指数…...

VSCode 使用CMakePreset找不到cl.exe编译器的问题

在用vscode开发c项目的时候&#xff0c;使用预先配置的CMakePresets.json可以把一些特定的cmake选项固定下来&#xff0c;在配置时直接使用 "cmake --config --preset presetname"就可以进行配置&#xff0c;免去在命令行输入过多的配置参数。 但是在vscode中&#…...

【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程

个人主页点击直达&#xff1a;小白不是程序媛 Linux专栏&#xff1a;Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R&#xff08;running状态&#xff09;运行状态 S&#xff08;sl…...

深信服AC流量管理技术

拓扑图 一.保证通道针对修仙部&#xff0c;访问网站&#xff0c;邮件&#xff0c;DNS&#xff0c;IM&#xff0c;办工 OA&#xff0c;微博论坛网上银行等常见应用保证带宽最低 50%&#xff0c;最高 100% 1. 先新建线路带宽 2.新增流量管理通道&#xff08;保证关键应用&#x…...

二元关系及关系代数中的象集、除运算

二元关系及关系代数中的象集、除运算 数学上&#xff0c;二元关系用于讨论两个数学对象的联系。诸如算术中的「大于」及「等于」&#xff0c;几何学中的"相似"&#xff0c;或集合论中的"为...之元素"或"为...之子集"。二元关系有时会简称关系&a…...

[PHP]关联和操作MySQL数据库然后将数据库部署到ECS

在Mac电脑上使用VS Code进行PHP开发并关联操作MySQL数据库&#xff0c;然后将数据库部署到ECS。 1.安装PHP和MySQL 确保你的Mac上已经安装了PHP和MySQL。你可以使用Homebrew来安装它们&#xff1a; $ brew install php $ brew install mysql 安装mysql完成后记住这一句: …...

23.11.19日总结

经过昨天的中期答辩&#xff0c;其实可以看出来项目进度太慢了&#xff0c;现在是第十周&#xff0c;预计第十四周是终级答辩&#xff0c;在这段时间要把项目写完。 前端要加上一个未登录的拦截器&#xff0c;后端加上全局的异常处理。对于饿了么项目的商品建表&#xff0c;之前…...

系列一、JVM概述

一、概述 1.1、Java发展中的重大事件 1.2、虚拟机 vs Java虚拟机 1.2.1、虚拟机 1.2.2、Java虚拟机 1.2.3、Java虚拟机的作用 Java虚拟机是二进制字节码的运行环境&#xff0c;负责装载字节码到其内部&#xff0c;解释/编译为对应平台上的机器指令指令。每一条Java指令&#…...

milvus数据管理-压缩数据

Milvus 默认支持自动数据压缩。您可以 配置 Milvus 以启用或禁用 压缩 和自动压缩。 如果自动压缩被禁用&#xff0c;您仍然可以手动压缩数据。 1.手动压缩数据 压缩请求是异步处理的&#xff0c;因为它们通常需要花费很长时间。 from pymilvus import Collection collection…...

SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)

最近找个springboot脚手架重新熟悉一下springboot相关框架的东西&#xff0c;结果发现好像项目还不能直接像数据库GUI工具一样填几个SSH参数就可以了&#xff0c;于是就给他再整一下看看如何解决 linux开放3306&#xff08;可修改&#xff09;端口直接访问 此方法较为方便&am…...

【Rust】快速教程——闭包与生命周期

前言 你怎么向天生的瞎子说清颜色&#xff1f;怎么用手势向天生的聋子描述声音&#xff1f; 鲜花就在眼前&#xff0c;雷鸣就在头顶&#xff0c;对他们来说却都毫无意义 眼睛看不到&#xff0c;鼻子可以嗅闻花香&#xff0c;耳朵听不见&#xff0c;手指可以触碰窗纸的震动。 犯…...

redis高级案列case

案列一 双写一致性 案例二 双锁策略 package com.redis.redis01.service;import com.redis.redis01.bean.RedisBs; import com.redis.redis01.mapper.RedisBsMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; imp…...

Vue3+Vite实现工程化,attribute属性渲染v-bind指令

想要渲染一个元素的attribute&#xff0c;应该使用v-bind指令 由于插值表达式不能直接放在标签的属性中&#xff0c;所有要渲染元素的属性就应该使用v-bindv-bind可以用于渲染任何元素的属性&#xff0c;语法为 v-bind:属性名数据名&#xff0c;可以简写为 :属性名数据名 <…...

下一代搜索引擎会什么?

现在是北京时间2023年11月18日。聊一聊搜索。 说到搜索&#xff0c;大家首先想到的肯定是谷歌&#xff0c;百度。我把这些定义成上一个时代的搜索引擎。ChatGPT已经火热了有一年的时间了&#xff0c;大家都认为Ai搜索是下一代的搜索。但是AI搜索&#xff0c;需要的是很大算力&a…...

WPF中如何在MVVM模式下关闭窗口

完全来源于十月的寒流&#xff0c;感谢大佬讲解 使用Behaviors <Window x:Class"Test_03.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:b"http://schemas.microsoft.com/xaml/behaviors"xmlns:x&quo…...

【数据结构&C++】二叉平衡搜索树-AVL树(25)

前言 大家好吖&#xff0c;欢迎来到 YY 滴C系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; 目录 一.AVL树的概念二.AVL树节点的定义(代码…...

Python算法——树的最大深度和最小深度

Python中的树的最大深度和最小深度算法详解 树的最大深度和最小深度是树结构中的两个关键指标&#xff0c;它们分别表示树的从根节点到最深叶子节点的最大路径长度和最小路径长度。在本文中&#xff0c;我们将深入讨论如何计算树的最大深度和最小深度&#xff0c;并提供Python…...

46.全排列-py

46.全排列 class Solution(object):def permute(self, nums):""":type nums: List[int]:rtype: List[List[int]]"""# 结果数组0ans[]nlen(nums)# 判断是否使用state_[False]*n# 临时状态数组dp_[]def dfs (index):# 终止条件if indexn:ans.appe…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...