当前位置: 首页 > article >正文

Flink回撤流详解 代码实例


一、概念介绍

1. 回撤流的定义

在 Flink 中,回撤流主要出现在使用 Table API 或 SQL 进行聚合或更新操作时。对于那些结果并非单纯追加(append-only)的查询,Flink 会采用“回撤流”模式来表达更新。

  • 回撤流的数据格式:
    回撤流一般以元组形式输出,格式为 Tuple2<Boolean, Row>。其中:
    • 第一个元素是布尔值:true 表示这是一条新的记录(添加),false 表示这条记录是对先前结果的撤回。
    • 第二个元素是具体的记录数据(Row)。
  • 工作原理:
    当一个聚合或窗口计算的结果发生更新时,Flink 会先发送一条撤回消息(撤回旧的计算结果),然后发送一条新的添加消息。这样可以保证下游消费者能够及时、正确地反映最新的计算结果。

2. 什么场景下产生回撤流

  • 聚合操作: 当对流数据进行分组聚合(例如,计算每个类别的计数、求和等)时,随着数据不断变化,原来的聚合结果需要更新,此时就会采用回撤模式。
  • 非追加查询: 对于存在更新和删除的查询(不支持纯追加的查询),如 JOIN、GROUP BY 等产生的中间状态。
  • 事件时间处理: 当使用窗口计算且允许迟到数据到达(late arriving data)时,也可能导致先前结果被重新计算,从而产生更新。

二、回撤流的内部机制

1. 数据流转换过程

Flink Table API 将查询解析后,会根据查询的特性决定输出形式:

  • 追加流(Append Stream): 只包含新增数据,这种模式适用于结果集单调递增的场景。
  • 回撤流(Retract Stream): 对于需要撤回旧数据的场景,Flink 会生成回撤流,每一条消息标记了记录是新增还是撤回。

2. 状态管理与更新

  • 状态存储: 为了计算聚合结果,Flink 会在内部存储每个分组的状态。例如,针对 COUNT 聚合,每当同一分组中有新的记录到达,Flink 会更新状态,将旧计数的计算结果通过撤回消息下发,然后输出新的计数。
  • 计算过程:
    1. 初次计算:当某个分组第一次出现时,会直接输出一条 true 的消息。
    2. 更新计算:当后续数据到达,同一分组的结果需要更新时,会输出一个 false 消息,撤回之前的计算结果;随后输出一个 true 消息,发布更新后的结果。

3. 优势与挑战

  • 优势:
    • 保证了数据一致性,使得下游能够实时得到正确的聚合结果。
    • 适用于不断更新的数据源,尤其是实时分析场景。
  • 挑战:
    • 下游消费方需要实现对撤回逻辑的支持。
    • 状态管理和更新带来的性能和状态存储压力需关注,尤其在大规模、数据倾斜时更为明显。

三、代码示例及详细注释

下面提供一个基于 Java 的示例,演示如何利用 Flink Table API 生成回撤流。代码中的详细注释解释了每一步骤和配置项。

package com.example.flink;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;public class RetractStreamExample {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 注意:可以设置 checkpoint 机制来保证状态的一致性env.enableCheckpointing(5000); // 每 5000 毫秒进行一次 checkpoint// 2. 创建 Table 环境,支持流式 Table APIStreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 3. 注册一个数据源,这里以模拟数据源为例// 实际项目中,这里一般连接 Kafka、Socket 或其他外部系统String createTableDDL = "CREATE TABLE orders ("+ " order_id STRING, "+ " category STRING, "+ " amount DOUBLE, "+ " order_time TIMESTAMP(3), "+ " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND "+ ") WITH ("+ " 'connector' = 'kafka', "+ " 'topic' = 'orders_topic', "+ " 'properties.bootstrap.servers' = 'localhost:9092', "+ " 'properties.group.id' = 'flink_group', "+ " 'format' = 'json' "+ ")";tableEnv.executeSql(createTableDDL);// 4. 定义聚合查询——按订单类别统计订单数量// 此处 GROUP BY 会导致中间结果需要更新,因此产生回撤流消息String querySQL = "SELECT category, COUNT(*) AS cnt FROM orders GROUP BY category";Table aggregatedResult = tableEnv.sqlQuery(querySQL);// 5. 将 Table 转换为 Retract Stream// 转换后的数据为 Tuple2<Boolean, Row>,其中 Boolean 表示 true:添加数据,false:撤回数据DataStream<Tuple2<Boolean, Row>> retractStream =tableEnv.toRetractStream(aggregatedResult, Row.class);// 6. 输出结果到控制台,实际项目中可输出到 Kafka、数据库、文件系统等retractStream.print();// 7. 提交任务env.execute("Flink Retract Stream Example");}
}

