当前位置: 首页 > news >正文

【Flink-scala】DataStream编程模型之状态编程

DataStream编程模型之状态编程

参考:
1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出
2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序
3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器
4.【Flink-scala】DataStream编程模型之水位线
5.【Flink-scala】DataStream编程模型之延迟数据处理


文章目录

  • DataStream编程模型之状态编程
  • 前言
  • 一、状态编程相关概念
    • 1.1Flink中状态始终与特定算子相关联
    • 1.2 演示代码
    • 1.3 状态编程程序输入输出


前言

流计算分为无状态和有状态两种,无状态是观察每个独立事件,根据最后一个事件输出结果。比如传感器只关注当前的水位量,超出水位量就发生报警事件。
有状态计算则会基于多个事件输出结果。比如计算过去1小时的水位平均值,那就是状态的计算。

一、状态编程相关概念

流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态计算。

在传统的批处理中,数据的划分为块分片去完成的,每个task处理一个分片,执行完成后,把结果聚合起来就是最终的结果,这个过程中,对状态的需求还是较少的。

但对于流计算而言,它对状态有着非常高的要求,因为在流系统中,输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要把状态数据很好地管理起来

1.1Flink中状态始终与特定算子相关联

分为算子状态和键控状态
在这里插入图片描述
算子状态的作用范围限定为算子任务,这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。

算子状态不能由相同或不同算子的另一个任务访问

键控状态是根据输入数据流中定义的键来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同状态

在这里插入图片描述

1.2 演示代码

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double)object StateTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment   
//设定程序并行度env.setParallelism(1) //创建数据源val source = env.socketTextStream("localhost", 9999) //指定针对数据流的转换操作逻辑val stockDataStream = source.map(s => s.split(",")).map(s => StockPrice(s(0).toString, s(1).toLong, s(2).toDouble))val alertStream = stockDataStream.keyBy(_.stockId).flatMap(new PriceChangeAlert(10))//新建了一个PriceChangeAlert类  这里重新了flatmap方法// 打印输出alertStream.print() //触发程序执行env.execute("state test")}class PriceChangeAlert(threshold: Double) extends RichFlatMapFunction[StockPrice,(String, Double, Double)]{//定义状态保存上一次的价格lazy val lastPriceState: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("last-price",classOf[Double]))override def flatMap(value: StockPrice, out: Collector[(String, Double, Double)]): Unit = {// 获取上次的价格
val lastPrice = lastPriceState.value()
//跟最新的价格求差值做比较val diff = (value.price-lastPrice).absif( diff > threshold)out.collect((value.stockId,lastPrice,value.price))//更新状态lastPriceState.update(value.price)}}
}

代码分析:
1.传入参数,阈值
2.继承里接受一个stockPrice类型的输入,一个(String,Double,Double)三元组的输出。

String,Double,Double
case class StockPrice(stockId:String,timeStamp:Long,price:Double)

有什么不同呢,两个double代表了两个价格:分别代表股票ID、上次价格、当前价格。

3.ValueState是Flink中用于保存单个值的状态。这里它被用来保存上一次处理的股票价格。lazy关键字意味着这个状态变量只有在第一次被使用时才会被初始化
4…getState(new ValueStateDescriptor[Double](“last-price”, classOf[Double])): 这个方法尝试从运行时上下文中检索一个名为 “last-price” 的 ValueState,如果状态不存在,它将根据提供的 ValueStateDescriptor 创建一个新的状态。

ValueStateDescriptor 包含了状态的名称(代码中是 “last-price”)和状态的值的类型(这个代码中是 Double)。
5. classOf[Double] 提供了状态的值的类型信息。
6. 重写的flatmap应该能看懂,主要是当当前价格超出阈值(代码中是10),就打印。

1.3 状态编程程序输入输出

输入:

stock_4,1602031562148,43.4
stock_1,1602036130952,39.7
stock_4,1602036131741,59.9
stock_2,1602036132184,30.1
stock_3,1602036133154,79.8
stock_0,1602036133919,9.9
stock_1,1602036134385,21.7

输出:

(stock_4,0.0,43.4)
(stock_1,0.0,39.7)
(stock_4,43.4,59.9)
(stock_2,0.0,30.1)
(stock_3,0.0,79.8)
(stock_1,39.7,21.7)

