当前位置: 首页 > news >正文

flink 写入数据到 kafka 后,数据过一段时间自动删除

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:

在这里插入图片描述

  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

相关文章:

flink 写入数据到 kafka 后,数据过一段时间自动删除

版本 flink 1.16.0kafka 2.3 流程描述&#xff1a; flink利用KafkaSource&#xff0c;读取kafka的数据&#xff0c;然后经过一系列的处理&#xff0c;通过KafkaSink&#xff0c;采用 EXACTLY_ONCE 的模式&#xff0c;将处理后的数据再写入到新的topic中。 问题描述&#xff1…...

golong基础相关操作--一

package main//go语言以包作为管理单位&#xff0c;每个文件必须先声明包 //程序必须有一个main包 // 导入包&#xff0c;必须要要使用 // 变量声明了&#xff0c;必须要使用 import ("fmt" )/* * 包内部的变量 */ var aa 3var ss "kkk"var bb truevar …...

【深度学习】基于卷积神经网络的铁路信号灯识别方法

基于卷积神经网络的铁路信号灯识别方法 摘 要&#xff1a;1 引言2 卷积神经网络模型2.1 卷积神经网络结构2.2.1 卷积层2.2.2 池化层2.2.3 全连接层 3 卷积神经网络算法实现3.1 数据集制作3.2 卷积神经网络的训练过程3.2.1 前向传播过程 4 实验5 结语 摘 要&#xff1a; 目前中…...

DR IP-SoC China 2023 Day演讲预告 | 龙智Perforce专家解析芯片开发中的数字资产管理

2023年9月6日&#xff08;周三&#xff09;&#xff0c;龙智即将亮相于上海举行的D&R IP-SoC China 2023 Day&#xff0c;呈现集成了Perforce与Atlassian产品的芯片开发解决方案&#xff0c;助力企业更好、更快地进行芯片开发。 D&R IP-SoC China 2023 Day 是中国首个…...

解决github连接不上的问题

改 hosts 我们在浏览器输入 GitHub 的网址时&#xff0c;会向 DNS 服务器发送一个请求&#xff0c;获取到 GitHub 网站所在的服务器 IP 地址&#xff0c;从而进行访问。 就像你是一名快递员&#xff0c;在送快递前要先找中间人询问收件人的地址。而 DNS 就是这个告诉你目标地址…...

# DevOps名词定义梳理

DevOps名词定义梳理 极限编程座右铭&#xff1a;如果它令你很受伤&#xff0c;那么就做更多的练习&#xff08;If it hurts, do it more often&#xff09; 经常人们会把这些名词用错&#xff1a; 构建&#xff1a;就是把源代码制成成品的过程&#xff0c;这个过程一般会有单元…...

Redis Cluster

文章目录 一、集群搭建1 节点规划2 集群启动 二、配置一致性1 基本分工2 更新规则 三、Sharding1 数据分片分片实现分片特点 2 slot迁移迁移原因迁移支持集群扩容迁移错误背景现象问题分析验证猜想 集群缩容 3. 请求路由client端server端migrating节点的读写importing节点的读写…...

Pandas常用指令

astype astype的作用是转换数据类型&#xff0c;astype是没办法直接在原df上进行修改的&#xff0c;只能通过赋值的形式将原有的df进行覆盖&#xff0c;即df df.astype(dtype) astype的基本语法 DataFrame.astype(dtype, copyTrue, errorsraise) dtype参数指定将数据类型转换…...

FPGA实战小项目3

基于FPGA的波形发生器 基于FPGA的波形发生器 基于FPGA的beep音乐播放器设计 基于FPGA的beep音乐播放器设计 基于FPGA的cordic算法实现DDS sin和cosine波形的产生 基于FPGA的cordic算法实现DDS sin和cosine波形的产生...

mysql创建用户

创建用户 创建 -- 创建用户 itcast , localhost只能够在当前主机localhost访问, 密码123456; create user test01localhost identified by 123456;使用命令show databases;命令&#xff0c;只显示一个数据库&#xff0c;因为没有权限 -- 创建用户 test02, 可以在任意主机访问…...