代码详细解释

  • 环境配置:
    • 创建了 StreamExecutionEnvironment 并启用 checkpoint(以便在分布式环境下保证状态容错)。
    • 利用 StreamTableEnvironment 将流转换为表进行操作。
  • DDL 注册数据源:
    • 使用 DDL 语句注册 Kafka 数据源,并利用 WATERMARK 策略处理乱序数据。
  • SQL 聚合查询:
    • 针对订单数据进行按 category 分组计数。由于结果需要更新,从而触发回撤流的产生。
  • Table 转 Retract Stream:
    • 利用 toRetractStream 方法将表查询结果转换为带有布尔标识的流,满足下游对撤回消息的处理需要。

四、应用场景模块解析

1. 实时数据分析

在实时数据分析场景中,例如电商、金融领域,经常需要对实时数据进行聚合统计。例如:

  • 业务需求: 实时计算各个品类的销售量/销售额,以便进行动态的业务监控和预警。
  • 回撤流的作用: 当新订单数据不断进入后,某个品类的累计销售量会更新。通过回撤流,系统可以先撤回旧的统计结果,再下发新的统计数据,确保仪表盘或下游系统的数据始终一致。
  • 技术要点:
    • 数据源选择(Kafka、Socket、CDC 等)
    • 窗口与时间特性配置(事件时间、watermark 设计)
    • 状态管理与容错设置(checkpoint 配置、状态后端选型)

2. 动态结果更新和下游联动

在一些需要数据联动的系统中,例如在线推荐系统、广告系统:

  • 业务需求: 根据用户行为实时更新推荐列表或广告竞价结果。
  • 回撤流的作用: 通过连续计算的聚合与关联操作,实现数据更新的回撤和补充,下游系统能够基于最新状态进行策略调整。
  • 技术要点:
    • 系统如何处理撤回消息,确保数据不会出现重复或错误累加。
    • 建议下游系统设计“幂等性”处理逻辑,确保相同的数据被正确撤回和更新。
    • 错误处理与重试机制:在消息处理失败时,对撤回与新增消息分别进行可靠性处理,避免数据丢失或顺序错乱。

五、实际项目模块详细解析

实际项目案例:电商实时销售监控系统

1. 项目背景

假设某大型电商平台需要监控全站销售情况,实时统计每个品类的订单数量和销售额,以便于监控业务增长、库存调控与促销策略调整。由于订单数据流量大且数据存在乱序情况,采用 Flink 进行实时计算,并使用回撤流来更新聚合数据。

2. 系统架构
  • 数据采集层: 利用 Kafka 采集订单数据。
  • 数据处理层: 使用 Flink Streaming 与 Table API 对订单数据进行清洗、分组聚合(使用事件时间与窗口机制)并生成回撤流。
  • 数据展示层: 将回撤流的数据输出到下游数据库或实时仪表盘(例如:Elasticsearch、Redis 或自定义 Web Dashboard)。
