当前位置: 首页 > news >正文

flink 写入数据到 kafka 后,数据过一段时间自动删除

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:

在这里插入图片描述

  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

相关文章:

flink 写入数据到 kafka 后,数据过一段时间自动删除

版本 flink 1.16.0kafka 2.3 流程描述&#xff1a; flink利用KafkaSource&#xff0c;读取kafka的数据&#xff0c;然后经过一系列的处理&#xff0c;通过KafkaSink&#xff0c;采用 EXACTLY_ONCE 的模式&#xff0c;将处理后的数据再写入到新的topic中。 问题描述&#xff1…...

golong基础相关操作--一

package main//go语言以包作为管理单位&#xff0c;每个文件必须先声明包 //程序必须有一个main包 // 导入包&#xff0c;必须要要使用 // 变量声明了&#xff0c;必须要使用 import ("fmt" )/* * 包内部的变量 */ var aa 3var ss "kkk"var bb truevar …...

【深度学习】基于卷积神经网络的铁路信号灯识别方法

基于卷积神经网络的铁路信号灯识别方法 摘 要&#xff1a;1 引言2 卷积神经网络模型2.1 卷积神经网络结构2.2.1 卷积层2.2.2 池化层2.2.3 全连接层 3 卷积神经网络算法实现3.1 数据集制作3.2 卷积神经网络的训练过程3.2.1 前向传播过程 4 实验5 结语 摘 要&#xff1a; 目前中…...

DR IP-SoC China 2023 Day演讲预告 | 龙智Perforce专家解析芯片开发中的数字资产管理

2023年9月6日&#xff08;周三&#xff09;&#xff0c;龙智即将亮相于上海举行的D&R IP-SoC China 2023 Day&#xff0c;呈现集成了Perforce与Atlassian产品的芯片开发解决方案&#xff0c;助力企业更好、更快地进行芯片开发。 D&R IP-SoC China 2023 Day 是中国首个…...

解决github连接不上的问题

改 hosts 我们在浏览器输入 GitHub 的网址时&#xff0c;会向 DNS 服务器发送一个请求&#xff0c;获取到 GitHub 网站所在的服务器 IP 地址&#xff0c;从而进行访问。 就像你是一名快递员&#xff0c;在送快递前要先找中间人询问收件人的地址。而 DNS 就是这个告诉你目标地址…...

# DevOps名词定义梳理

DevOps名词定义梳理 极限编程座右铭&#xff1a;如果它令你很受伤&#xff0c;那么就做更多的练习&#xff08;If it hurts, do it more often&#xff09; 经常人们会把这些名词用错&#xff1a; 构建&#xff1a;就是把源代码制成成品的过程&#xff0c;这个过程一般会有单元…...

Redis Cluster

文章目录 一、集群搭建1 节点规划2 集群启动 二、配置一致性1 基本分工2 更新规则 三、Sharding1 数据分片分片实现分片特点 2 slot迁移迁移原因迁移支持集群扩容迁移错误背景现象问题分析验证猜想 集群缩容 3. 请求路由client端server端migrating节点的读写importing节点的读写…...

Pandas常用指令

astype astype的作用是转换数据类型&#xff0c;astype是没办法直接在原df上进行修改的&#xff0c;只能通过赋值的形式将原有的df进行覆盖&#xff0c;即df df.astype(dtype) astype的基本语法 DataFrame.astype(dtype, copyTrue, errorsraise) dtype参数指定将数据类型转换…...

FPGA实战小项目3

基于FPGA的波形发生器 基于FPGA的波形发生器 基于FPGA的beep音乐播放器设计 基于FPGA的beep音乐播放器设计 基于FPGA的cordic算法实现DDS sin和cosine波形的产生 基于FPGA的cordic算法实现DDS sin和cosine波形的产生...

mysql创建用户

创建用户 创建 -- 创建用户 itcast , localhost只能够在当前主机localhost访问, 密码123456; create user test01localhost identified by 123456;使用命令show databases;命令&#xff0c;只显示一个数据库&#xff0c;因为没有权限 -- 创建用户 test02, 可以在任意主机访问…...

程序员写好简历的5个关键点

程序员就业竞争大&#xff1f;找不到工作&#xff1f;也许&#xff0c;从简历开始你就被淘汰了.... 在很多的公司中&#xff0c;HR的招聘压力是很大的&#xff0c;浏览每个人的简历的时间可能只有20几秒&#xff0c;所以即使你的工作能力十分的强&#xff0c;但如果你没有在简…...

Vue:关于如何配置一级路由和二级路由的方法

路由的嵌套配置 文章目录 路由的嵌套配置配置一级路由 配置一级路由 创建router文件夹&#xff0c;里面添加index.js文件配置以下代码&#xff1a; import Vue from vue import VueRouter from "vue-router"; import Layout from /views/Layout import ArticleDeta…...

