flink写入hdfs数据如何保证幂等的?
在 Flink 中使用 HDFS Connector 将数据写入 HDFS 时,保证幂等性是一个重要的需求,尤其是在数据可靠性要求较高的场景下。以下是详细介绍如何通过 Flink 和 HDFS 的特性以及一些设计上的优化来实现幂等性。
一、Flink 的 Checkpoint 机制
Flink 的 Checkpoint 机制是实现幂等性的重要保障之一。Checkpoint 用于捕获流处理程序的状态快照,确保在任务失败或中断时能够从最近的 Checkpoint 恢复,从而避免重复处理数据。
1. Checkpoint 的工作原理
- 状态快照:Flink 定期对任务的状态进行快照,这些快照存储在可靠的存储系统(如 HDFS 或 S3)中。
- 容错恢复:当任务失败时,Flink 会从最近的 Checkpoint 恢复,重新处理未完成的数据。
- Exactly-Once 语义:通过结合两阶段提交协议(2PC),Flink 可以保证每个事件仅被处理一次。
2. 配置 Checkpoint
# 在 Flink 配置文件中启用 Checkpoint
execution.checkpointing.interval: 10s # 设置 Checkpoint 间隔
execution.checkpointing.mode: EXACTLY_ONCE # 启用 Exactly-Once 语义
execution.checkpointing.storage.directory: hdfs://namenode:8020/flink/checkpoints # 存储路径
二、HDFS 的原子写入特性
HDFS 的原子写入特性是实现幂等性的基础之一。HDFS 支持原子提交操作,这意味着文件写入要么成功完成,要么完全失败,不会有中间状态。
1. 原子写入的工作原理
- 原子提交:HDFS 在写入文件时会先将数据写入临时文件,只有在所有数据写入完成后才会将临时文件重命名为正式文件名。
- 避免覆盖:通过合理的文件命名策略(如包含时间戳或唯一标识),可以避免文件被覆盖或重复写入。
2. 示例:HDFS 文件命名策略
// 使用时间戳和分区键生成唯一的文件名
String fileName = "data_" + System.currentTimeMillis() + "_" + partitionKey;
三、Flink HDFS Sink 的设计优化
Flink 提供了多种 HDFS Sink 的实现方式,通过合理的设计可以进一步增强幂等性。
1. 滚动文件(Rolling Files)
- 按时间滚动:每隔固定时间(如 1 分钟)创建一个新的文件。
- 按大小滚动:当文件大小达到一定阈值(如 1GB)时创建新文件。
- 优点:避免单个文件过大,提高数据写入效率。
2. 文件命名策略
- 唯一标识:在文件名中包含唯一标识(如时间戳、分区键、随机 UUID 等)。
- 示例:
String filePath = "/user/flink/output/" + LocalDateTime.now().toString() + "/" + UUID.randomUUID() + ".parquet";
3. 输出路径管理
- 动态路径:每次作业运行时生成新的输出路径。
- 历史数据清理:定期清理旧的历史数据以释放存储空间。
四、数据唯一性检查
在某些场景下,可以通过额外的元数据存储(如数据库或缓存)来记录已写入的数据,从而实现幂等性。
1. 元数据存储
- 记录已处理的数据:在写入 HDFS 之前,检查数据是否已经存在于元数据存储中。
- 去重逻辑:如果数据已经存在,则跳过写入操作。
2. 示例:基于数据库的去重
public class IdempotentWriter {private final Connection connection;public IdempotentWriter(Connection connection) {this.connection = connection;}public void write(String data) throws SQLException {// 检查数据是否已经存在 if (!isDataExists(data)) {// 写入 HDFS writeToFile(data);// 记录到数据库 markAsProcessed(data);}}private boolean isDataExists(String data) {// 查询数据库 return false;}private void markAsProcessed(String data) {// 更新数据库 }private void writeToFile(String data) {// 写入 HDFS }
}
五、业务逻辑中的幂等处理
除了技术层面的优化,业务逻辑的设计也对幂等性至关重要。
1. 事件时间处理
- 事件时间排序:使用事件时间而不是处理时间来排序和处理数据。
- 水印机制:通过设置水印(Watermark)来检测迟到的数据,并决定如何处理这些数据。
2. 幂等写入接口
- 幂等操作:确保写入操作对相同的输入产生相同的结果。
- 示例:
public interface IdempotentWriteInterface {void write(DataRecord record) throws IOException; }
六、结构图:Flink HDFS 写入幂等性设计
以下是一个逻辑结构图,展示了如何通过 Flink 和 HDFS 的特性实现幂等性:
+-------------------+ +-------------------+ +-------------------+
| Flink Task | | HDFS Sink | | HDFS |
| | | | | |
| - Checkpoint | <--> | - 滚动文件 | <--> | - 原子写入 |
| - Exactly-Once | | - 唯一文件名 | | - 文件锁机制 |
+-------------------+ +-------------------+ +-------------------+| | || 数据流 | 数据写入 || | || v |
+-------------------+ +-------------------+ +-------------------+
| 元数据存储 | | 数据唯一性检查 | | 业务逻辑处理 |
| | | | | |
| - 数据去重 | <--> | - 唯一标识 | <--> | - 事件时间处理 |
| - 历史记录 | | - 时间戳 | | - 水印机制 |
+-------------------+ +-------------------+ +-------------------+
总结
通过 Flink 的 Checkpoint 机制、HDFS 的原子写入特性、合理的文件命名策略、动态输出路径管理以及业务逻辑中的幂等处理,可以有效保证 Flink 写入 HDFS 的幂等性。这些方法相互配合,确保了数据在高并发和容错场景下的准确性和一致性。
相关文章:
flink写入hdfs数据如何保证幂等的?
在 Flink 中使用 HDFS Connector 将数据写入 HDFS 时,保证幂等性是一个重要的需求,尤其是在数据可靠性要求较高的场景下。以下是详细介绍如何通过 Flink 和 HDFS 的特性以及一些设计上的优化来实现幂等性。 一、Flink 的 Checkpoint 机制 Flink 的 Chec…...
newgrp docker需要每次刷新问题
每次都需要运行 newgrp docker 的原因: 当用户被添加到 docker 组后,当前会话并不会立即更新组信息,因此需要通过 newgrp docker 切换到新的用户组以使权限生效 如果不想每次都手动运行 newgrp docker,可以在终端中配置一个自动刷新的脚本。…...
LM_Funny-2-01 递推算法:从数学基础到跨学科应用
目录 第一章 递推算法的数学本质 1.1 形式化定义与公理化体系 定理1.1 (完备性条件) 1.2 高阶递推的特征分析 案例:Gauss同余递推4 第二章 工程实现优化技术 2.1 内存压缩的革新方法 滚动窗口策略 分块存储技术 2.2 异构计算加速方案 GPU并行递推 量子计…...

WDM_OTN_基础知识_波分站点与组网类型
为了便于理解,我们用高铁来打个比方,这是郑州与武汉的高铁,中间经过了许昌孝感等很多个站点,郑州武汉作为始发站和终点站,所有人员都是上车或下车,而许昌等中间站点,既有人员上下车,…...

机器视觉--索贝尔滤波
引言 在图像处理领域,边缘检测是一项至关重要的任务,它能够帮助我们识别图像中不同区域的边界,为后续的目标识别、图像分割等操作奠定基础。索贝尔滤波(Sobel Filter)作为一种经典的边缘检测算法,因其简单…...

网络分析仪E5071C的回波损耗测量
回波损耗(Return Loss)是评估射频/微波元件(如滤波器、天线、电缆等)信号反射特性的关键参数,反映端口阻抗匹配性能。E5071C矢量网络分析仪(VNA)通过以下步骤实现高精度回波损耗测量:…...
力扣-二叉树-98 验证二叉搜索树
思路 第一个特性,二叉搜索树的中序遍历是有序的,第二个特性,利用两个指针判断大小关系 代码 class Solution { public:TreeNode* pre NULL;bool isValidBST(TreeNode* root) {if(root NULL) return true;bool left isValidBST(root->…...

【动态规划】详解 0-1背包问题
文章目录 1. 问题引入2. 从 dfs 到动态规划3. 动态规划过程分析4. 二维 dp 的遍历顺序5. 从二维数组到一维数组6. 一维数组的遍历次序7. 背包的遍历顺序8. 代码总结9. 总结 1. 问题引入 0-1 背包是比较经典的动态规划问题,这里以代码随想录里面的例子来介绍下。总的…...
【Java线程池与线程状态】线程池分类与最佳实践
解析Java线程池与线程状态变化,结合运行机制与业务场景对照,帮助形成系统性知识。 一、线程池核心要素(五维模型) 采用「参数配置→处理流程→工作模式」三层递进结构 核心参数(线程池DNA) corePoolSiz…...
【小白学AI系列】NLP 核心知识点(八)多头自注意力机制
文章目录 **多头自注意力机制(Multi-Head Self-Attention)****核心概念** **1. 自注意力机制(Self-Attention)****2. 多头机制(Multi-Head Attention)****3. 为什么要用多头注意力机制?****4. 公…...

学习笔记——word中图目录、表目录 标题引用
目标1: 建立——图1-1 引用——图1-1 1在word文档中的引用——>插入题注 新建标签,然后命名为“图1-“。 点击确认,即可插入如图所示 图1- 1 春天 需要把图1-和后面那个1中间的空格删除,即 图1-1 春天 2怎么去引用这个“…...
3.3 Hugging Face Transformers核心功能模块深度解析
Hugging Face Transformers核心功能模块深度解析 一、模块化架构总览 #mermaid-svg-wxTV5vrEo7Y57IlW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-wxTV5vrEo7Y57IlW .error-icon{fill:#552222;}#mermaid-svg-wxT…...

linux中设置脚本定时执行ntp命令同步时间
目录 一、背景二、过程1.到系统目录2.安装ntp3.创建文件夹4.创建脚本文件5.提升脚本文件权限6.设置执行时间:7.检查是否设置了执行器(执行后输出的内容为执行器中的定时执行内容)8.执行脚本文件9.查看日志文件,是否执行成功 三、总…...
map的使用(c++)
在了解map之前,我们先看看两个场景,通过这两个场景的对比,让我们知道为什么要存在存储双关键字的容器 场景一:判断一堆字符串中,某一个字符串是否出现过 在没学set容器之前,我们只能想到把这一堆字符串存到…...

毕业设计—基于Spring Boot的社区居民健康管理平台的设计与实现
🎓 毕业设计大揭秘!想要源码和文章?快来私信我吧! Hey小伙伴们~ 👋 毕业季又来啦!是不是都在为毕业设计忙得团团转呢?🤔 别担心,我这里有个小小的福利要分享给你们哦&…...

Python:蟒蛇绘制(一笔画)
一、题目要求 使用turtle库,绘制一个蟒蛇形状的图形。 二、代码展示 # 请在下方开始编写你的代码 import turtle turtle.setup(650,350,200,200) turtle.penup() turtle.fd(-250) turtle.pendown() turtle.pensize(25) turtle.pencolor("purple") turt…...

mysql查询判断函数,类似decode
mysql中没有decode函数,如果使用的话,会报如下错误:Error Code: 1305. FUNCTION stockdb.decode does not exist 如果要实现像 Oracle 数据库那样原生的 DECODE 函数,可以通过以下几种方式来实现类似 DECODE 函数的功能。 -- 创建…...

异常处理、事务管理
异常处理 程序开发过程中不可避免的会遇到异常现象 如何处理 方案一:在Controller的方法中进行try...catch处理(代码臃肿,不推荐) 方案二:全局异常处理器 全局异常处理器 RestControllerAdvice :定义全…...

UART(一)——UART基础
一、定义 UART(Universal Asynchronous Receiver/Transmitter)是一种广泛使用的串行通信协议,用于在设备间通过异步方式传输数据。它无需共享时钟信号,而是依赖双方预先约定的参数(如波特率)完成通信。 功能和特点 基本的 UART 系统只需三个信号即可提供稳健的中速全双工…...
MySQL 中各种日志简介
MySQL 日志 慢查询日志(Slow query log) 慢查询⽇志由执⾏时间超过系统变量 long_query_time 指定的秒数的SQL语句组成,并且检 查的⾏数⼤于系统变量 min_examined_row_limit 指定值。被记录的慢查询需要进⾏优化, 可以使⽤mysqldumpslow客⼾端程序对慢…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...

Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...