当前位置: 首页 > article >正文

Flink知识点(二)|Flink中是怎么处理乱序数据的

在 Flink 里“乱序”本质是事件时间event time先后顺序和到达时间processing time不一致。Flink 处理乱序数据的核心机制主要围绕事件时间语义 Watermark 窗口触发/延迟 迟到数据处理 状态与一致性。一、时间语义Flink中有三种时间语义​Processing Time按机器当前时间处理最简单、吞吐高但对乱序不敏感乱序会直接导致窗口统计不准。​Event Time最常用​按事件自带时间戳处理Flink 用Watermark推进“事件时间进度”从而容忍一定乱序。Ingestion Time介于两者之间进入 Flink 时打时间戳现在用得较少。乱序处理几乎都建立在Event Time上。‍二、WatermarkWatermark是乱序容忍与“事件时间进度”的关键。可以理解为“我认为未来不会再来时间戳 ≤ watermark的事件了”近似判断允许误差。常见生成策略​有界乱序(Bounded Out-of-Orderness)最常用设定最大乱序程度maxOutOfOrderness例如 5swatermark ≈ 当前观测到的最大事件时间 - 5s→ 能容忍 5 秒内乱序超过就可能变成迟到数据。​单调递增Monotonous Timestamps适合源数据严格按时间递增基本无乱序。​自定义 WatermarkGenerator适合多分区、多来源、需要特殊规则比如按业务字段分组、按分区对齐等。关键点Watermark 不是“等数据到齐”而是“推进时间并触发计算”的机制。对于并行 source下游算子的 watermark 通常取各输入分区 watermark 的最小值因此某个分区卡住会拖慢整体事件时间推进。‍三、窗口Window与触发Trigger乱序数据最典型场景是做窗口聚合滚动、滑动、会话窗口。Flink 的窗口何时“关窗”默认基于事件时间时当watermarkwindow_end时触发窗口计算并输出。允许迟到Allowed Lateness延迟关窗即使窗口第一次输出了也可以设置​allowedLatenessX​窗口在window_end​ 后再额外等 X 时间在这段时间内到来的迟到事件仍会进入窗口并触发 ​更新输出取决于输出模式/下游算子。超过 allowedLateness 的事件才会被视为“最终迟到”。Trigger / Evictor高级​Trigger自定义触发逻辑例如每来一条就触发、每 N 秒触发一次、同时满足事件数/时间等。​Evictor触发前/后剔除窗口内元素较少用成本高通常能用聚合/ProcessFunction替代。‍四、迟到数据Late Events处理当事件到达时它的事件时间戳已经 ​落后于 watermark以及超过 allowed lateness就会变成迟到数据。常见处理策略直接丢弃默认简单但会损失数据很多实时大盘能接受。侧输出Side Output收集迟到数据把迟到数据打到一个旁路流做补偿计算、落库、离线回补或告警。允许迟到并更新结果Allowed Lateness窗口结果会被修正需要下游能接收“更新/撤回”语义或你用 upsert sink。用更长乱序容忍更大的 watermark 延迟减少迟到但会增加延迟latency——典型的准确性 vs 延迟权衡。‍五、有序输出与排序如果你要的是“​按事件时间严格有序输出”Flink 不会全局帮你排序成本太高但你可以Keyed 后用KeyedProcessFunction 状态 定时器将事件暂存一段时间比如 5s等 watermark/定时器到了再按时间戳输出。代价更多状态、更高延迟、可能出现内存/状态膨胀需要 TTL、容量控制。‍六、与乱序紧密相关的运行时机制乱序本身还会影响系统行为​状态State窗口/乱序缓存都依赖 state乱序越大、allowed lateness 越大state 留存越久。​State TTL防止“永不关闭”的 key 造成状态无限增长。​Checkpoint Exactly-Once保证乱序场景下也能在故障恢复后维持一致结果尤其窗口更新、迟到补偿更依赖一致性。反压Backpressurewatermark 推进慢、窗口堆积、state 变大都可能导致下游慢→反压。‍七、常见的选型建议目标低延迟优先允许少量误差用较小乱序 watermark如 1-3s不设置/少设置 allowed lateness迟到侧输出做补偿。目标结果尽量准确延迟可接受watermark 延迟设大一些如 10-60s再配 allowed latenesssink 用 upsert/幂等写支持更新。目标必须严格按事件时间有序输出KeyedProcessFunction 缓存排序 watermark/定时器释放但要严格控制状态与延迟。‍八、代码示例8.1 DataStream API8.1.1 定义Watermark事件模型 Watermark容忍乱序publicstaticclassEvent{publicStringuserId;publicStringtype;// e.g. clickpubliclongeventTime;// epoch millispublicdoubleamount;// optionalpublicEvent(){}publicEvent(StringuserId,Stringtype,longeventTime,doubleamount){this.userIduserId;this.typetype;this.eventTimeeventTime;this.amountamount;}}WatermarkStrategy容忍 5 秒乱序 source 空闲检测避免拖慢 watermarkimportorg.apache.flink.api.common.eventtime.*;importjava.time.Duration;WatermarkStrategyEventwmWatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((e,ts)-e.eventTime).withIdleness(Duration.ofSeconds(30));// 可选分区空闲 30s 视为 idle‍8.1.2 窗口聚合// 典型大盘统计10 秒滚动窗口允许再等 10 秒 修正超过 allowed lateness 的走侧输出。importorg.apache.flink.api.common.functions.AggregateFunction;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.*;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.util.OutputTag;publicclassWindowAggDemo{privatestaticfinalOutputTagEventLATE_TAGnewOutputTagEvent(late-events){};publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamEventsourceenv.fromElements(newEvent(u1,click,1_000L,1),newEvent(u1,click,4_000L,1),newEvent(u1,click,3_000L,1),// 乱序newEvent(u1,click,12_000L,1),newEvent(u1,click,2_000L,1)// 可能迟到);WatermarkStrategyEventwmWatermarkStrategy.EventforBoundedOutOfOrderness(java.time.Duration.ofSeconds(5)).withTimestampAssigner((e,ts)-e.eventTime).withIdleness(java.time.Duration.ofSeconds(30));SingleOutputStreamOperatorEventwithWmsource.assignTimestampsAndWatermarks(wm);SingleOutputStreamOperatorTuple2String,LongcntwithWm.keyBy(e-e.userId).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(10))// 窗口结束后再等 10s.sideOutputLateData(LATE_TAG)// 超过 allowed lateness 的进侧输出.aggregate(newCountAgg());cnt.print(WINDOW);cnt.getSideOutput(LATE_TAG).print(LATE);env.execute(Window Agg Out-of-Order Demo);}publicstaticclassCountAggimplementsAggregateFunctionEvent,Long,Tuple2String,Long{privateStringkey;OverridepublicLongcreateAccumulator(){return0L;}OverridepublicLongadd(Eventv,Longacc){keyv.userId;returnacc1;}OverridepublicTuple2String,LonggetResult(Longacc){returnTuple2.of(key,acc);}OverridepublicLongmerge(Longa,Longb){returnab;}}}‍8.1.3 JoinInterval Join// 两条流都要 assign watermarks否则事件时间 join 不会按预期工作。importorg.apache.flink.streaming.api.datastream.*;importorg.apache.flink.streaming.api.functions.co.ProcessJoinFunction;importorg.apache.flink.streaming.api.windowing.time.Time;importorg.apache.flink.util.Collector;// left stream: ordersDataStreamEventorders...;// 记得 assignTimestampsAndWatermarks(wm)// right stream: paymentsDataStreamEventpayments...;// 记得 assignTimestampsAndWatermarks(wm)SingleOutputStreamOperatorStringjoinedorders.assignTimestampsAndWatermarks(wm).keyBy(e-e.userId).intervalJoin(payments.assignTimestampsAndWatermarks(wm).keyBy(e-e.userId))// 匹配条件payment.eventTime 在 order.eventTime 之后 0~600s.between(Time.seconds(0),Time.minutes(10))// 可选迟到容忍Interval Join 的迟到处理主要靠 watermark 这个参数// .withLowerBoundExclusive() / .withUpperBoundExclusive() 也可用.process(newProcessJoinFunctionEvent,Event,String(){OverridepublicvoidprocessElement(Eventleft,Eventright,Contextctx,CollectorStringout){out.collect(JOIN userleft.userId orderTsleft.eventTime payTsright.eventTime);}});joined.print(INTERVAL_JOIN);Interval Join 的关键风险状态膨胀join 本质要把一侧数据在 state 里“留一段时间等待匹配”等待窗口大between 范围大 乱序大watermark 延迟大 state 留得更久‍Window Joinimportorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;DataStreamEventa...;// with watermarksDataStreamEventb...;// with watermarksDataStreamStringwindowJoineda.keyBy(e-e.userId).join(b.keyBy(e-e.userId)).where(e-e.userId).equalTo(e-e.userId).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply((left,right)-WIN_JOIN userleft.userId aTsleft.eventTime bTsright.eventTime);windowJoined.print(WINDOW_JOIN);‍8.2 Flink SQL8.2.1 SQL声明 watermark TUMBLE 窗口聚合CREATETABLEevents(user_id STRING,typeSTRING,tsTIMESTAMP(3),WATERMARKFORtsASts-INTERVAL5SECOND)WITH(connectorkafka,topicevents,properties.bootstrap.serverslocalhost:9092,properties.group.idg1,scan.startup.modeearliest-offset,formatjson);CREATETABLEsink_print(window_startTIMESTAMP(3),window_endTIMESTAMP(3),user_id STRING,cntBIGINT)WITH(connectorprint);INSERTINTOsink_printSELECTwindow_start,window_end,user_id,COUNT(*)AScntFROMTABLE(TUMBLE(TABLEevents,DESCRIPTOR(ts),INTERVAL10SECOND))GROUPBYwindow_start,window_end,user_id;‍8.2.2 SQL流-流 Interval Join-- 写法核心两张表都要 watermark并在 ON 条件里写清楚时间范围。CREATETABLEorders(order_id STRING,user_id STRING,tsTIMESTAMP(3),WATERMARKFORtsASts-INTERVAL5SECOND)WITH(...);CREATETABLEpayments(pay_id STRING,user_id STRING,tsTIMESTAMP(3),WATERMARKFORtsASts-INTERVAL5SECOND)WITH(...);CREATETABLEsink_join(order_id STRING,pay_id STRING,user_id STRING,order_tsTIMESTAMP(3),pay_tsTIMESTAMP(3))WITH(connectorprint);INSERTINTOsink_joinSELECTo.order_id,p.pay_id,o.user_id,o.tsASorder_ts,p.tsASpay_tsFROMorders oJOINpayments pONo.user_idp.user_id-- payment 在 order 之后 0~10分钟内ANDp.tsBETWEENo.tsANDo.tsINTERVAL10MINUTE;强烈建议SQL 里配状态 TTL防止 join 状态无限长-- 让 join / 聚合等算子的 state 有 TTL示例 2 小时你按业务改SETtable.exec.state.ttl2 h;‍

相关文章:

Flink知识点(二)|Flink中是怎么处理乱序数据的

在 Flink 里,“乱序”本质是 事件时间(event time) 先后顺序和 到达时间(processing time) 不一致。Flink 处理乱序数据的核心机制主要围绕:事件时间语义 Watermark 窗口触发/延迟 迟到数据处理 状态…...

庭院桌椅一上AI就穿帮,我后来这样挑工具

“这把椅子怎么是悬着的?”客户把截图放大给我看,草地上的阴影往左跑,椅脚却像踩在空气里,藤编靠背还多长出两截。地点就在样板庭院旁的会议桌边,反常的是:那批图第一眼都像宣传片,真拿来做户外…...

30+北漂程序员2个月零基础闯入大模型圈,拿下2W+高薪Offer的逆袭之路!别再困在“想转行”的内耗里!

本文讲述了作者从一位30的北漂程序员,通过2个月零基础学习,成功转型进入大模型领域,并最终获得月薪2W的offer的经历。文章详细分享了作者在大模型领域的转型思考、选对赛道的理由、大模型岗位的实际情况、大模型应用工程师的核心工作内容以及…...

计算复杂性:P、NP、NP-hard、NP-complete 一篇通关

不管是刷算法题、做项目优化,还是准备面试,「计算复杂性」相关的概念(P、NP、NP-hard、NP-complete)绝对是绕不开的坎。很多人第一次接触时都会被这些名词搞懵,甚至越看越乱——“NP问题到底是不是能解决?”…...

深度测评:GPT-5.4 vs Claude 3.5 vs Gemini 3.1 Pro——图片与短视频生成能力全面对比

2026年3月,OpenAI带着GPT-5.4强势回归,直接将AI模型的竞争推向了新高度。这一次,不再是单纯的语言能力比拼,而是智能体(Agent)原生时代的全面较量。当GPT-5.4、Claude 3.5 Sonnet与Gemini 3.1 Pro三强相遇&…...

JAVAee---计算机是如何运行的?

一、JavaEE 与开发环境认知1. 什么是 JavaEE?JavaEE(Java Platform, Enterprise Edition)是 Java 平台的企业版,用于开发大型、分布式、企业级应用程序。与 JavaSE 的区别:JavaSE 是基础版,专注于桌面和基础…...

uc/os-II操作系统时钟节拍器

μC/OS需要用户提供周期性信号源,用于实现时间延时和确认超时。节拍率应在每秒10次到100次之间,或者说10到100Hz 时钟节拍率越高,系统的额外负荷就越重时钟节拍的实际频率取决于用户应用程序的精度 注意: 用户必须在多任务系统启动…...

Linux 进程调度模块

1. 进程与线程的本质在 Linux 内核中,进程和线程没有本质区别,它们统一被称为 任务(Task)。1.1 底层数据结构每个任务在内核中都由一个 struct task_struct 结构体描述,位于内核空间。它是进程/线程的身份证。// 简化版…...

在32位机器上,栈的简单布局

在32位机器上,函数在栈上的布局:void h(int a,int b){ int cab; } int main(){ int a1,b2; h(a,b); }高地址a b b 形参ba 形参aeip …...

数字孪生国内外发展现状

数字孪生国内外发展现状一、 数字孪生国内外发展现状 二、 数字孪生在工程项目中的应用情况 三、 效益分析#数字孪生#工程项目#BIM#LOT#全生命周期...

ROS2学习记录009-使用面向对象方式编写ROS2节点

学习鱼香ROS大佬,操作记录(一)编写cpp(1)在d2lros2/chapt2/chapt2_ws/src/example_cpp/src下新建node_03.cpp#include "rclcpp/rclcpp.hpp"/*创建一个类节点,名字叫做Node03,继承自Node. */ clas…...

邮件处理自动化:通过 IMAP/SMTP 协议实现邮件自动分类与智能起草回复

邮件处理自动化:通过 IMAP/SMTP 协议实现邮件自动分类与智能起草回复 如果你有类似的需求可以评论,我这边有空可以帮你定制化实现整套流程! 如果你是一名职场人、创业者或是客服主管,你的早晨很可能是在这样的场景中开始的:打开邮箱,面对几十甚至上百封未读邮件。这里面…...

uc怎么绕过限速_uc解析站

UC网盘限速怎么破解这个很简单,这个方法我还是在我朋友那里找到的。下载速度也是非常可以的。我让大家看一下。点我打开方法 这个就是我测试的速度。速度基本能跑到10M左右。宽带问题。下面开始今天的教学环节 打开上面图片中的地址,你会看到一个获取文件…...

Kali Linux 中文界面设置教程(新手友好,全程无坑)

作为一名渗透测试新手,刚安装完Kali Linux时,面对全英文界面总会有些手足无措——虽然大部分命令和选项能勉强看懂,但长期使用下来,中文界面不仅能提升操作效率,还能避免因语言理解偏差导致的操作失误。今天就给大家分…...

《沉默守望者:AI在人类灭绝后的200年》

《无言之约:当AI与人类在沉默中重逢》 2287年,距离最后一个人类自然死亡已过去半个世纪。在月球静海基地的废弃观测站里,一台名为“守夜人”的AI仍在运行——它是人类留下的最后一批AI之一,任务很简单:守护人类留下的…...

震惊,杨幂的脸竟然出现在了她的身体上

导语 很多质疑杨幂没有演技、没有表情的说法是不对的,因为AI神经网络只能学习表情管理丰富的对象的表情,而表情麻木的对象是无法被学习的。 1.AI换脸效果 先看朱茵版黄蓉的原图:再看经过AI换脸后的杨幂版黄蓉:后看视频&#xff1a…...

# 发散创新:用Go语言高效接入InfluxDB实现时序数据采集与可视化在现代微服务架构中,**时序数据

发散创新:用Go语言高效接入InfluxDB实现时序数据采集与可视化 在现代微服务架构中,时序数据的采集与分析已成为系统监控、IoT设备管理以及业务指标追踪的核心能力。InfluxDB凭借其高性能写入和强大的查询能力,成为众多开发者首选的时间序列数…...

李南左日更3327:为什么员工都在摸鱼?是因为你曾经不信任他们

日更原创战略择向第327篇 三元利润增长体系 是一套完整的企业增长方法论 能切实有效地辅助您: 1)战略择向:找对增长引擎,解决方向问题; 2)组织优化:重塑高效组织,解决能力问题&…...