程序员写好简历的5个关键点

程序员就业竞争大&#xff1f;找不到工作&#xff1f;也许&#xff0c;从简历开始你就被淘汰了.... 在很多的公司中&#xff0c;HR的招聘压力是很大的&#xff0c;浏览每个人的简历的时间可能只有20几秒&#xff0c;所以即使你的工作能力十分的强&#xff0c;但如果你没有在简…...

Vue:关于如何配置一级路由和二级路由的方法

路由的嵌套配置 文章目录 路由的嵌套配置配置一级路由 配置一级路由 创建router文件夹&#xff0c;里面添加index.js文件配置以下代码&#xff1a; import Vue from vue import VueRouter from "vue-router"; import Layout from /views/Layout import ArticleDeta…...

【论文绘图】seaborn分类数据绘图

参考&#xff1a;https://seaborn.pydata.org/tutorial/categorical.html 分类变量关系图中的catplot类似于连续变量中的relplot&#xff0c;默认是stripplot。 分类变量图种类 分类散点图 stripplotswarmplot (kind‘swarm’) 类别分布图 boxplotviolinplotboxenplot …...

KubeSphere Namespace 数据删除事故分析与解决全记录

作者&#xff1a;宇轩辞白&#xff0c;运维研发工程师&#xff0c;目前专注于云原生、Kubernetes、容器、Linux、运维自动化等领域。 前言 2023 年 7 月 23 日在项目上线前夕&#xff0c;K8s 生产环境出现故障&#xff0c;经过紧急修复之后&#xff0c;K8s 环境恢复正常&#…...

mysql场景题:最近7天连续3天登陆用户,字段,id,date(已去重)

1.最近7天连续3天登陆用户&#xff0c;字段&#xff0c;id&#xff0c;date&#xff08;已去重&#xff09; 思路&#xff1a; lag对时间开窗&#xff08;注意时间得转换为时间戳&#xff08;int类型才可以添加后续条件&#xff09;&#xff0c;跳行为2&#xff08;连续3天&am…...