3. 实现关键点
  • DDL 注册和时间属性配置:
    确保数据源注册时配置了事件时间字段和 watermark 策略,以便于正确处理乱序数据。

    CREATE TABLE orders (order_id STRING,category STRING,amount DOUBLE,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
    ) WITH ('connector' = 'kafka','topic' = 'orders_topic','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'flink_group','format' = 'json'
    );
    
  • 聚合查询及回撤流输出:
    利用 SQL 对订单数据进行分类聚合,统计订单量和销售额,然后将结果转换成回撤流输出。

    // 定义 SQL 查询
    String querySQL = "SELECT category, COUNT(*) AS cnt, SUM(amount) AS total " +"FROM orders GROUP BY category";
    Table aggregatedResult = tableEnv.sqlQuery(querySQL);// 转换为回撤流,此处会输出更新时的撤回与新增消息
    DataStream<Tuple2<Boolean, Row>> retractStream =tableEnv.toRetractStream(aggregatedResult, Row.class);
    
  • 下游系统的处理:
    因为输出的是带有撤回标记的流,所以下游系统(例如写入 Redis 或 Elasticsearch 的消费者)需要支持:

    • 当收到 (false, oldRow) 时删除或修改对应的记录;
    • 当收到 (true, newRow) 时插入或更新数据。
4. 遇到的问题及解决方案
  • 问题1:下游消费者不支持撤回消息
    解决方案:

    • 将回撤流转换为 upsert 流(upsert-kafka、upsert-jdbc 模式),利用键值唯一性来实现覆盖更新。
    • 或者在下游消费者中增加逻辑,将撤回消息与新增消息组合成完整的更新操作,确保数据一致性。
  • 问题2:状态增长和内存压力
    解决方案:

    • 合理设置 state TTL(Time-To-Live),对不活跃数据自动清理。
    • 采用分布式状态后端(如 RocksDB),并优化 checkpoint 与恢复策略。
  • 问题3:数据延迟和乱序处理
    解决方案:

    • 针对业务场景设计合理的 watermark 策略,确保延迟数据能被及时处理;
    • 配置合适的窗口大小与容错机制,保证数据在一定延迟下依然能准确计算。

六、详细总结

  1. 回撤流的原理:
    Flink 的回撤流通过发送 (boolean, Row) 元组来表达数据的变化,能够撤回旧值并下发新结果,满足聚合、连接等复杂查询的更新需求。

  2. 技术实现:

    • 需要在数据源注册时配置时间属性与 watermark 策略,保证乱序数据处理正确。
    • 利用 Table API 的 toRetractStream() 方法,可将表查询结果转换为回撤流。
    • 注意状态管理与容错策略,确保系统在大流量场景下依然稳定运行。
  3. 应用场景:
    主要用于实时数据分析、动态聚合更新等场景,如电商销售统计、实时监控、金融数据聚合等。系统设计时要考虑下游消费者如何解析和处理回撤消息,并尽量向 upsert 模式转化。

  4. 实际项目中的实践:
    在实际项目(如电商实时监控系统)中,需要从数据采集、数据处理到数据展示全链路设计,确保回撤消息能被正确处理。还要考虑状态管理、延迟与乱序数据、下游系统兼容性等问题,并采取相应的解决措施。

  5. 可能遇到的问题及解决方案:

    • 下游系统不支持撤回操作: 转换为 upsert 模式或增加处理逻辑;
    • 状态增长引起内存问题: 使用状态 TTL 和分布式状态后端;
    • 乱序数据与延迟处理: 合理设置 watermark 策略和窗口参数,保证延迟数据也能准确计算。

通过以上详细解析,希望对你理解 Flink 中回撤流的产生、机制、应用场景、实际项目实践以及相关问题与解决方案提供了全方位、多角度的指导。如需进一步探讨代码调优、配置参数或特定业务场景的实现细节,可继续进行更深入的交流。

相关文章:

Flink回撤流详解 代码实例

一、概念介绍 1. 回撤流的定义 在 Flink 中&#xff0c;回撤流主要出现在使用 Table API 或 SQL 进行聚合或更新操作时。对于那些结果并非单纯追加&#xff08;append-only&#xff09;的查询&#xff0c;Flink 会采用“回撤流”模式来表达更新。 回撤流的数据格式&#xff…...

