当前位置: 首页 > news >正文

flink写入hdfs数据如何保证幂等的?

在 Flink 中使用 HDFS Connector 将数据写入 HDFS 时,保证幂等性是一个重要的需求,尤其是在数据可靠性要求较高的场景下。以下是详细介绍如何通过 Flink 和 HDFS 的特性以及一些设计上的优化来实现幂等性。


一、Flink 的 Checkpoint 机制

Flink 的 Checkpoint 机制是实现幂等性的重要保障之一。Checkpoint 用于捕获流处理程序的状态快照,确保在任务失败或中断时能够从最近的 Checkpoint 恢复,从而避免重复处理数据。

1. Checkpoint 的工作原理
  • 状态快照:Flink 定期对任务的状态进行快照,这些快照存储在可靠的存储系统(如 HDFS 或 S3)中。
  • 容错恢复:当任务失败时,Flink 会从最近的 Checkpoint 恢复,重新处理未完成的数据。
  • Exactly-Once 语义:通过结合两阶段提交协议(2PC),Flink 可以保证每个事件仅被处理一次。
2. 配置 Checkpoint
# 在 Flink 配置文件中启用 Checkpoint 
execution.checkpointing.interval: 10s # 设置 Checkpoint 间隔 
execution.checkpointing.mode: EXACTLY_ONCE # 启用 Exactly-Once 语义 
execution.checkpointing.storage.directory: hdfs://namenode:8020/flink/checkpoints # 存储路径 

二、HDFS 的原子写入特性

HDFS 的原子写入特性是实现幂等性的基础之一。HDFS 支持原子提交操作,这意味着文件写入要么成功完成,要么完全失败,不会有中间状态。

1. 原子写入的工作原理
  • 原子提交:HDFS 在写入文件时会先将数据写入临时文件,只有在所有数据写入完成后才会将临时文件重命名为正式文件名。
  • 避免覆盖:通过合理的文件命名策略(如包含时间戳或唯一标识),可以避免文件被覆盖或重复写入。
2. 示例:HDFS 文件命名策略
// 使用时间戳和分区键生成唯一的文件名 
String fileName = "data_" + System.currentTimeMillis() + "_" + partitionKey;

三、Flink HDFS Sink 的设计优化

Flink 提供了多种 HDFS Sink 的实现方式,通过合理的设计可以进一步增强幂等性。

1. 滚动文件(Rolling Files)
  • 按时间滚动:每隔固定时间(如 1 分钟)创建一个新的文件。
  • 按大小滚动:当文件大小达到一定阈值(如 1GB)时创建新文件。
  • 优点:避免单个文件过大,提高数据写入效率。
2. 文件命名策略
  • 唯一标识:在文件名中包含唯一标识(如时间戳、分区键、随机 UUID 等)。
  • 示例
    String filePath = "/user/flink/output/" + LocalDateTime.now().toString() + "/" + UUID.randomUUID() + ".parquet";
    
3. 输出路径管理
  • 动态路径:每次作业运行时生成新的输出路径。
  • 历史数据清理:定期清理旧的历史数据以释放存储空间。

四、数据唯一性检查

在某些场景下,可以通过额外的元数据存储(如数据库或缓存)来记录已写入的数据,从而实现幂等性。

1. 元数据存储
  • 记录已处理的数据:在写入 HDFS 之前,检查数据是否已经存在于元数据存储中。
  • 去重逻辑:如果数据已经存在,则跳过写入操作。
