当前位置: 首页 > news >正文

Flink 有状态流式处理

传统批次处理方法

在这里插入图片描述
【1】持续收取数据(kafka等),以window时间作为划分,划分一个一个的批次档案(按照时间或者大小等);
【2】周期性执行批次运算(Spark/Stom等);

传统批次处理方法存在的问题:
在这里插入图片描述
【1】假设计算每小时出现特定事件的转换次数(例如:1、2…),但某个事件正好处于1到2之间就尴尬了。需要将1点处理一半的结果带到2点这个批次中进行运算。而这个划分跟我们事件发生的时间也是有误差的。
【2】在分布式多线程的情况下,如果接收到事件的顺序颠倒了,又该如何处理?

理想方法

累积状态:表示过去历史接收过的所有事件。可以是计数或者机器模型等等。
在这里插入图片描述我们要处理一个持续维护的状态时,最适合的方式就是状态流处理(累积状态和维护状态+时间,是不是该收的结果都收到了)
在这里插入图片描述
【1】有状态流处理作为一种新的持续过程范式,处理连续的数据;
【2】产生准确的结果;
【3】实时可用的结果仅为模型的自然结果;

流式处理

流处理系统或者流处理引擎都是数据驱动的,而不是定期或者人为的去触发。数据也没有物理边界。
在这里插入图片描述
一般系统都会把操作符放上去,等待数据的到来进行计算。如下是一个逻辑模型(DAG
在这里插入图片描述

分散式流式处理

在这里插入图片描述
【1】从数据中选择一个属性作为key对输入流进行分区;
【2】使用多个实例,每个实例负责部分key的存储,根据Hash值,相同的key一定落在相同的分区进行处理;
【3】根据流式数据处理的DAG模型,有对应如下的分布式流处理的实例模型。例如A算子拥有两个实例,上游的实例节点可能同时与下游的一个或多个节点进行传输。这些实例根据系统或者人为的因素分配在不同的节点之上。节点与节点之间数据传输也会涉及网络之间的占用。本地的传输就不需要走网络
在这里插入图片描述

有状态分散式流式处理

定义一个变量X,输出结果依据这个X,这个X就是一个状态。有状态分散的流失处理引擎,当状态可能会累计非常大。当key比较多的时候就会超出单台节点的负荷量。这个x就应该有状态后台使用memory去维护它。【数据倾斜】
在这里插入图片描述

状态容错(State Fault Tolerance)

状态挂了,如何确保状态拥有精确一次(exactly-onceguarantee)的容错保证?就是通过定期的快照+事件日志位置。我们先假设一个简单的场景,如下,一个队列在不断的传输数据。单一的process在处理数据。这个process没处理一个数据都会累计一个状态。如何为这个process做一个容错。做法就是没处理完一笔,更改完状态之后,就做一次快照(包含它处理的数据在队列中的位置和它处理到的位置以及当时的状态进行对比)
在这里插入图片描述
举个例子:如下我处理到第二笔数据,我就会记录下第二个位置在进入process之前的信息(位置X+状态@X
在这里插入图片描述
当进入process处理的时候出现了fail时,Flink就会根据上一次的位置+状态进行恢复。

如何在分散式场景下替多个拥有本地状态的运算子产生一个全域一致的快照(global consistent snapshot)?
方式一:更改该任务流过的所有运算子的状态。比较笨,有一个副作用,就是我处理完这笔数据,它应该就到了一个process,我本应该做其他数据的处理了,可是为了全局一致性快照就会停止前面和当前的process的运算来保证全局一致性。
在这里插入图片描述

分散式状态容错

通过checkpoint实现分散式状态容错
在这里插入图片描述
每一个运算子它本地都有一个维护一个状态,当要产生一个检查点(checkpoit)的时候,都会将这个检查点存储在一个更小的分布式文件系统DFS中。当出现某个算子fail之后,就会从所有的checkpoint中获取所有算子的上一个状态进行恢复。把消息队列的位置也进行恢复。也就是多线程工作,每一个任务在DFS中就可以看作一个线程,它们数据存储的key就是这个任务,每一个算子的处理状态都会按照处理顺序添加进去。

分布式快照(Distributed Snapshots)

更重要是时如何在不中断运算的前提下生成快照?其实就是给每一个任务标记一个checkpoint n不同的任务这个n是不同的,相同的任务在不同的算子里面它是相同的。具体我们把这个分解后看看。
在这里插入图片描述
【1】如下图,当我们从数据源获取数据的时候,其实我们已经开始有状态了,这个时候我们可以把任务处理的整个过程抽象成如下图中的一张表。
在这里插入图片描述
【2】首先是数据源的状态,就是数据在操作前的一个位置offset进行快照存储,如下图所示:
在这里插入图片描述
【3】当获取到数据源之后,就进入算子中进行处理,此时就会对数据进入之前的状态进行checkpoint。记录一个savepoint
在这里插入图片描述
【4】在最后一次操作前(输出)也会记录checkpoint。在这个过程中,其实前面的算子也在产生不同的 checkpoint n-1 等。如果要进行恢复使用的话,必须是一个complete完整的Checkpoint。只有部分数据的Checkpoint是不能使用的。
在这里插入图片描述

状态维护(State Management)

本地维护的这个状态可能非常非常大。后端的管理系统一般使用内存维护这些状态。
在这里插入图片描述
Flink提供了两种状态后端:JVM Heap状态后端,适合比较小的状态,量不要很大。当运算子action要读取状态的时候,都是一个Java对象的read或者write。当要产生一个检查点的时候,需要将每个运算子的本地状态数据通过序列化存储在DFS中,
在这里插入图片描述
当状态非常大的时候就不能使用JVM Heap的时候,就需要用到RocksDB。当算子需要读取的时候本地state的时候需要进行序列化操作从而节省内存,同时,当需要进行checkpointDFS时,也少了序列化的步骤。它也会给本地存储一份,当fail的时候就可以很快恢复,提高效率。
在这里插入图片描述

Event-time 处理

EventTime是事件产生的时间。
在这里插入图片描述
下面是一张,程序处理时间与事件发生时间的时间差的一张对比图来更好的理解EventTime
在这里插入图片描述

Event-Time 处理

也就是说我们要统计的3-4点之间的数据,程序4点结束这个执行不是根据window时间,而是根据event-Time
在这里插入图片描述

Watermarks

Flinkwatermarks实现Event-Time功能的。在Flink里面也属于一个特殊事件,精髓是当某个运算子收到一个带有时间戳twatermark后就不会再收到任何小于该时间戳的事件了。也就是当window需要统计4点的数据时,例如我们每5分钟发一次watermark,那么当window收到4.05watermark的时候才会去统计4点之前的数据(下一次)。如果4.05收到了4点之前的数据的话,Flink1.5会把这个事件输出到旁路输出(side output),你可以获取出来,进行处理。目前有一个问题就是:如果某个Stream Partition 没有输入了,也就没有Watermarks。那么window就没办法进行处理了。当多个数据流的watermarks不相同的时候,Flink会取最小的watermarks进行运算。可以在接收到资源的时候通过代码设置watermarks

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

在这里插入图片描述

状态保存与迁移(Savenpoints and Job Migration)

可以想成:一个手动产生的检查点(CheckPoint):保存点记录某一个流失应用中的所有运算中的状态。当触发SavePoint之后,Flink提供了两种选择停止消费或者继续运算,根据场景定义。
在这里插入图片描述
执行停止之前,产生一个保存点。就可以解决上面提到的3个问题。
在这里插入图片描述
从保存点恢复新的执行,这个时候,例如我们重启花了30分钟,这段事件kafka还在不断的接收新的数据。恢复之后,Flink就需要从当时记录的kafka位置赶上最新的位置。这个时候利用Event-Time处理新的数据都是事件发生时的数据,这个时候再跟程序执行的时间比较就更能体现Event-time的价值。
在这里插入图片描述

相关文章:

Flink 有状态流式处理

传统批次处理方法 【1】持续收取数据&#xff08;kafka等&#xff09;&#xff0c;以window时间作为划分&#xff0c;划分一个一个的批次档案&#xff08;按照时间或者大小等&#xff09;&#xff1b; 【2】周期性执行批次运算&#xff08;Spark/Stom等&#xff09;&#xff1b…...

LeetCode //C - 1071. Greatest Common Divisor of Strings

1071. Greatest Common Divisor of Strings For two strings s and t, we say “t divides s” if and only if s t … t (i.e., t is concatenated with itself one or more times). Given two strings str1 and str2, return the largest string x such that x divides …...

智能优化算法应用:基于群居蜘蛛算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于群居蜘蛛算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于群居蜘蛛算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.群居蜘蛛算法4.实验参数设定5.算法结果6.…...

AtCoder Beginner Contest 332

E - Lucky bag(简单状态压缩dp&#xff09; 题目链接 题意&#xff1a;给你n个物品&#xff0c;m个福袋&#xff0c;让你将这n个物品用m个福袋打包(福袋可以为空&#xff09;&#xff0c;让分完之后的总方差最小&#xff0c;输出最小方差。 思路&#xff1a;其实由题目的数据…...

华为OD试题二(文件目录大小、相对开音节、找最小数)

1. 文件目录大小 题目描述&#xff1a; 一个文件目录的数据格式为&#xff1a;目录id&#xff0c;本目录中文件大小&#xff0c;(子目录id 列表)。其中目录id全局唯一&#xff0c;取值范围[1,200]&#xff0c;本目录中文件大小范 围[1,1000]&#xff0c;子目录id列表个数[0,10…...

【Spark精讲】Spark作业执行原理

基本流程 用户编写的Spark应用程序最开始都要初始化SparkContext。 用户编写的应用程序中&#xff0c;每执行一个action操作&#xff0c;就会触发一个job的执行&#xff0c;一个应用程序中可能会生成多个job执行。一个job如果存在宽依赖&#xff0c;会将shuffle前后划分成两个…...

Docker容器:Centos7搭建Docker镜像私服harbor

目录 1、安装docker 1.1、前置条件 1.2、查看当前操作系统的内核版本 1.3、卸载旧版本(可选) 1.4、安装需要的软件包 1.5、设置yum安装源 1.6、查看docker可用版本 1.7、安装docker 1.8、开启docker服务 1.9、安装阿里云镜像加速器 1.10、设置docker开机自启 2、安…...

ClickHouse安装和部署

ClickHouse安装过程&#xff1a; ClickHouse支持运行在主流64位CPU架构&#xff08;X86、AArch和PowerPC&#xff09;的Linux操作 系统之上&#xff0c;可以通过源码编译、预编译压缩包、Docker镜像和RPM等多种方法进行安装。由于篇幅有限&#xff0c;本节着重讲解离线RPM的安…...

Spring Cloud Gateway中对admin端点进行认证

前言 我们被扫了一个漏洞&#xff0c;SpringBoot Actuator 未授权访问&#xff0c;漏洞描述是这样的&#xff1a; Actuator 是 springboot 提供的用来对应用系统进行自省和监控的功能模块&#xff0c;借助于 Actuator 开发者可以很方便地对应用系统某些监控指标进行查看、统计…...

2. 如何通过公网IP端口映射访问到设备的vmware虚拟机的ubuntu服务器

文章目录 1. 主机设备是Windows 11系统2. 安装vmware虚拟机3. 创建ubuntu虚拟机&#xff08;据说CentOS 7 明年就不维护了&#xff0c;就不用这个版本的linux了&#xff09;4. 安装nginx服务:默认端口805. 安装ssh服务:默认端口226. 设置主机 -> ubuntu的端口映射7. 设置路由…...

配置android sudio出现的错误

导入demo工程&#xff0c;配置过程参考&#xff1a; AndroidStudio导入项目的正确方式&#xff0c;修改gradle配置 错误&#xff1a;Namespace not specified. Specify a namespace in the module’s build file. 并定位在下图位置&#xff1a; 原因&#xff1a;Android 大括号…...

【初阶C++】前言

C前言 1. 什么是C2. C发展史3. C的重要性4. 如何学习C 1. 什么是C C语言是结构化和模块化的语言&#xff0c;适合处理较小规模的程序。对于复杂的问题&#xff0c;规模较大的程序&#xff0c;需要高度的抽象和建模时&#xff0c;C语言则不合适。为了解决软件危机&#xff0c; …...

MAC IDEA Maven Springboot

在mac中&#xff0c;使用idea进行maven项目构建 环境配置如何运行maven项目1.直接在IDEA中运行2.使用jar打包后执行 如何搭建spring boot1.添加依赖2.创建入口类3.创建控制器4. 运行5.其他 环境配置 官网安装IDEA使用IDEA的创建新项目选择创建MAEVEN项目测试IDEA的MAVEN路径是…...

Angular13无法在浏览器debug

前言 本文将介绍如何解决在Angular 13中无法在浏览器中进行调试的问题&#xff0c;并提供了一种解决方法。 发生场景 根据项目需求&#xff0c;升级至Angular 13后&#xff0c;发现无法在浏览器中进行调试。 问题原因 无法进行调试的原因是&#xff0c;当使用Angular 13的…...

H.264与H.265(HEVC):视频编码的演进

目录 H.264的发展历程 1. 标准发布 2. 广泛应用 3. 专业化应用 H.265的出现...

Python从入门到精通九:Python异常、模块与包

了解异常 什么是异常 当检测到一个错误时&#xff0c;Python解释器就无法继续执行了&#xff0c;反而出现了一些错误的提示&#xff0c;这就是所谓的“异常”, 也就是我们常说的BUG bug单词的诞生 早期计算机采用大量继电器工作&#xff0c;马克二型计算机就是这样的。 19…...

无需公网IP联机Minecraft,我的世界服务器本地搭建教程

目录 前言 1.Mcsmanager安装 2.创建Minecraft服务器 3.本地测试联机 4. 内网穿透 4.1 安装cpolar内网穿透 4.2 创建隧道映射内网端口 5.远程联机测试 6. 配置固定远程联机端口地址 6.1 保留一个固定TCP地址 6.2 配置固定TCP地址 7. 使用固定公网地址远程联机 8.总…...

机器学习-SVM(支持向量机)

推荐课程&#xff1a;【机器学习实战】第5期 支持向量机 |数据分析|机器学习|算法|菊安酱_哔哩哔哩_bilibili 赞美菊神ヾ ( ゜ⅴ゜)&#xff89; 一、什么是支持向量机&#xff1f; 支持向量机&#xff08;Support Vector Machine, SVM&#xff09;是一类按监督学习&#xff0…...

保姆级:Windows Server 2012上安装.NET Framework 3.5

&#x1f4da;&#x1f4da; &#x1f3c5;我是默&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; ​​ &#x1f31f;在这里&#xff0c;我要推荐给大家我的专栏《Windows》。&#x1f3af;&#x1f3af; &#x1f680;无论你是编程小白&#xff0c;还是有…...

昇腾910安装驱动出错,降低Centos7.6的内核版本

零、问题描述&#xff1a; 在安装Atlas800-9000服务器的驱动的时候&#xff0c;可能会出现错误&#xff1a;Dkms install failed, details in : /var/log/ascend_seclog/ascend_install.log 如下所示&#xff1a; [rootlocalhost ~]# ./Ascend-hdk-910-npu-driver_23.0.rc3_l…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

Netty从入门到进阶(二)

二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架&#xff0c;用于…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...