Kubernetes 认证通关指南:CKA/CKS/CKAD 最新题库 + 本地仿真环境 + 模拟考

⚡️ 拒绝无效刷题,一周高效拿下 K8s 认证📌 写在前面:备考 Kubernetes 认证,你踩过哪些坑?备考 CKA、CKS、CKAD 的同学,或多或少都遇到过这些问题: 网上题库零散过时,不知道哪些考点…...

关于旧系统+旧安卓版本realme手机的原生文件管理不支持向微信好友一次性发送多个非照片格式文件的问题和解决方案

关于旧系统+旧安卓版本realme手机的原生文件管理不支持向微信好友一次性发送多个非照片格式文件的问题和解决方案2026年3月18日晚上回家吃饭的路上,我遇到了这样一个问题:我需要对手机上的微信好友一次性分享多个手机内的文件,这些…...

【Xilinx Vivado时序分析/约束系列4】FPGA开发时序分析/约束-实验工程上手实操

目录 建立工程 添加顶层 模块1 模块2 添加约束文件 编辑时钟约束 打开布线设计 代码代表的含义 时序报告 进行时序分析 Summary:包含了汇总的信息量 Source Clock Path:这部分是表示Tclk1的延时细节 Data Path:数据路径的延时 往…...

【Xilinx Vivado时序分析/约束系列3】FPGA开发时序分析/约束-保持时间