其中根据stock_id分类。

初始状态:所有stockId的最近价格都是未定义的(即null或None,在代码中表现为Double的默认值0.0,因为ValueState在初始化时未设置值)。

处理第一条记录:stock_4,1602031562148,43.4。由于没有先前的价格,不会触发输出。最近价格更新为43.4。
处理第二条记录:stock_1,1602036130952,39.7。同样,没有先前的价格,不会触发输出。最近价格更新为39.7。
处理第三条记录:stock_4,1602036131741,59.9。价格从43.4变为59.9,差异为16.5,超过阈值10,因此输出(stock_4, 43.4, 59.9)。最近价格更新为59.9。
后续记录:对于stock_2、stock_3、stock_0,由于没有先前的价格,30.1 和79.8直接列出,
但是9.9这个价格要注意
stock_0,默认值为0,这里变为9.9,没有超出阈值10,那么输出就没有。

相关文章:

【Flink-scala】DataStream编程模型之状态编程

DataStream编程模型之状态编程 参考: 1.【Flink-Scala】DataStream编程模型之数据源、数据转换、数据输出 2.【Flink-scala】DataStream编程模型之 窗口的划分-时间概念-窗口计算程序 3.【Flink-scala】DataStream编程模型之窗口计算-触发器-驱逐器 4.【Flink-scal…...

RabbitMQ的核心组件有哪些?

