当前位置: 首页 > news >正文

【Flink-scala】DataStream编程模型之状态编程

DataStream编程模型之状态编程

参考:
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器
4.【Flink-scala】DataStream编程模型之水位线
5.【Flink-scala】DataStream编程模型之延迟数据处理


文章目录

  • DataStream编程模型之状态编程
  • 前言
  • 一、状态编程相关概念
    • 1.1Flink中状态始终与特定算子相关联
    • 1.2 演示代码
    • 1.3 状态编程程序输入输出


前言

流计算分为无状态和有状态两种,无状态是观察每个独立事件,根据最后一个事件输出结果。比如传感器只关注当前的水位量,超出水位量就发生报警事件。
有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值,那就是状态的计算。

一、状态编程相关概念

流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态计算。

在传统的批处理中,数据的划分为块分片去完成的,每个task处理一个分片,执行完成后,把结果聚合起来就是最终的结果,这个过程中,对状态的需求还是较少的。

但对于流计算而言,它对状态有着非常高的要求,因为在流系统中,输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要把状态数据很好地管理起来

1.1Flink中状态始终与特定算子相关联

分为算子状态和键控状态
在这里插入图片描述
算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。

算子状态不能由相同或不同算子的另一个任务访问

键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同状态

在这里插入图片描述

1.2 演示代码

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)object StateTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment   
//设定程序并行度env.setParallelism(1) //创建数据源val source = env.socketTextStream("localhost", 9999) //指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))val alertStream = stockDataStream.keyBy(_.stockId).flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类  这里重新了flatmap方法// 打印输出alertStream.print() //触发程序执行env.execute("state test")}class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{//定义状态保存上一次的价格lazy val lastPriceState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {// 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较val diff = (value.price-lastPrice).absif( diff > threshold)out.collect((value.stockId,lastPrice,value.price))//更新状态lastPriceState.update(value.price)}}
}

代码分析:
1.传入参数,阈值
2.继承里接受一个stockPrice类型的输入,一个(String,Double,Double)三元组的输出。

String,Double,Double
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

有什么不同呢,两个double代表了两个价格:分别代表股票ID、上次价格、当前价格。

3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化
4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState,如果状态不存在,它将根据提供的 ValueStateDescriptor 创建一个新的状态。

ValueStateDescriptor 包含了状态的名称(代码中是 “last-price”)和状态的值的类型(这个代码中是 Double)。
5. classOf[Double] 提供了状态的值的类型信息。
6. 重写的flatmap应该能看懂,主要是当当前价格超出阈值(代码中是10),就打印。

1.3 状态编程程序输入输出

输入:

stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7

输出:

(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)

其中根据stock_id分类。

初始状态:所有stockId的最近价格都是未定义的(即null或None,在代码中表现为Double的默认值0.0,因为ValueState在初始化时未设置值)。

处理第一条记录:stock_4,1602031562148,43.4。由于没有先前的价格,不会触发输出。最近价格更新为43.4。
处理第二条记录:stock_1,1602036130952,39.7。同样,没有先前的价格,不会触发输出。最近价格更新为39.7。
处理第三条记录:stock_4,1602036131741,59.9。价格从43.4变为59.9,差异为16.5,超过阈值10,因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。
后续记录:对于stock_2、stock_3、stock_0,由于没有先前的价格,30.1 和79.8直接列出,
但是9.9这个价格要注意
stock_0,默认值为0,这里变为9.9,没有超出阈值10,那么输出就没有。

相关文章:

【Flink-scala】DataStream编程模型之状态编程

DataStream编程模型之状态编程 参考: 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器 4.【Flink-scal…...

RabbitMQ的核心组件有哪些?