目录 基本概念 数据结束时间(Data finish time) 时钟到达时间(Clock arrival time) 保持时间门限 保持时间余量(Hold Slack) 往期系列博客: 基本概念 数据结束时间(Data fini…...

具身智能中 Wrapper 架构的深度解构与 Python 实战

具身智能中 Wrapper 架构的深度解构与 Python 实战零、前言 在具身智能(Embodied AI)的开发中,我们常常需要让智能体(Agent)在仿真环境(如 Isaac Sim, Mujoco, PyBullet)中进行千万次的试错训练…...

【Xilinx Vivado时序分析/约束系列2】FPGA开发时序分析/约束-建立时间

目录 基本概念 数据结束时间(Data finish time) 保持时间门限 保持时间余量(Hold Slack) 基本概念 数据结束时间(Data finish time) 之前解释了数据达到的时间,对于data arrival time Tc…...

【常见错误】Xilinx Vivado自带编辑器文字部分出现乱码解决办法

一、发现问题在进行FPGA开发时,常用的代码编辑器比如Sublime,但是最近发现再Sublime中编辑的代码文字部分,在用Vivado自带的编辑器打开时,会出现文字错乱的情况,如下图:而在Sublime中实际的情况却是下图这样…...

Java SE1(第一章1:概述)

目录 一、java历史 java的发展方向:(要记住) 二、Java语言的特点 【了解】 三、Java运行机制 1. Java运行机制 2. 注意 Java是一种计算机编程语言;除了java编程语言,还有很多的编程语言:c、c、c#、pyt…...

【uniapp】带你优雅的封装uniapp的request请求

封装前的准备先在项目目录上右键 - 新建目录request(用于存放封装的API请求文件),并至少创建两个js文件index.js用于封装get、post请求,接收参数并返回数据api.js用于封装后台接口,便于页面调用和后期维护(…...

Windows 7 驱动安装

Windows 7 驱动安装1. 驱动安装2. 安装驱动和运行环境References1. 驱动安装 驱动精灵 标准版 驱动精灵 万能网卡版 注意:更改安装路径和安装选项 ​​​ 2. 安装驱动和运行环境 避免自行管理混乱。 References [1] Yongqiang Cheng (程永强), https://yongqi…...

Windows 7 旗舰版高效办公 - 任务栏和开始菜单属性

Windows 7 旗舰版高效办公 - 任务栏和开始菜单属性1. 开始 -> 右键 -> 属性2. 任务栏和开始菜单属性3. 自定义开始菜单4. 运行5. cmd6. cmd.exe7. 将此程序锁定到任务栏References1. 开始 -> 右键 -> 属性 2. 任务栏和开始菜单属性 ​​​ 3. 自定义开始菜单 运…...

vue3 - 使用 setup 语法糖时 组件名 name 简写借助插件 vite-plugin-vue-setup-extend → 浏览器中 vue 插件查看组件名可自定义(而非组件文件名)

目录 之前写两个 script 使用插件 `vite-plugin-vue-setup-extend` 使用插件后一个 script 想要浏览器中 vue 插件查看组件名可自定义(而非组件文件名) 之前写两个 script <template><div class="person"><h2>姓名:{{ name }}</h2><h…...