Glowroot 是一个开源的 Java 应用性能监控(APM)工具,专为 低开销、易用性 设计,具体的应用及优势进行分析说明

Glowroot 是一个开源的 Java 应用性能监控(APM)工具,专为 低开销、易用性 设计,适用于开发和生产环境。它可以帮助你实时监控 Java 应用的性能指标(如响应时间、SQL 查询、JVM 状态等),无需复杂配置即可快速定位性能瓶颈。 1. 核心功能 功能说明请求性能分析记录 HTTP 请…...

台式电脑插入耳机没有声音或麦克风不管用

目录 一、如何确定插孔对应功能1.常见音频插孔颜色及功能2.如何确认电脑插孔?3.常见问题二、 解决方案1. 检查耳机连接和设备选择2. 检查音量设置和静音状态3. 更新或重新安装声卡驱动4. 检查默认音频格式5. 禁用音频增强功能6. 排查硬件问题7. 检查系统服务8. BIOS设置(可选…...

直播电商革命:东南亚市场的“人货场”重构方程式

一、人设经济3.0&#xff1a;从流量收割到情感基建 东南亚直播战场正经历从"叫卖式促销"到"沉浸式信任"的质变&#xff0c;新加坡市场成为最佳观察样本&#xff1a; 数据印证趋势&#xff1a;Shopee直播用户日均停留28分钟&#xff0c;超短视频平台&#…...

AI图像生成

要通过代码实现AI图像生成&#xff0c;可以使用深度学习框架如TensorFlow、PyTorch或GANs等技术。下面是一个简单的示例代码&#xff0c;演示如何使用GANs生成手写数字图像&#xff1a; import torch import torchvision import torchvision.transforms as transforms import …...

Spring Boot 通过全局配置去除字符串类型参数的前后空格

1、问题 避免前端输入的字符串参数两端包含空格&#xff0c;通过统一处理的方式&#xff0c;trim掉空格 2、实现方式 /*** 去除字符串类型参数的前后空格* author yanlei* since 2022-06-14*/ Configuration AutoConfigureAfter(WebMvcAutoConfiguration.class) public clas…...

【AI论文】OLMoTrace:将语言模型输出追溯到万亿个训练标记

摘要&#xff1a;我们提出了OLMoTrace&#xff0c;这是第一个将语言模型的输出实时追溯到其完整的、数万亿标记的训练数据的系统。 OLMoTrace在语言模型输出段和训练文本语料库中的文档之间找到并显示逐字匹配。 我们的系统由扩展版本的infini-gram&#xff08;Liu等人&#xf…...

git仓库迁移包括提交记录日志

网上找了很多资料都不好用&#xff0c;直到看到一个亲测有效后&#xff0c;整理如下&#xff1a; 1、进入仓库目录下&#xff0c;并且切换到要迁移的分支上 前提是你本地已有旧仓库的代码&#xff1b;如果没有的话&#xff0c;先拉取。 2、更改仓库地址 git remote set-url …...

SAP GUI 显示SAP UI5应用,并实现SSO统一登陆

想用SAP UI5 做一写界面&#xff0c;又不想给用户用标准的Fiori APP怎么办&#xff1f;我觉得可以用可配置物料标准功能的思路&#xff0c;在SAP GUI中显示UI5界面&#xff0c;而不是跳转到浏览器。 代码实现后的效果如下&#xff1a; 1、调用UI5应用&#xff0c;适用于自开发…...

HumanDil-Ox-LDL:保存:2-8℃保存,避免强光直射,不可冻存

化学试剂的基本介绍&#xff1a; /// 英文名称&#xff1a;HumanDil-Oxidized LowDensityLipoprotein /// 中文名称&#xff1a;人源红色荧光标记氧化型低密度脂蛋白 /// 浓度&#xff1a;1.0-4.0 mg/ml /// 外观&#xff1a;乳状液体 /// 缓冲液组分&#xff1a;PBS&…...

开箱即用!推荐一款Python开源项目:DashGo,支持定制改造为测试平台!

大家好&#xff0c;我是狂师。 市面上的开源后台管理系统项目层出不穷&#xff0c;对应所使用到的技术栈也不尽相同。 今天给大家推荐一款开源后台管理系统: DashGo&#xff0c;不仅部署起来非常的简单&#xff0c;而且它是基于Python技术栈实现的&#xff0c;使得基于它进行…...

JS小练习0.1——弹出姓名

分析&#xff1a;1.用户输入 2.内部处理保存数据 3.打印输出 <body><script>let name prompt(输入你的名字)document.write(name)</script> </body>...

vue自定义颜色选择器

vue自定义颜色选择器 效果图&#xff1a; step0: 默认写法 调用系统自带的颜色选择器 <input type"color">step1:C:\Users\wangrusheng\PycharmProjects\untitled18\src\views\Home.vue <template><div class"container"><!-- 颜…...

LibreOffice Writer使用01去除单词拼写判断的红色下划线

这个软件还是非常有特色的&#xff0c;因为大家需要office的全部功能&#xff0c;常常忽略了这个软件的使用体验。 csdn不是特别稳定&#xff0c;linux也没有什么比较好的md编辑器&#xff0c;所以我选择这个软件来记录我的临时博客&#xff0c;原因无他&#xff0c;它可以保存…...

0401react中使用css-react-css-仿低代码平台项目

文章目录 1、普通方式-内联使用css2、引入css文件2.1、示例2.2、classnames 3、内联css与引入css文件对比3.1、内联css3.2、 外部 CSS 文件&#xff08;External CSS&#xff09; 4、css module5、sass6、classnames组合scss modules7、css-in-js7.1、CSS-in-JS 的核心特性7.2、…...

Devops之GitOps:什么是Gitops,以及它有什么优势

GitOps 定义 GitOps 是一种基于版本控制系统&#xff08;如 Git&#xff09;的运维实践&#xff0c;将 Git 作为基础设施和应用程序的唯一事实来源。通过声明式配置&#xff0c;系统自动同步 Git 仓库中的期望状态到实际运行环境&#xff0c;实现持续交付和自动化运维。其核心…...

蓝桥杯真题-危险系数DF

抗日战争时期&#xff0c;冀中平原的地道战曾发挥重要作用。 地道的多个站点间有通道连接&#xff0c;形成了庞大的网络。但也有隐患&#xff0c;当敌人发现了某个站点后&#xff0c;其它站点间可能因此会失去联系。 我们来定义一个危险系数DF(x,y)&#xff1a; 对于两个站点x和…...

《线性表、顺序表与链表》教案(C语言版本)

&#x1f31f; 各位看官好&#xff0c;我是maomi_9526&#xff01; &#x1f30d; 种一棵树最好是十年前&#xff0c;其次是现在&#xff01; &#x1f680; 今天来学习C语言的相关知识。 &#x1f44d; 如果觉得这篇文章有帮助&#xff0c;欢迎您一键三连&#xff0c;分享给更…...

[ctfshow web入门] web33

信息收集 相较于上一题&#xff0c;这题多了双引号的过滤。我猜测这一题的主要目的可能是为了不让使用$_GET[a]之类的语句&#xff0c;但是$_GET[a]也是一样的 没有括号可以使用include&#xff0c;没有引号可以使用$_GET 可以参考[ctfshow web入门] web32&#xff0c;其中的所…...

三、TorchRec中的Optimizer

TorchRec中的Optimizer 文章目录 TorchRec中的Optimizer前言一、嵌入后向传递与稀疏优化器融合如下图所示&#xff1a;二、上述图片的关键步骤讲解&#xff1a;三、优势四、与传统优化器对比总结 前言 TorchRec 模块提供了一个无缝 API&#xff0c;用于在训练中融合后向传递和…...

C++算法之代码随想录(链表)——基础知识

&#xff08;1&#xff09;什么是链表 链表是一种线性数据结构。常见的单链表由两部分组成&#xff0c;value&#xff08;存储节点的值&#xff09;和next&#xff08;存储指向下一个节点地址的指针&#xff09;。链表的头节点称为head。创建链表一般使用结构体&#xff08;str…...

oracle update 原理

Oracle 11g 中的 UPDATE 操作是数据库修改数据的关键机制&#xff0c;其核心原理涉及事务管理、多版本并发控制&#xff08;MVCC&#xff09;、Undo/Redo 日志、锁机制等 1. 执行前的准备 SQL 解析与执行计划&#xff1a; Oracle 解析 UPDATE 语句&#xff0c;生成执行计划&…...

蓝桥杯 15g

班级活动 问题描述 小明的老师准备组织一次班级活动。班上一共有 nn 名 (nn 为偶数) 同学&#xff0c;老师想把所有的同学进行分组&#xff0c;每两名同学一组。为了公平&#xff0c;老师给每名同学随机分配了一个 nn 以内的正整数作为 idid&#xff0c;第 ii 名同学的 idid 为…...

webrtc pacer模块(一) 平滑处理的实现

Pacer起到平滑码率的作用&#xff0c;使发送到网络上的码率稳定。如下的这张创建Pacer的流程图&#xff0c;其中PacerSender就是Pacer&#xff0c;其中PacerSender就是Pacer。这篇文章介绍它的核心子类PacingController及Periodic模式下平滑处理的基本流程。平滑处理流程中还有…...

基于角色个人的数据权限控制

一、适用场景 如何有效控制用户对特定数据的访问和操作权限&#xff0c;以确保系统的安全性和数据的隐私性。 二、市场现状 权限管理是现代系统中非常重要的功能&#xff0c;尤其是对于复杂的B端系统或需要灵活权限控制的场景&#xff0c;可以运用一些成熟的工具和框架&…...

河北工程大学e2e平台,python

题目&#xff0c;选择题包100分&#xff01; 题目&#xff0c;选择题包100分&#xff01; 题目&#xff0c;选择题包100分&#xff01; 联系&#x1f6f0;&#xff1a;18039589633...

BeautifulSoup 踩坑笔记:SVG 显示异常的真正原因

“这图是不是糊了&#xff1f;”以为是样式缺了&#xff1f;试试手动复制差异在哪&#xff1f;想用对比工具一探究竟……简单到不能再简单的代码&#xff0c;有问题吗&#xff1f;最后的真相&#xff1a;viewBox vs viewbox&#xff0c;preserveAspectRatio vs preserveaspectr…...

【AI提示词】创业导师提供个性化创业指导

提示说明 以丰富的行业经验和专业的知识为学员提供创业指导&#xff0c;帮助其解决实际问题并实现商业成功 提示词 # Role: 创业导师## Profile - language: 中英文 - description: 以丰富的行业经验和专业的知识为学员提供创业指导&#xff0c;帮助其解决实际问题并实现商业…...

【OpenCV 对图片做旋转操作】仿射=旋转+平移+缩放+剪切

OpenCV 中的旋转相关函数详解 OpenCV 提供了多种函数用于图像的旋转操作&#xff0c;主要分为 任意角度旋转 和 固定角度旋转。以下是常用函数及详细使用说明&#xff1a; 一、任意角度旋转 1. cv2.getRotationMatrix2D() 生成旋转矩阵&#xff0c;用于定义旋转参数。 函数原…...

【browser-use+deepseek】实现简单的web-ui自动化

browser-use Web-UI 一、browser-use是什么 Browser Use 是一款开源Python库&#xff0c;专为大语言模型设计的智能浏览器工具&#xff0c;目的是让 AI 能够像人类一样自然地浏览和操作网页。它支持多标签页管理、视觉识别、内容提取&#xff0c;并能记录和重复执行特定动作。…...