flink写入hdfs数据如何保证幂等的?
在 Flink 中使用 HDFS Connector 将数据写入 HDFS 时,保证幂等性是一个重要的需求,尤其是在数据可靠性要求较高的场景下。以下是详细介绍如何通过 Flink 和 HDFS 的特性以及一些设计上的优化来实现幂等性。
一、Flink 的 Checkpoint 机制
Flink 的 Checkpoint 机制是实现幂等性的重要保障之一。Checkpoint 用于捕获流处理程序的状态快照,确保在任务失败或中断时能够从最近的 Checkpoint 恢复,从而避免重复处理数据。
1. Checkpoint 的工作原理
- 状态快照:Flink 定期对任务的状态进行快照,这些快照存储在可靠的存储系统(如 HDFS 或 S3)中。
- 容错恢复:当任务失败时,Flink 会从最近的 Checkpoint 恢复,重新处理未完成的数据。
- Exactly-Once 语义:通过结合两阶段提交协议(2PC),Flink 可以保证每个事件仅被处理一次。
2. 配置 Checkpoint
# 在 Flink 配置文件中启用 Checkpoint
execution.checkpointing.interval: 10s # 设置 Checkpoint 间隔
execution.checkpointing.mode: EXACTLY_ONCE # 启用 Exactly-Once 语义
execution.checkpointing.storage.directory: hdfs://namenode:8020/flink/checkpoints # 存储路径
二、HDFS 的原子写入特性
HDFS 的原子写入特性是实现幂等性的基础之一。HDFS 支持原子提交操作,这意味着文件写入要么成功完成,要么完全失败,不会有中间状态。
1. 原子写入的工作原理
- 原子提交:HDFS 在写入文件时会先将数据写入临时文件,只有在所有数据写入完成后才会将临时文件重命名为正式文件名。
- 避免覆盖:通过合理的文件命名策略(如包含时间戳或唯一标识),可以避免文件被覆盖或重复写入。
2. 示例:HDFS 文件命名策略
// 使用时间戳和分区键生成唯一的文件名
String fileName = "data_" + System.currentTimeMillis() + "_" + partitionKey;
三、Flink HDFS Sink 的设计优化
Flink 提供了多种 HDFS Sink 的实现方式,通过合理的设计可以进一步增强幂等性。
1. 滚动文件(Rolling Files)
- 按时间滚动:每隔固定时间(如 1 分钟)创建一个新的文件。
- 按大小滚动:当文件大小达到一定阈值(如 1GB)时创建新文件。
- 优点:避免单个文件过大,提高数据写入效率。
2. 文件命名策略
- 唯一标识:在文件名中包含唯一标识(如时间戳、分区键、随机 UUID 等)。
- 示例:
String filePath = "/user/flink/output/" + LocalDateTime.now().toString() + "/" + UUID.randomUUID() + ".parquet";
3. 输出路径管理
- 动态路径:每次作业运行时生成新的输出路径。
- 历史数据清理:定期清理旧的历史数据以释放存储空间。
四、数据唯一性检查
在某些场景下,可以通过额外的元数据存储(如数据库或缓存)来记录已写入的数据,从而实现幂等性。
1. 元数据存储
- 记录已处理的数据:在写入 HDFS 之前,检查数据是否已经存在于元数据存储中。
- 去重逻辑:如果数据已经存在,则跳过写入操作。
2. 示例:基于数据库的去重
public class IdempotentWriter {private final Connection connection;public IdempotentWriter(Connection connection) {this.connection = connection;}public void write(String data) throws SQLException {// 检查数据是否已经存在 if (!isDataExists(data)) {// 写入 HDFS writeToFile(data);// 记录到数据库 markAsProcessed(data);}}private boolean isDataExists(String data) {// 查询数据库 return false;}private void markAsProcessed(String data) {// 更新数据库 }private void writeToFile(String data) {// 写入 HDFS }
}
五、业务逻辑中的幂等处理
除了技术层面的优化,业务逻辑的设计也对幂等性至关重要。
1. 事件时间处理
- 事件时间排序:使用事件时间而不是处理时间来排序和处理数据。
- 水印机制:通过设置水印(Watermark)来检测迟到的数据,并决定如何处理这些数据。
2. 幂等写入接口
- 幂等操作:确保写入操作对相同的输入产生相同的结果。
- 示例:
public interface IdempotentWriteInterface {void write(DataRecord record) throws IOException; }
六、结构图:Flink HDFS 写入幂等性设计
以下是一个逻辑结构图,展示了如何通过 Flink 和 HDFS 的特性实现幂等性:
+-------------------+ +-------------------+ +-------------------+
| Flink Task | | HDFS Sink | | HDFS |
| | | | | |
| - Checkpoint | <--> | - 滚动文件 | <--> | - 原子写入 |
| - Exactly-Once | | - 唯一文件名 | | - 文件锁机制 |
+-------------------+ +-------------------+ +-------------------+| | || 数据流 | 数据写入 || | || v |
+-------------------+ +-------------------+ +-------------------+
| 元数据存储 | | 数据唯一性检查 | | 业务逻辑处理 |
| | | | | |
| - 数据去重 | <--> | - 唯一标识 | <--> | - 事件时间处理 |
| - 历史记录 | | - 时间戳 | | - 水印机制 |
+-------------------+ +-------------------+ +-------------------+
总结
通过 Flink 的 Checkpoint 机制、HDFS 的原子写入特性、合理的文件命名策略、动态输出路径管理以及业务逻辑中的幂等处理,可以有效保证 Flink 写入 HDFS 的幂等性。这些方法相互配合,确保了数据在高并发和容错场景下的准确性和一致性。
相关文章:
flink写入hdfs数据如何保证幂等的?
在 Flink 中使用 HDFS Connector 将数据写入 HDFS 时,保证幂等性是一个重要的需求,尤其是在数据可靠性要求较高的场景下。以下是详细介绍如何通过 Flink 和 HDFS 的特性以及一些设计上的优化来实现幂等性。 一、Flink 的 Checkpoint 机制 Flink 的 Chec…...
newgrp docker需要每次刷新问题
每次都需要运行 newgrp docker 的原因: 当用户被添加到 docker 组后,当前会话并不会立即更新组信息,因此需要通过 newgrp docker 切换到新的用户组以使权限生效 如果不想每次都手动运行 newgrp docker,可以在终端中配置一个自动刷新的脚本。…...
LM_Funny-2-01 递推算法:从数学基础到跨学科应用
目录 第一章 递推算法的数学本质 1.1 形式化定义与公理化体系 定理1.1 (完备性条件) 1.2 高阶递推的特征分析 案例:Gauss同余递推4 第二章 工程实现优化技术 2.1 内存压缩的革新方法 滚动窗口策略 分块存储技术 2.2 异构计算加速方案 GPU并行递推 量子计…...
WDM_OTN_基础知识_波分站点与组网类型
为了便于理解,我们用高铁来打个比方,这是郑州与武汉的高铁,中间经过了许昌孝感等很多个站点,郑州武汉作为始发站和终点站,所有人员都是上车或下车,而许昌等中间站点,既有人员上下车,…...
机器视觉--索贝尔滤波
引言 在图像处理领域,边缘检测是一项至关重要的任务,它能够帮助我们识别图像中不同区域的边界,为后续的目标识别、图像分割等操作奠定基础。索贝尔滤波(Sobel Filter)作为一种经典的边缘检测算法,因其简单…...
网络分析仪E5071C的回波损耗测量
回波损耗(Return Loss)是评估射频/微波元件(如滤波器、天线、电缆等)信号反射特性的关键参数,反映端口阻抗匹配性能。E5071C矢量网络分析仪(VNA)通过以下步骤实现高精度回波损耗测量:…...
力扣-二叉树-98 验证二叉搜索树
思路 第一个特性,二叉搜索树的中序遍历是有序的,第二个特性,利用两个指针判断大小关系 代码 class Solution { public:TreeNode* pre NULL;bool isValidBST(TreeNode* root) {if(root NULL) return true;bool left isValidBST(root->…...
【动态规划】详解 0-1背包问题
文章目录 1. 问题引入2. 从 dfs 到动态规划3. 动态规划过程分析4. 二维 dp 的遍历顺序5. 从二维数组到一维数组6. 一维数组的遍历次序7. 背包的遍历顺序8. 代码总结9. 总结 1. 问题引入 0-1 背包是比较经典的动态规划问题,这里以代码随想录里面的例子来介绍下。总的…...
【Java线程池与线程状态】线程池分类与最佳实践
解析Java线程池与线程状态变化,结合运行机制与业务场景对照,帮助形成系统性知识。 一、线程池核心要素(五维模型) 采用「参数配置→处理流程→工作模式」三层递进结构 核心参数(线程池DNA) corePoolSiz…...
【小白学AI系列】NLP 核心知识点(八)多头自注意力机制
文章目录 **多头自注意力机制(Multi-Head Self-Attention)****核心概念** **1. 自注意力机制(Self-Attention)****2. 多头机制(Multi-Head Attention)****3. 为什么要用多头注意力机制?****4. 公…...
学习笔记——word中图目录、表目录 标题引用
目标1: 建立——图1-1 引用——图1-1 1在word文档中的引用——>插入题注 新建标签,然后命名为“图1-“。 点击确认,即可插入如图所示 图1- 1 春天 需要把图1-和后面那个1中间的空格删除,即 图1-1 春天 2怎么去引用这个“…...
3.3 Hugging Face Transformers核心功能模块深度解析
Hugging Face Transformers核心功能模块深度解析 一、模块化架构总览 #mermaid-svg-wxTV5vrEo7Y57IlW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-wxTV5vrEo7Y57IlW .error-icon{fill:#552222;}#mermaid-svg-wxT…...
linux中设置脚本定时执行ntp命令同步时间
目录 一、背景二、过程1.到系统目录2.安装ntp3.创建文件夹4.创建脚本文件5.提升脚本文件权限6.设置执行时间:7.检查是否设置了执行器(执行后输出的内容为执行器中的定时执行内容)8.执行脚本文件9.查看日志文件,是否执行成功 三、总…...
map的使用(c++)
在了解map之前,我们先看看两个场景,通过这两个场景的对比,让我们知道为什么要存在存储双关键字的容器 场景一:判断一堆字符串中,某一个字符串是否出现过 在没学set容器之前,我们只能想到把这一堆字符串存到…...
毕业设计—基于Spring Boot的社区居民健康管理平台的设计与实现
🎓 毕业设计大揭秘!想要源码和文章?快来私信我吧! Hey小伙伴们~ 👋 毕业季又来啦!是不是都在为毕业设计忙得团团转呢?🤔 别担心,我这里有个小小的福利要分享给你们哦&…...
Python:蟒蛇绘制(一笔画)
一、题目要求 使用turtle库,绘制一个蟒蛇形状的图形。 二、代码展示 # 请在下方开始编写你的代码 import turtle turtle.setup(650,350,200,200) turtle.penup() turtle.fd(-250) turtle.pendown() turtle.pensize(25) turtle.pencolor("purple") turt…...
mysql查询判断函数,类似decode
mysql中没有decode函数,如果使用的话,会报如下错误:Error Code: 1305. FUNCTION stockdb.decode does not exist 如果要实现像 Oracle 数据库那样原生的 DECODE 函数,可以通过以下几种方式来实现类似 DECODE 函数的功能。 -- 创建…...
异常处理、事务管理
异常处理 程序开发过程中不可避免的会遇到异常现象 如何处理 方案一:在Controller的方法中进行try...catch处理(代码臃肿,不推荐) 方案二:全局异常处理器 全局异常处理器 RestControllerAdvice :定义全…...
UART(一)——UART基础
一、定义 UART(Universal Asynchronous Receiver/Transmitter)是一种广泛使用的串行通信协议,用于在设备间通过异步方式传输数据。它无需共享时钟信号,而是依赖双方预先约定的参数(如波特率)完成通信。 功能和特点 基本的 UART 系统只需三个信号即可提供稳健的中速全双工…...
MySQL 中各种日志简介
MySQL 日志 慢查询日志(Slow query log) 慢查询⽇志由执⾏时间超过系统变量 long_query_time 指定的秒数的SQL语句组成,并且检 查的⾏数⼤于系统变量 min_examined_row_limit 指定值。被记录的慢查询需要进⾏优化, 可以使⽤mysqldumpslow客⼾端程序对慢…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
高防服务器价格高原因分析
高防服务器的价格较高,主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因: 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器,因此…...
python打卡day49@浙大疏锦行
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 一、通道注意力模块复习 & CBAM实现 import torch import torch.nn as nnclass CBAM(nn.Module):def __init__…...
作为点的对象CenterNet论文阅读
摘要 检测器将图像中的物体表示为轴对齐的边界框。大多数成功的目标检测方法都会枚举几乎完整的潜在目标位置列表,并对每一个位置进行分类。这种做法既浪费又低效,并且需要额外的后处理。在本文中,我们采取了不同的方法。我们将物体建模为单…...
【Zephyr 系列 16】构建 BLE + LoRa 协同通信系统:网关转发与混合调度实战
🧠关键词:Zephyr、BLE、LoRa、混合通信、事件驱动、网关中继、低功耗调度 📌面向读者:希望将 BLE 和 LoRa 结合应用于资产追踪、环境监测、远程数据采集等场景的开发者 📊篇幅预计:5300+ 字 🧭 背景与需求 在许多 IoT 项目中,单一通信方式往往难以兼顾近场数据采集…...
短视频时长预估算法调研
weighted LR o d d s T p 1 − p ( 1 − p ) o d d s T p ( T p o d d s ∗ p ) o d d s p o d d s T o d d s odds \frac{Tp}{1-p} \newline (1-p)odds Tp \newline (Tp odds * p) odds \newline p \frac{odds}{T odds} \newline odds1−pTp(1−p)oddsTp(Tpodds…...
compose 组件 ---无ui组件
在 Jetpack Compose 中,确实存在不直接参与 UI 渲染的组件,它们主要用于逻辑处理、状态管理或副作用控制。这些组件虽然没有视觉界面,但在架构中扮演重要角色。以下是常见的非 UI 组件及其用途: 1. 无 UI 的 Compose 组件分类 (…...