2. 示例:基于数据库的去重
public class IdempotentWriter {private final Connection connection;public IdempotentWriter(Connection connection) {this.connection = connection;}public void write(String data) throws SQLException {// 检查数据是否已经存在 if (!isDataExists(data)) {// 写入 HDFS writeToFile(data);// 记录到数据库 markAsProcessed(data);}}private boolean isDataExists(String data) {// 查询数据库 return false;}private void markAsProcessed(String data) {// 更新数据库 }private void writeToFile(String data) {// 写入 HDFS }
}

五、业务逻辑中的幂等处理

除了技术层面的优化,业务逻辑的设计也对幂等性至关重要。

1. 事件时间处理
  • 事件时间排序:使用事件时间而不是处理时间来排序和处理数据。
  • 水印机制:通过设置水印(Watermark)来检测迟到的数据,并决定如何处理这些数据。
2. 幂等写入接口
  • 幂等操作:确保写入操作对相同的输入产生相同的结果。
  • 示例
    public interface IdempotentWriteInterface {void write(DataRecord record) throws IOException;
    }
    

六、结构图:Flink HDFS 写入幂等性设计

以下是一个逻辑结构图,展示了如何通过 Flink 和 HDFS 的特性实现幂等性:

+-------------------+       +-------------------+       +-------------------+
|   Flink Task      |       |   HDFS Sink       |       |   HDFS            |
|                   |       |                   |       |                   |
|   - Checkpoint     | <--> |   - 滚动文件       | <--> |   - 原子写入       |
|   - Exactly-Once   |       |   - 唯一文件名     |       |   - 文件锁机制     |
+-------------------+       +-------------------+       +-------------------+|                           |                           || 数据流                   | 数据写入                 ||                           |                           ||                           v                           |
+-------------------+       +-------------------+       +-------------------+
|   元数据存储      |       |   数据唯一性检查   |       |   业务逻辑处理    |
|                   |       |                   |       |                   |
|   - 数据去重      | <--> |   - 唯一标识       | <--> |   - 事件时间处理  |
|   - 历史记录      |       |   - 时间戳         |       |   - 水印机制      |
+-------------------+       +-------------------+       +-------------------+

总结

通过 Flink 的 Checkpoint 机制、HDFS 的原子写入特性、合理的文件命名策略、动态输出路径管理以及业务逻辑中的幂等处理,可以有效保证 Flink 写入 HDFS 的幂等性。这些方法相互配合,确保了数据在高并发和容错场景下的准确性和一致性。

相关文章:

flink写入hdfs数据如何保证幂等的?

在 Flink 中使用 HDFS Connector 将数据写入 HDFS 时&#xff0c;保证幂等性是一个重要的需求&#xff0c;尤其是在数据可靠性要求较高的场景下。以下是详细介绍如何通过 Flink 和 HDFS 的特性以及一些设计上的优化来实现幂等性。 一、Flink 的 Checkpoint 机制 Flink 的 Chec…...

newgrp docker需要每次刷新问题

每次都需要运行 newgrp docker 的原因: 当用户被添加到 docker 组后&#xff0c;当前会话并不会立即更新组信息&#xff0c;因此需要通过 newgrp docker 切换到新的用户组以使权限生效 如果不想每次都手动运行 newgrp docker&#xff0c;可以在终端中配置一个自动刷新的脚本。…...

LM_Funny-2-01 递推算法:从数学基础到跨学科应用

目录 第一章 递推算法的数学本质 1.1 形式化定义与公理化体系 定理1.1 (完备性条件) 1.2 高阶递推的特征分析 案例&#xff1a;Gauss同余递推4 第二章 工程实现优化技术 2.1 内存压缩的革新方法 滚动窗口策略 分块存储技术 2.2 异构计算加速方案 GPU并行递推 量子计…...

WDM_OTN_基础知识_波分站点与组网类型

为了便于理解&#xff0c;我们用高铁来打个比方&#xff0c;这是郑州与武汉的高铁&#xff0c;中间经过了许昌孝感等很多个站点&#xff0c;郑州武汉作为始发站和终点站&#xff0c;所有人员都是上车或下车&#xff0c;而许昌等中间站点&#xff0c;既有人员上下车&#xff0c;…...

机器视觉--索贝尔滤波

引言 在图像处理领域&#xff0c;边缘检测是一项至关重要的任务&#xff0c;它能够帮助我们识别图像中不同区域的边界&#xff0c;为后续的目标识别、图像分割等操作奠定基础。索贝尔滤波&#xff08;Sobel Filter&#xff09;作为一种经典的边缘检测算法&#xff0c;因其简单…...

网络分析仪E5071C的回波损耗测量

回波损耗&#xff08;Return Loss&#xff09;是评估射频/微波元件&#xff08;如滤波器、天线、电缆等&#xff09;信号反射特性的关键参数&#xff0c;反映端口阻抗匹配性能。E5071C矢量网络分析仪&#xff08;VNA&#xff09;通过以下步骤实现高精度回波损耗测量&#xff1a…...

力扣-二叉树-98 验证二叉搜索树

思路 第一个特性&#xff0c;二叉搜索树的中序遍历是有序的&#xff0c;第二个特性&#xff0c;利用两个指针判断大小关系 代码 class Solution { public:TreeNode* pre NULL;bool isValidBST(TreeNode* root) {if(root NULL) return true;bool left isValidBST(root->…...

【动态规划】详解 0-1背包问题

文章目录 1. 问题引入2. 从 dfs 到动态规划3. 动态规划过程分析4. 二维 dp 的遍历顺序5. 从二维数组到一维数组6. 一维数组的遍历次序7. 背包的遍历顺序8. 代码总结9. 总结 1. 问题引入 0-1 背包是比较经典的动态规划问题&#xff0c;这里以代码随想录里面的例子来介绍下。总的…...

【Java线程池与线程状态】线程池分类与最佳实践

解析Java线程池与线程状态变化&#xff0c;结合运行机制与业务场景对照&#xff0c;帮助形成系统性知识。 一、线程池核心要素&#xff08;五维模型&#xff09; 采用「参数配置→处理流程→工作模式」三层递进结构 核心参数&#xff08;线程池DNA&#xff09; corePoolSiz…...

【小白学AI系列】NLP 核心知识点(八)多头自注意力机制

文章目录 **多头自注意力机制&#xff08;Multi-Head Self-Attention&#xff09;****核心概念** **1. 自注意力机制&#xff08;Self-Attention&#xff09;****2. 多头机制&#xff08;Multi-Head Attention&#xff09;****3. 为什么要用多头注意力机制&#xff1f;****4. 公…...

学习笔记——word中图目录、表目录 标题引用

目标1&#xff1a; 建立——图1-1 引用——图1-1 1在word文档中的引用——>插入题注 新建标签&#xff0c;然后命名为“图1-“。 点击确认&#xff0c;即可插入如图所示 图1- 1 春天 需要把图1-和后面那个1中间的空格删除&#xff0c;即 图1-1 春天 2怎么去引用这个“…...

3.3 Hugging Face Transformers核心功能模块深度解析

Hugging Face Transformers核心功能模块深度解析 一、模块化架构总览 #mermaid-svg-wxTV5vrEo7Y57IlW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-wxTV5vrEo7Y57IlW .error-icon{fill:#552222;}#mermaid-svg-wxT…...

linux中设置脚本定时执行ntp命令同步时间

目录 一、背景二、过程1.到系统目录2.安装ntp3.创建文件夹4.创建脚本文件5.提升脚本文件权限6.设置执行时间&#xff1a;7.检查是否设置了执行器&#xff08;执行后输出的内容为执行器中的定时执行内容&#xff09;8.执行脚本文件9.查看日志文件&#xff0c;是否执行成功 三、总…...

map的使用(c++)

在了解map之前&#xff0c;我们先看看两个场景&#xff0c;通过这两个场景的对比&#xff0c;让我们知道为什么要存在存储双关键字的容器 场景一&#xff1a;判断一堆字符串中&#xff0c;某一个字符串是否出现过 在没学set容器之前&#xff0c;我们只能想到把这一堆字符串存到…...

毕业设计—基于Spring Boot的社区居民健康管理平台的设计与实现

&#x1f393; 毕业设计大揭秘&#xff01;想要源码和文章&#xff1f;快来私信我吧&#xff01; Hey小伙伴们~ &#x1f44b; 毕业季又来啦&#xff01;是不是都在为毕业设计忙得团团转呢&#xff1f;&#x1f914; 别担心&#xff0c;我这里有个小小的福利要分享给你们哦&…...

Python:蟒蛇绘制(一笔画)

一、题目要求 使用turtle库&#xff0c;绘制一个蟒蛇形状的图形。‬ 二、代码展示 # 请在下方开始编写你的代码 import turtle turtle.setup(650,350,200,200) turtle.penup() turtle.fd(-250) turtle.pendown() turtle.pensize(25) turtle.pencolor("purple") turt…...

mysql查询判断函数,类似decode

mysql中没有decode函数&#xff0c;如果使用的话&#xff0c;会报如下错误&#xff1a;Error Code: 1305. FUNCTION stockdb.decode does not exist 如果要实现像 Oracle 数据库那样原生的 DECODE 函数&#xff0c;可以通过以下几种方式来实现类似 DECODE 函数的功能。 -- 创建…...

异常处理、事务管理

异常处理 程序开发过程中不可避免的会遇到异常现象 如何处理 方案一&#xff1a;在Controller的方法中进行try...catch处理&#xff08;代码臃肿&#xff0c;不推荐&#xff09; 方案二&#xff1a;全局异常处理器 全局异常处理器 RestControllerAdvice &#xff1a;定义全…...

UART(一)——UART基础

一、定义 UART(Universal Asynchronous Receiver/Transmitter)是一种广泛使用的串行通信协议,用于在设备间通过异步方式传输数据。它无需共享时钟信号,而是依赖双方预先约定的参数(如波特率)完成通信。 功能和特点 基本的 UART 系统只需三个信号即可提供稳健的中速全双工…...

MySQL 中各种日志简介

MySQL 日志 慢查询日志(Slow query log) 慢查询⽇志由执⾏时间超过系统变量 long_query_time 指定的秒数的SQL语句组成&#xff0c;并且检 查的⾏数⼤于系统变量 min_examined_row_limit 指定值。被记录的慢查询需要进⾏优化&#xff0c; 可以使⽤mysqldumpslow客⼾端程序对慢…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

python如何将word的doc另存为docx

将 DOCX 文件另存为 DOCX 格式&#xff08;Python 实现&#xff09; 在 Python 中&#xff0c;你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是&#xff0c;.doc 是旧的 Word 格式&#xff0c;而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...