大家好,我是锋哥。今天分享关于【RabbitMQ的核心组件有哪些?】面试题。希望对大家有帮助; RabbitMQ的核心组件有哪些? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 RabbitMQ是一个开源的消息代理(Messag…...

【Linux基础】基本开发工具的使用

目录 一、编译器——gcc/g的使用 gcc/g的安装 gcc的安装: g的安装: gcc/g的基本使用 gcc的使用 g的使用 动态链接与静态链接 程序的翻译过程 1. 一个C/C程序的构建过程,程序从源代码到可执行文件必须经历四个阶段 2. 理解选项的含…...

常见的数据结构和应用场景

数据结构是计算机科学中的基础概念,用于组织和存储数据,以便能够高效地访问和修改。下面是几种常见数据结构及其代表性应用场景: 1. 数组(Array) 问题解决:数组是一种线性数据结构,用于存储相…...

爬虫基础学习

爬虫概念与工作原理 爬虫是什么:爬虫(Web Scraping)是自动化地访问网站并提取数据的技术。它模拟用户浏览器的行为,通过HTTP请求访问网页,解析HTML文档并提取有用信息。 爬虫的基本工作流程: 发送HTTP请求…...

C++对象数组对象指针对象指针数组

一、对象数组 对象数组中的每一个元素都是同类的对象&#xff1b; 例1 对象数组成员的初始化 #include<iostream> using namespace std;class Student { public:Student( ){ };Student(int n,string nam,char s):num(n),name(nam),sex(s){};void display(){cout<&l…...

D96【python 接口自动化学习】- pytest进阶之fixture用法

day96 pytest的fixture详解&#xff08;三&#xff09; 学习日期&#xff1a;20241211 学习目标&#xff1a;pytest基础用法 -- pytest的fixture详解&#xff08;三&#xff09; 学习笔记&#xff1a; fixture(scop"class") (scop"class") 每一个类调…...

【算法】动态规划中01背包问题解析

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…...

选择WordPress和Shopify:搭建对谷歌SEO友好的网站

在建设网站时&#xff0c;不仅要考虑它的美观和功能性&#xff0c;还要关注它是否对谷歌SEO友好。如果你希望网站能够获得更好的搜索排名&#xff0c;WordPress和Shopify是两个值得推荐的建站平台。 WordPress作为最流行的内容管理系统&#xff0c;其强大的灵活性和丰富的插件…...

代理IP与生成式AI:携手共创未来

目录 代理IP&#xff1a;网络世界的“隐形斗篷” 1. 隐藏真实IP&#xff0c;保护隐私 2. 突破网络限制&#xff0c;访问更多资源 生成式AI&#xff1a;创意与效率的“超级大脑” 1. 提高创作效率 2. 个性化定制 代理IP与生成式AI的协同作用 1. 网络安全 2. 内容创作与…...

iOS 应用的生命周期

Managing your app’s life cycle | Apple Developer Documentation Performance and metrics | Apple Developer Documentation iOS 应用的生命周期状态是理解应用如何在不同状态下运行和管理资源的基础。在 iOS 开发中&#xff0c;应用生命周期管理的是应用从启动到终止的整…...

Elasticsearch 集群快照的定期备份设置指南

Elasticsearch 集群快照的定期备份设置指南 概述 快照&#xff1a; 在给定时刻对整个集群或者单个索引进行备份&#xff0c;以便在之后出现故障时可以基于之前备份的快照进行快速恢复。 前提条件&#xff1a; 准备一个备份存储盘&#xff0c;本指南采用的是AWS EFS文件系统做…...

Docker--Docker Image(镜像)

什么是Docker Image&#xff1f; Docker镜像&#xff08;Docker Image&#xff09;是Docker容器技术的核心组件之一&#xff0c;它包含了运行应用程序所需的所有依赖、库、代码、运行时环境以及配置文件等。 简单来说&#xff0c;Docker镜像是一个轻量级、可执行的软件包&…...

C++ 中的序列化和反序列化

一、C 中的序列化和反序列化 &#xff08;一&#xff09;基本概念 在 C 中&#xff0c;序列化是将对象转换为字节流的过程&#xff0c;反序列化则是从字节流重新构建对象的过程。这对于存储对象状态到文件、网络传输等场景非常有用。 &#xff08;二&#xff09;简单的序列化…...

我的Github学生认证申请过程

先说结论&#xff1a;很简单。 学生认证链接&#xff1a;GitHub Education GitHub 1. 首先你得绑定edu邮箱。这个应该没什么问题&#xff0c;Github也会提示。 2. 我是在学校里面、使用流量而非WiFi申请的&#xff0c;听说地理位置很重要&#xff0c;该给的权限&#xff08…...

信奥题解:勾股数计算中的浮点数精度问题

来源:GESP C++ 二级模拟题 本文给出官方参考答案的详细解析,包括每一部分的功能和关键点,以及与浮点数精度相关的问题的分析。 题目描述 勾股数是很有趣的数学概念。如果三个正整数a 、b 、c ,满足 a 2 + b 2 = c 2 a^2 + b^2 = c^2 a2+b2=c2 ,而且1 ≤ a ≤ b ≤ c ,…...

重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统

重生之我在学Vue–第2天 Vue 3 Composition API 与响应式系统 文章目录 重生之我在学Vue--第2天 Vue 3 Composition API 与响应式系统前言一、Composition API 核心概念1.1 什么是 Composition API&#xff1f;1.2 Composition API 的核心工具1.3 基础用法示例 二、响应式系统2…...

【AI知识】逻辑回归介绍+ 做二分类任务的实例(代码可视化)

1. 分类的基本概念 在机器学习的有监督学习中&#xff0c;分类一种常见任务&#xff0c;它的目标是将输入数据分类到预定的类别中。具体来说&#xff1a; 分类任务的常见应用&#xff1a; 垃圾邮件分类&#xff1a;判断一封电子邮件是否是垃圾邮件 。 医学诊断&#xff1a;…...

Mysql 笔记2 emp dept HRs

-- 注意事项 -- 1.给数据库和表起名字时尽量选择全小写 -- 2.作为筛选条件的字符串是否区分大小写看设置的校对规则utf8_bin 区分 drop database if exists hrs; create database hrs default charset utf8 collate utf8_general_ci;use hrs; drop table if exists tb_emp; dro…...

MySQL和Oracle的区别

MySQL和Oracle的区别 MySQL是轻量型数据库&#xff0c;并且免费&#xff0c;没有服务恢复数据。 Oracle是重量型数据库&#xff0c;收费&#xff0c;Oracle公司对Oracle数据库有任何服务。 1.对事务的提交 MySQL默认是自动提交&#xff0c;而Oracle默认不自动提交&#xff0…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现

目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

02.运算符

目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&&#xff1a;逻辑与 ||&#xff1a;逻辑或 &#xff01;&#xff1a;逻辑非 短路求值 位运算符 按位与&&#xff1a; 按位或 | 按位取反~ …...