【Flink-scala】DataStream编程模型之状态编程
DataStream编程模型之状态编程
参考:
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器
4.【Flink-scala】DataStream编程模型之水位线
5.【Flink-scala】DataStream编程模型之延迟数据处理
文章目录
- DataStream编程模型之状态编程
- 前言
- 一、状态编程相关概念
- 1.1Flink中状态始终与特定算子相关联
- 1.2 演示代码
- 1.3 状态编程程序输入输出
前言
流计算分为无状态和有状态两种,无状态是观察每个独立事件,根据最后一个事件输出结果。比如传感器只关注当前的水位量,超出水位量就发生报警事件。
有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值,那就是状态的计算。
一、状态编程相关概念
流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态计算。
在传统的批处理中,数据的划分为块分片去完成的,每个task处理一个分片,执行完成后,把结果聚合起来就是最终的结果,这个过程中,对状态的需求还是较少的。
但对于流计算而言,它对状态有着非常高的要求,因为在流系统中,输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要把状态数据很好地管理起来
1.1Flink中状态始终与特定算子相关联
分为算子状态和键控状态
算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。
算子状态不能由相同或不同算子的另一个任务访问
键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同状态
1.2 演示代码
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)object StateTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度env.setParallelism(1) //创建数据源val source = env.socketTextStream("localhost", 9999) //指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))val alertStream = stockDataStream.keyBy(_.stockId).flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类 这里重新了flatmap方法// 打印输出alertStream.print() //触发程序执行env.execute("state test")}class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{//定义状态保存上一次的价格lazy val lastPriceState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {// 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较val diff = (value.price-lastPrice).absif( diff > threshold)out.collect((value.stockId,lastPrice,value.price))//更新状态lastPriceState.update(value.price)}}
}
代码分析:
1.传入参数,阈值
2.继承里接受一个stockPrice类型的输入,一个(String,Double,Double)三元组的输出。
(String,Double,Double)
case class StockPrice(stockId:String,timeStamp:Long,price:Double)
有什么不同呢,两个double代表了两个价格:分别代表股票ID、上次价格、当前价格。
3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化
4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState,如果状态不存在,它将根据提供的 ValueStateDescriptor 创建一个新的状态。
ValueStateDescriptor 包含了状态的名称(代码中是 “last-price”)和状态的值的类型(这个代码中是 Double)。
5. classOf[Double] 提供了状态的值的类型信息。
6. 重写的flatmap应该能看懂,主要是当当前价格超出阈值(代码中是10),就打印。
1.3 状态编程程序输入输出
输入:
stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7
输出:
(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)
其中根据stock_id分类。
初始状态:所有stockId的最近价格都是未定义的(即null或None,在代码中表现为Double的默认值0.0,因为ValueState在初始化时未设置值)。
处理第一条记录:stock_4,1602031562148,43.4。由于没有先前的价格,不会触发输出。最近价格更新为43.4。
处理第二条记录:stock_1,1602036130952,39.7。同样,没有先前的价格,不会触发输出。最近价格更新为39.7。
处理第三条记录:stock_4,1602036131741,59.9。价格从43.4变为59.9,差异为16.5,超过阈值10,因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。
后续记录:对于stock_2、stock_3、stock_0,由于没有先前的价格,30.1 和79.8直接列出,
但是9.9这个价格要注意
stock_0,默认值为0,这里变为9.9,没有超出阈值10,那么输出就没有。
相关文章:

【Flink-scala】DataStream编程模型之状态编程
DataStream编程模型之状态编程 参考: 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器 4.【Flink-scal…...