大家好,我是锋哥。今天分享关于【RabbitMQ的核心组件有哪些?】面试题。希望对大家有帮助; RabbitMQ的核心组件有哪些? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 RabbitMQ是一个开源的消息代理(Messag…...

【Linux基础】基本开发工具的使用

目录 一、编译器——gcc/g的使用 gcc/g的安装 gcc的安装: g的安装: gcc/g的基本使用 gcc的使用 g的使用 动态链接与静态链接 程序的翻译过程 1. 一个C/C程序的构建过程,程序从源代码到可执行文件必须经历四个阶段 2. 理解选项的含…...

常见的数据结构和应用场景

数据结构是计算机科学中的基础概念,用于组织和存储数据,以便能够高效地访问和修改。下面是几种常见数据结构及其代表性应用场景: 1. 数组(Array) 问题解决:数组是一种线性数据结构,用于存储相…...

爬虫基础学习

爬虫概念与工作原理 爬虫是什么:爬虫(Web Scraping)是自动化地访问网站并提取数据的技术。它模拟用户浏览器的行为,通过HTTP请求访问网页,解析HTML文档并提取有用信息。 爬虫的基本工作流程: 发送HTTP请求…...

C++对象数组对象指针对象指针数组

一、对象数组 对象数组中的每一个元素都是同类的对象&#xff1b; 例1 对象数组成员的初始化 #include<iostream> using namespace std;class Student { public:Student( ){ };Student(int n,string nam,char s):num(n),name(nam),sex(s){};void display(){cout<&l…...

D96【python 接口自动化学习】- pytest进阶之fixture用法

day96 pytest的fixture详解&#xff08;三&#xff09; 学习日期&#xff1a;20241211 学习目标&#xff1a;pytest基础用法 -- pytest的fixture详解&#xff08;三&#xff09; 学习笔记&#xff1a; fixture(scop"class") (scop"class") 每一个类调…...

【算法】动态规划中01背包问题解析

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…...

选择WordPress和Shopify:搭建对谷歌SEO友好的网站

在建设网站时&#xff0c;不仅要考虑它的美观和功能性&#xff0c;还要关注它是否对谷歌SEO友好。如果你希望网站能够获得更好的搜索排名&#xff0c;WordPress和Shopify是两个值得推荐的建站平台。 WordPress作为最流行的内容管理系统&#xff0c;其强大的灵活性和丰富的插件…...

代理IP与生成式AI:携手共创未来

目录 代理IP&#xff1a;网络世界的“隐形斗篷” 1. 隐藏真实IP&#xff0c;保护隐私 2. 突破网络限制&#xff0c;访问更多资源 生成式AI&#xff1a;创意与效率的“超级大脑” 1. 提高创作效率 2. 个性化定制 代理IP与生成式AI的协同作用 1. 网络安全 2. 内容创作与…...

iOS 应用的生命周期

Managing your app’s life cycle | Apple Developer Documentation Performance and metrics | Apple Developer Documentation iOS 应用的生命周期状态是理解应用如何在不同状态下运行和管理资源的基础。在 iOS 开发中&#xff0c;应用生命周期管理的是应用从启动到终止的整…...

Elasticsearch 集群快照的定期备份设置指南

Elasticsearch 集群快照的定期备份设置指南 概述 快照&#xff1a; 在给定时刻对整个集群或者单个索引进行备份&#xff0c;以便在之后出现故障时可以基于之前备份的快照进行快速恢复。 前提条件&#xff1a; 准备一个备份存储盘&#xff0c;本指南采用的是AWS EFS文件系统做…...

Docker--Docker Image(镜像)

什么是Docker Image&#xff1f; Docker镜像&#xff08;Docker Image&#xff09;是Docker容器技术的核心组件之一&#xff0c;它包含了运行应用程序所需的所有依赖、库、代码、运行时环境以及配置文件等。 简单来说&#xff0c;Docker镜像是一个轻量级、可执行的软件包&…...

C++ 中的序列化和反序列化

一、C 中的序列化和反序列化 &#xff08;一&#xff09;基本概念 在 C 中&#xff0c;序列化是将对象转换为字节流的过程&#xff0c;反序列化则是从字节流重新构建对象的过程。这对于存储对象状态到文件、网络传输等场景非常有用。 &#xff08;二&#xff09;简单的序列化…...

我的Github学生认证申请过程

先说结论&#xff1a;很简单。 学生认证链接&#xff1a;GitHub Education GitHub 1. 首先你得绑定edu邮箱。这个应该没什么问题&#xff0c;Github也会提示。 2. 我是在学校里面、使用流量而非WiFi申请的&#xff0c;听说地理位置很重要&#xff0c;该给的权限&#xff08…...

信奥题解:勾股数计算中的浮点数精度问题

来源:GESP C++ 二级模拟题 本文给出官方参考答案的详细解析,包括每一部分的功能和关键点,以及与浮点数精度相关的问题的分析。 题目描述 勾股数是很有趣的数学概念。如果三个正整数a 、b 、c ,满足 a 2 + b 2 = c 2 a^2 + b^2 = c^2 a2+b2=c2 ,而且1 ≤ a ≤ b ≤ c ,…...

重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统

重生之我在学Vue–第2天 Vue 3 Composition API 与响应式系统 文章目录 重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统前言一、Composition API 核心概念1.1 什么是 Composition API&#xff1f;1.2 Composition API 的核心工具1.3 基础用法示例 二、响应式系统2…...

【AI知识】逻辑回归介绍+ 做二分类任务的实例(代码可视化)

1. 分类的基本概念 在机器学习的有监督学习中&#xff0c;分类一种常见任务&#xff0c;它的目标是将输入数据分类到预定的类别中。具体来说&#xff1a; 分类任务的常见应用&#xff1a; 垃圾邮件分类&#xff1a;判断一封电子邮件是否是垃圾邮件 。 医学诊断&#xff1a;…...

Mysql 笔记2 emp dept HRs

-- 注意事项 -- 1.给数据库和表起名字时尽量选择全小写 -- 2.作为筛选条件的字符串是否区分大小写看设置的校对规则utf8_bin 区分 drop database if exists hrs; create database hrs default charset utf8 collate utf8_general_ci;use hrs; drop table if exists tb_emp; dro…...

MySQL和Oracle的区别

MySQL和Oracle的区别 MySQL是轻量型数据库&#xff0c;并且免费&#xff0c;没有服务恢复数据。 Oracle是重量型数据库&#xff0c;收费&#xff0c;Oracle公司对Oracle数据库有任何服务。 1.对事务的提交 MySQL默认是自动提交&#xff0c;而Oracle默认不自动提交&#xff0…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者

抖音增长新引擎&#xff1a;品融电商&#xff0c;一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中&#xff0c;品牌如何破浪前行&#xff1f;自建团队成本高、效果难控&#xff1b;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念&#xff0c;其实 Fiori当中还有 V4&#xff0c;咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务)&#xff0c;代理中间件&#xff08;ui5-middleware-simpleproxy&#xff09;-CSDN博客…...

技术栈RabbitMq的介绍和使用

目录 1. 什么是消息队列&#xff1f;2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...