【论文绘图】seaborn分类数据绘图

参考&#xff1a;https://seaborn.pydata.org/tutorial/categorical.html 分类变量关系图中的catplot类似于连续变量中的relplot&#xff0c;默认是stripplot。 分类变量图种类 分类散点图 stripplotswarmplot (kind‘swarm’) 类别分布图 boxplotviolinplotboxenplot …...

KubeSphere Namespace 数据删除事故分析与解决全记录

作者&#xff1a;宇轩辞白&#xff0c;运维研发工程师&#xff0c;目前专注于云原生、Kubernetes、容器、Linux、运维自动化等领域。 前言 2023 年 7 月 23 日在项目上线前夕&#xff0c;K8s 生产环境出现故障&#xff0c;经过紧急修复之后&#xff0c;K8s 环境恢复正常&#…...

mysql场景题:最近7天连续3天登陆用户,字段,id,date(已去重)

1.最近7天连续3天登陆用户&#xff0c;字段&#xff0c;id&#xff0c;date&#xff08;已去重&#xff09; 思路&#xff1a; lag对时间开窗&#xff08;注意时间得转换为时间戳&#xff08;int类型才可以添加后续条件&#xff09;&#xff0c;跳行为2&#xff08;连续3天&am…...

华为OD机试 - 最差产品奖 - 双端队列 deque(Java 2023 B卷 200分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…...

【校招VIP】前端算法考察之链表算法

考点介绍&#xff1a; 链表是一种物理存储结构上非连续的数据结构,数据的逻辑顺序是通过链表中的指针链接次序实现相互勾连。链表相对数组而言有很多不同之处,在特定场景下能发挥独特的优势。例如链表的插入和删除操作比数组效率高,数组需要改变其他元素的位置,而链表只需要改变…...

uni-app之android离线自定义基座

一 为什么要自定义基座 1&#xff0c;基座其实就是一个app&#xff0c;然后新开发的页面可以直接在手机上面显示&#xff0c;查看效果。 2&#xff0c;默认的基座就是uniapp帮我们打包好的基座app&#xff0c;然后我们可以进行页面的调试。 3&#xff0c;自定义基座主要用来…...

【AWS】实操-保护 Amazon S3 VPC 终端节点通信

文章目录 实验概览目标实验环境任务 1&#xff1a;探索并启动实验环境任务 1.1&#xff1a;探索 Amazon VPC 资源任务 1.2&#xff1a;探索 Amazon EC2 资源任务 1.3&#xff1a;创建 Amazon VPC 终端节点任务 1.4&#xff1a;连接私有 EC2 实例任务 1.5&#xff1a;探索 Amazo…...

C# Color颜色RGB对照表

序号Color色系颜色RGB图例1Color.AliceBlue蓝色艾丽丝蓝240,248,2552Color.AntiqueWhite白色古典白色250,235,2153Color.Aqua&#xff0c;Color.Cyan青色浅蓝色&#xff0c;蓝绿色&#xff0c;青色0,255,255 C# Color颜色RGB对照表_旭东怪的博客-CSDN博客 C#颜色和名称样式对照…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

Web 架构之 CDN 加速原理与落地实践

文章目录 一、思维导图二、正文内容&#xff08;一&#xff09;CDN 基础概念1. 定义2. 组成部分 &#xff08;二&#xff09;CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 &#xff08;三&#xff09;CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 &#xf…...

算法:模拟

1.替换所有的问号 1576. 替换所有的问号 - 力扣&#xff08;LeetCode&#xff09; ​遍历字符串​&#xff1a;通过外层循环逐一检查每个字符。​遇到 ? 时处理​&#xff1a; 内层循环遍历小写字母&#xff08;a 到 z&#xff09;。对每个字母检查是否满足&#xff1a; ​与…...

基于Java+VUE+MariaDB实现(Web)仿小米商城

仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意&#xff1a;运行前…...

Java中HashMap底层原理深度解析:从数据结构到红黑树优化

一、HashMap概述与核心特性 HashMap作为Java集合框架中最常用的数据结构之一&#xff0c;是基于哈希表的Map接口非同步实现。它允许使用null键和null值&#xff08;但只能有一个null键&#xff09;&#xff0c;并且不保证映射顺序的恒久不变。与Hashtable相比&#xff0c;Hash…...

Linux入门课的思维导图

耗时两周&#xff0c;终于把慕课网上的Linux的基础入门课实操、总结完了&#xff01; 第一次以Blog的形式做学习记录&#xff0c;过程很有意思&#xff0c;但也很耗时。 课程时长5h&#xff0c;涉及到很多专有名词&#xff0c;要去逐个查找&#xff0c;以前接触过的概念因为时…...