RabbitMQ的核心组件有哪些?
大家好,我是锋哥。今天分享关于【RabbitMQ的核心组件有哪些?】面试题。希望对大家有帮助; RabbitMQ的核心组件有哪些? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 RabbitMQ是一个开源的消息代理(Messag…...

【Linux基础】基本开发工具的使用
目录 一、编译器——gcc/g的使用 gcc/g的安装 gcc的安装: g的安装: gcc/g的基本使用 gcc的使用 g的使用 动态链接与静态链接 程序的翻译过程 1. 一个C/C程序的构建过程,程序从源代码到可执行文件必须经历四个阶段 2. 理解选项的含…...

常见的数据结构和应用场景
数据结构是计算机科学中的基础概念,用于组织和存储数据,以便能够高效地访问和修改。下面是几种常见数据结构及其代表性应用场景: 1. 数组(Array) 问题解决:数组是一种线性数据结构,用于存储相…...

爬虫基础学习
爬虫概念与工作原理 爬虫是什么:爬虫(Web Scraping)是自动化地访问网站并提取数据的技术。它模拟用户浏览器的行为,通过HTTP请求访问网页,解析HTML文档并提取有用信息。 爬虫的基本工作流程: 发送HTTP请求…...

C++对象数组对象指针对象指针数组
一、对象数组 对象数组中的每一个元素都是同类的对象; 例1 对象数组成员的初始化 #include<iostream> using namespace std;class Student { public:Student( ){ };Student(int n,string nam,char s):num(n),name(nam),sex(s){};void display(){cout<&l…...

D96【python 接口自动化学习】- pytest进阶之fixture用法
day96 pytest的fixture详解(三) 学习日期:20241211 学习目标:pytest基础用法 -- pytest的fixture详解(三) 学习笔记: fixture(scop"class") (scop"class") 每一个类调…...

【算法】动态规划中01背包问题解析
📢博客主页:https://blog.csdn.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文由 JohnKi 原创,首发于 CSDN🙉 📢未来很长&#…...

选择WordPress和Shopify:搭建对谷歌SEO友好的网站
在建设网站时,不仅要考虑它的美观和功能性,还要关注它是否对谷歌SEO友好。如果你希望网站能够获得更好的搜索排名,WordPress和Shopify是两个值得推荐的建站平台。 WordPress作为最流行的内容管理系统,其强大的灵活性和丰富的插件…...

代理IP与生成式AI:携手共创未来
目录 代理IP:网络世界的“隐形斗篷” 1. 隐藏真实IP,保护隐私 2. 突破网络限制,访问更多资源 生成式AI:创意与效率的“超级大脑” 1. 提高创作效率 2. 个性化定制 代理IP与生成式AI的协同作用 1. 网络安全 2. 内容创作与…...

iOS 应用的生命周期
Managing your app’s life cycle | Apple Developer Documentation Performance and metrics | Apple Developer Documentation iOS 应用的生命周期状态是理解应用如何在不同状态下运行和管理资源的基础。在 iOS 开发中,应用生命周期管理的是应用从启动到终止的整…...

Elasticsearch 集群快照的定期备份设置指南
Elasticsearch 集群快照的定期备份设置指南 概述 快照: 在给定时刻对整个集群或者单个索引进行备份,以便在之后出现故障时可以基于之前备份的快照进行快速恢复。 前提条件: 准备一个备份存储盘,本指南采用的是AWS EFS文件系统做…...

Docker--Docker Image(镜像)
什么是Docker Image? Docker镜像(Docker Image)是Docker容器技术的核心组件之一,它包含了运行应用程序所需的所有依赖、库、代码、运行时环境以及配置文件等。 简单来说,Docker镜像是一个轻量级、可执行的软件包&…...

C++ 中的序列化和反序列化
一、C 中的序列化和反序列化 (一)基本概念 在 C 中,序列化是将对象转换为字节流的过程,反序列化则是从字节流重新构建对象的过程。这对于存储对象状态到文件、网络传输等场景非常有用。 (二)简单的序列化…...

我的Github学生认证申请过程
先说结论:很简单。 学生认证链接:GitHub Education GitHub 1. 首先你得绑定edu邮箱。这个应该没什么问题,Github也会提示。 2. 我是在学校里面、使用流量而非WiFi申请的,听说地理位置很重要,该给的权限(…...

信奥题解:勾股数计算中的浮点数精度问题
来源:GESP C++ 二级模拟题 本文给出官方参考答案的详细解析,包括每一部分的功能和关键点,以及与浮点数精度相关的问题的分析。 题目描述 勾股数是很有趣的数学概念。如果三个正整数a 、b 、c ,满足 a 2 + b 2 = c 2 a^2 + b^2 = c^2 a2+b2=c2 ,而且1 ≤ a ≤ b ≤ c ,…...

重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统
重生之我在学Vue–第2天 Vue 3 Composition API 与响应式系统 文章目录 重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统前言一、Composition API 核心概念1.1 什么是 Composition API?1.2 Composition API 的核心工具1.3 基础用法示例 二、响应式系统2…...

【AI知识】逻辑回归介绍+ 做二分类任务的实例(代码可视化)
1. 分类的基本概念 在机器学习的有监督学习中,分类一种常见任务,它的目标是将输入数据分类到预定的类别中。具体来说: 分类任务的常见应用: 垃圾邮件分类:判断一封电子邮件是否是垃圾邮件 。 医学诊断:…...

Mysql 笔记2 emp dept HRs
-- 注意事项 -- 1.给数据库和表起名字时尽量选择全小写 -- 2.作为筛选条件的字符串是否区分大小写看设置的校对规则utf8_bin 区分 drop database if exists hrs; create database hrs default charset utf8 collate utf8_general_ci;use hrs; drop table if exists tb_emp; dro…...

MySQL和Oracle的区别
MySQL和Oracle的区别 MySQL是轻量型数据库,并且免费,没有服务恢复数据。 Oracle是重量型数据库,收费,Oracle公司对Oracle数据库有任何服务。 1.对事务的提交 MySQL默认是自动提交,而Oracle默认不自动提交࿰…...

实验12 C语言连接和操作MySQL数据库
一、安装MySQL 1、使用包管理器安装MySQL sudo apt update sudo apt install mysql-server2、启动MySQL服务: sudo systemctl start mysql3、检查MySQL服务状态: sudo systemctl status mysql二、安装MySQL开发库 sudo apt-get install libmysqlcli…...

09篇--图片的水印添加(掩膜的运用)
如何添加水印? 添加水印其实可以理解为将一张图片中的某个物体或者图案提取出来,然后叠加到另一张图片上。具体的操作思想是通过将原始图片转换成灰度图,并进行二值化处理,去除背景部分,得到一个类似掩膜的图像。然后…...

sql-labs(21-25)
第21关 第一步 可以发现cookie是经过64位加密的 我们试试在这里注入 选择给他编码 发现可以成功注入 爆出表名 爆出字段 爆出数据 第22关 跟二十一关一模一样 闭合换成" 第 23 关 第二十三关重新回到get请求,会发现输入单引号报错,但是注释符…...

CTF知识集-命令执行
CTF知识集-命令执行 写在开头可能会用到的提醒 ;可以用%0a来替换 是shell_exec的缩写 ls | tee 1 把ls的输出内容存入1这个文件 shell查看文件的几种方式,tac | more | less | tail | sort | tac | cat | head | od | expand 针对flag 可以用grep { flag.php来…...

基于米尔全志T527开发板的OpenCV进行手势识别方案
本文将介绍基于米尔电子MYD-LT527开发板(米尔基于全志T527开发板)的OpenCV手势识别方案测试。 摘自优秀创作者-小火苗 米尔基于全志T527开发板 一、软件环境安装 1.安装OpenCV sudo apt-get install libopencv-dev python3-opencv 2.安装pip sudo apt…...

Htpp中web通讯发送post(上传文件)、get请求
一、正常发送post请求 1、引入pom文件 <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5</version></dependency>2、这个是发送至正常的post、get请求 import org…...

【论文阅读笔记】HunyuanVideo: A Systematic Framework For Large Video Generative Models
HunyuanVideo: A Systematic Framework For Large Video Generative Models 前言引言Overview数据预处理数据过滤数据注释 模型架构设计3D Variational Auto-encoder Designtraininginference 统一的图像和视频生成架构Text encoderModel ScalingImage model scaling lawvideo …...

SpringBoot的事务钩子函数
如果需要在A方法执行完成之后做一个不影响主方法运行的动作B,我们需要判断这个A方法是否存在事务,并且使用异步执行动作B; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transa…...

源码安装PHP-7.2.19
源码安装PHP-7.2.19 1.解压 tar -xjvf php-7.2.19.tar.bz2.编译 -prefix安装路径 cd php-7.2.19 ./configure --prefix/home/work/study 成功输出 3.make(构建) makemake testmake installlinux对php操作的一些命令 # 进入到php [rootvdb1 study]# cd php/ [rootvdb1 st…...

UE5制作伤害浮动数字
效果演示: 首先创建一个控件UI 添加画布和文本 文本设置样式 添加伤害浮动动画,根据自己喜好调整,我设置了缩放和不透明度 添加绑定 转到事件图表,事件构造设置动画 创建actor蓝图类 添加widget 获取位置 设置位移 创建一个被击中…...