华为OD机试 - 最差产品奖 - 双端队列 deque(Java 2023 B卷 200分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…...

【校招VIP】前端算法考察之链表算法

考点介绍&#xff1a; 链表是一种物理存储结构上非连续的数据结构,数据的逻辑顺序是通过链表中的指针链接次序实现相互勾连。链表相对数组而言有很多不同之处,在特定场景下能发挥独特的优势。例如链表的插入和删除操作比数组效率高,数组需要改变其他元素的位置,而链表只需要改变…...

uni-app之android离线自定义基座

一 为什么要自定义基座 1&#xff0c;基座其实就是一个app&#xff0c;然后新开发的页面可以直接在手机上面显示&#xff0c;查看效果。 2&#xff0c;默认的基座就是uniapp帮我们打包好的基座app&#xff0c;然后我们可以进行页面的调试。 3&#xff0c;自定义基座主要用来…...

【AWS】实操-保护 Amazon S3 VPC 终端节点通信

文章目录 实验概览目标实验环境任务 1&#xff1a;探索并启动实验环境任务 1.1&#xff1a;探索 Amazon VPC 资源任务 1.2&#xff1a;探索 Amazon EC2 资源任务 1.3&#xff1a;创建 Amazon VPC 终端节点任务 1.4&#xff1a;连接私有 EC2 实例任务 1.5&#xff1a;探索 Amazo…...

C# Color颜色RGB对照表

序号Color色系颜色RGB图例1Color.AliceBlue蓝色艾丽丝蓝240,248,2552Color.AntiqueWhite白色古典白色250,235,2153Color.Aqua&#xff0c;Color.Cyan青色浅蓝色&#xff0c;蓝绿色&#xff0c;青色0,255,255 C# Color颜色RGB对照表_旭东怪的博客-CSDN博客 C#颜色和名称样式对照…...

大家都在签电子合同了,对企业有什么好处?

一、电子合同&#xff0c;已经不是什么新鲜事了可能你身边还有人在犹豫电子合同靠不靠谱&#xff0c;但数据不会骗人。据统计&#xff0c;2025年我国电子合同签约量达到2576.1亿份&#xff0c;市场规模已经达到305.1亿元&#xff0c;这几年年均增速超过23%。说白了&#xff0c;…...

中科院空天院团队Geography and Sustainability:1985年至2022年各国人均耕地面积差距的扩大:对实现可持续发展目标的威胁

耕地作为粮食的载体&#xff0c;是保障粮食安全的关键要素。全球人口增长不可避免地导致耕地扩张以满足对食物、纤维和能源日益增长的需求&#xff0c;这给耕地的承载能力带来沉重负担&#xff0c;并加速了土壤退化与流失&#xff0c;对实现联合国可持续发展目标2&#xff08;S…...

基于ZYNQ与IgH的EtherCAT主站方案:软硬协同实现工业实时控制

1. 项目概述&#xff1a;当工业实时网络遇上可编程SoC在工业自动化领域&#xff0c;实时性和确定性是永恒的核心诉求。EtherCAT作为高性能的工业以太网协议&#xff0c;以其独特的“飞读飞写”数据处理机制和极低的通信抖动&#xff0c;成为了众多高精度运动控制、机器人、半导…...

如何用openpilot升级你的驾驶体验:让300+车型秒变智能座驾

如何用openpilot升级你的驾驶体验&#xff1a;让300车型秒变智能座驾 【免费下载链接】openpilot openpilot is an operating system for robotics. Currently, it upgrades the driver assistance system on 300 supported cars. 项目地址: https://gitcode.com/GitHub_Tren…...

软考系统架构设计师实战论文集:自动驾驶与AI云端架构演进

【引言】 自动驾驶的下半场&#xff0c;早已不再局限于单车智能的角逐&#xff0c;而是演变成了一场关乎云端算力、海量数据治理与大模型工程化的全面战役。当接入的车辆规模突破百万级&#xff0c;当每日回传的工况数据达到 PB 级&#xff0c;云端数据平台的可靠性、扩展性与…...

长期使用后回顾 Taotoken 在 API 调用稳定性与客服响应上的综合体验

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 长期使用后回顾 Taotoken 在 API 调用稳定性与客服响应上的综合体验 作为一项服务于项目开发的基础设施&#xff0c;大模型 API 的…...

1CMS网址导航 支持二级栏目分类 前台界面美观清爽 自适应

内容目录 一、详细介绍二、效果展示1.部分代码2.效果图展示 三、学习资料下载 一、详细介绍 一款基于1CMS制作的导航网站程序&#xff0c;提供简洁高效的上网导航体验。程序支持二级栏目分类&#xff0c;后台管理界面精简高效&#xff0c;前台界面美观清爽。 完善的栏目管理 …...

私有化视频会议平台/视频高清直播点播EasyDSS构建智慧校园音视频协作新生态

在教育数字化转型的关键阶段&#xff0c;智慧校园对音视频协作系统的需求&#xff0c;已从基础的远程沟通&#xff0c;升级为安全可控、体验流畅、管理智能的一体化解决方案。视频直播点播平台EasyDSS凭借技术创新与场景深耕&#xff0c;成为智慧校园建设的核心支撑&#xff0c…...

ElevenLabs湖南话TTS深度评测(2024真实场景压测报告):声调准确率92.6%、连读自然度行业首破88分

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;ElevenLabs湖南话语音技术概览 ElevenLabs 作为全球领先的语音合成平台&#xff0c;其多语言支持能力持续扩展&#xff0c;但需明确指出&#xff1a;截至 2024 年底&#xff0c;ElevenLabs 官方模型库*…...

PPTist完全手册:零成本打造专业演示文稿的终极方案

PPTist完全手册&#xff1a;零成本打造专业演示文稿的终极方案 【免费下载链接】PPTist PowerPoint-ist&#xff08;/pauəpɔintist/&#xff09;, An online presentation application that replicates most of the commonly used features of MS PowerPoint, allowing for t…...