当前位置: 首页 > news >正文

flink 写入数据到 kafka 后,数据过一段时间自动删除

版本

  • flink 1.16.0
  • kafka 2.3

流程描述:

flink利用KafkaSource,读取kafka的数据,然后经过一系列的处理,通过KafkaSink,采用 EXACTLY_ONCE 的模式,将处理后的数据再写入到新的topic中。

问题描述:

数据写入到新的topic后,过上几分钟的时间,利用工具offset explorer观察对应topic的数据量,显示为0。
刚写入没多久的数据消失了 ???大写的懵 ???

定位问题:

  • 首先查看kafka的日志:

在这里插入图片描述

  • 阅读flink 官方文档 kafkaSink的介绍:

DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write
all messages in a Kafka transaction that will be committed to Kafka on
a checkpoint. Thus, if the consumer reads only committed data (see
Kafka consumer config isolation.level), no duplicates will be seen in
case of a Flink restart. However, this delays record visibility
effectively until a checkpoint is written, so adjust the checkpoint
duration accordingly. Please ensure that you use unique
transactionalIdPrefix across your applications running on the same
Kafka cluster such that multiple running jobs do not interfere in
their transactions! Additionally, it is highly recommended to tweak
Kafka transaction timeout (see Kafka producer transaction.timeout.ms)»
maximum checkpoint duration + maximum restart duration or data loss
may happen when Kafka expires an uncommitted transaction.

  • 翻译过来的意思大概就是:

在EXACTLY_ONCE这种模式下,KafkaSink在事务中写入所有的消息,这些消息在checkpoint上提交给kafka。因此,在flink重启的情况下,如果消费者值读取提交的数据,不会看到重复的数据。缺点就是延迟记录可见性,知道写入检查点为止。强烈建议调整kafka的事务超时时间(见Kafka producer transaction.timeout.ms),超时时间要大于【最大检查点持续时间+最大重启持续时间】,否则当Kafka过期未提交的事务时可能会发生数据丢失。

  • 阅读kafka的官网介绍:

Producer Configs:
transaction.timeout.ms:60000(默认值)

参数描述:
The maximum amount of time in ms that the transaction coordinator will
wait for a transaction status update from the producer before
proactively aborting the ongoing transaction.If this value is larger
than the transaction.max.timeout.ms setting in the broker, the request
will fail with a InvalidTransactionTimeout error.

Broker Configs
transaction.max.timeout.ms:900000(默认值)

参数描述:
The maximum allowed timeout for transactions. If a client’s requested
transaction time exceed this, then the broker will return an error in
InitProducerIdRequest. This prevents a client from too large of a
timeout, which can stall consumers reading from topics included in the
transaction.

  • 最后排查
    在flink中设置的超时时间违反了kafka producer对应的参数规定。

解决问题

在kafkaSink的配置中,加入

Properties properties = new Properties();
// 根据上面的介绍自己计算这边的超时时间,满足条件即可
properties.setProperty("transaction.timeout.ms","900000");KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic(sinkTopic).setValueSerializationSchema(new SimpleStringSchema()).build()).setKafkaProducerConfig(properties).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("flink-xhaodream-").build();

总结

在使用现有框架和工具的时候,往往只是懂得怎么用,具体底层的逻辑、原理,了解的很少。往往只有真正理解了原理,遇到了问题,才会更快、更准确的定位问题、解决问题。

相关文章:

flink 写入数据到 kafka 后,数据过一段时间自动删除

版本 flink 1.16.0kafka 2.3 流程描述&#xff1a; flink利用KafkaSource&#xff0c;读取kafka的数据&#xff0c;然后经过一系列的处理&#xff0c;通过KafkaSink&#xff0c;采用 EXACTLY_ONCE 的模式&#xff0c;将处理后的数据再写入到新的topic中。 问题描述&#xff1…...

golong基础相关操作--一

package main//go语言以包作为管理单位&#xff0c;每个文件必须先声明包 //程序必须有一个main包 // 导入包&#xff0c;必须要要使用 // 变量声明了&#xff0c;必须要使用 import ("fmt" )/* * 包内部的变量 */ var aa 3var ss "kkk"var bb truevar …...

【深度学习】基于卷积神经网络的铁路信号灯识别方法

基于卷积神经网络的铁路信号灯识别方法 摘 要&#xff1a;1 引言2 卷积神经网络模型2.1 卷积神经网络结构2.2.1 卷积层2.2.2 池化层2.2.3 全连接层 3 卷积神经网络算法实现3.1 数据集制作3.2 卷积神经网络的训练过程3.2.1 前向传播过程 4 实验5 结语 摘 要&#xff1a; 目前中…...

DR IP-SoC China 2023 Day演讲预告 | 龙智Perforce专家解析芯片开发中的数字资产管理

2023年9月6日&#xff08;周三&#xff09;&#xff0c;龙智即将亮相于上海举行的D&R IP-SoC China 2023 Day&#xff0c;呈现集成了Perforce与Atlassian产品的芯片开发解决方案&#xff0c;助力企业更好、更快地进行芯片开发。 D&R IP-SoC China 2023 Day 是中国首个…...

解决github连接不上的问题

改 hosts 我们在浏览器输入 GitHub 的网址时&#xff0c;会向 DNS 服务器发送一个请求&#xff0c;获取到 GitHub 网站所在的服务器 IP 地址&#xff0c;从而进行访问。 就像你是一名快递员&#xff0c;在送快递前要先找中间人询问收件人的地址。而 DNS 就是这个告诉你目标地址…...

# DevOps名词定义梳理

DevOps名词定义梳理 极限编程座右铭&#xff1a;如果它令你很受伤&#xff0c;那么就做更多的练习&#xff08;If it hurts, do it more often&#xff09; 经常人们会把这些名词用错&#xff1a; 构建&#xff1a;就是把源代码制成成品的过程&#xff0c;这个过程一般会有单元…...

Redis Cluster

文章目录 一、集群搭建1 节点规划2 集群启动 二、配置一致性1 基本分工2 更新规则 三、Sharding1 数据分片分片实现分片特点 2 slot迁移迁移原因迁移支持集群扩容迁移错误背景现象问题分析验证猜想 集群缩容 3. 请求路由client端server端migrating节点的读写importing节点的读写…...

Pandas常用指令

astype astype的作用是转换数据类型&#xff0c;astype是没办法直接在原df上进行修改的&#xff0c;只能通过赋值的形式将原有的df进行覆盖&#xff0c;即df df.astype(dtype) astype的基本语法 DataFrame.astype(dtype, copyTrue, errorsraise) dtype参数指定将数据类型转换…...

FPGA实战小项目3

基于FPGA的波形发生器 基于FPGA的波形发生器 基于FPGA的beep音乐播放器设计 基于FPGA的beep音乐播放器设计 基于FPGA的cordic算法实现DDS sin和cosine波形的产生 基于FPGA的cordic算法实现DDS sin和cosine波形的产生...

mysql创建用户

创建用户 创建 -- 创建用户 itcast , localhost只能够在当前主机localhost访问, 密码123456; create user test01localhost identified by 123456;使用命令show databases;命令&#xff0c;只显示一个数据库&#xff0c;因为没有权限 -- 创建用户 test02, 可以在任意主机访问…...

程序员写好简历的5个关键点

程序员就业竞争大&#xff1f;找不到工作&#xff1f;也许&#xff0c;从简历开始你就被淘汰了.... 在很多的公司中&#xff0c;HR的招聘压力是很大的&#xff0c;浏览每个人的简历的时间可能只有20几秒&#xff0c;所以即使你的工作能力十分的强&#xff0c;但如果你没有在简…...

Vue:关于如何配置一级路由和二级路由的方法

路由的嵌套配置 文章目录 路由的嵌套配置配置一级路由 配置一级路由 创建router文件夹&#xff0c;里面添加index.js文件配置以下代码&#xff1a; import Vue from vue import VueRouter from "vue-router"; import Layout from /views/Layout import ArticleDeta…...

【论文绘图】seaborn分类数据绘图

参考&#xff1a;https://seaborn.pydata.org/tutorial/categorical.html 分类变量关系图中的catplot类似于连续变量中的relplot&#xff0c;默认是stripplot。 分类变量图种类 分类散点图 stripplotswarmplot (kind‘swarm’) 类别分布图 boxplotviolinplotboxenplot …...

KubeSphere Namespace 数据删除事故分析与解决全记录

作者&#xff1a;宇轩辞白&#xff0c;运维研发工程师&#xff0c;目前专注于云原生、Kubernetes、容器、Linux、运维自动化等领域。 前言 2023 年 7 月 23 日在项目上线前夕&#xff0c;K8s 生产环境出现故障&#xff0c;经过紧急修复之后&#xff0c;K8s 环境恢复正常&#…...

mysql场景题:最近7天连续3天登陆用户,字段,id,date(已去重)

1.最近7天连续3天登陆用户&#xff0c;字段&#xff0c;id&#xff0c;date&#xff08;已去重&#xff09; 思路&#xff1a; lag对时间开窗&#xff08;注意时间得转换为时间戳&#xff08;int类型才可以添加后续条件&#xff09;&#xff0c;跳行为2&#xff08;连续3天&am…...

华为OD机试 - 最差产品奖 - 双端队列 deque(Java 2023 B卷 200分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#…...

【校招VIP】前端算法考察之链表算法

考点介绍&#xff1a; 链表是一种物理存储结构上非连续的数据结构,数据的逻辑顺序是通过链表中的指针链接次序实现相互勾连。链表相对数组而言有很多不同之处,在特定场景下能发挥独特的优势。例如链表的插入和删除操作比数组效率高,数组需要改变其他元素的位置,而链表只需要改变…...

uni-app之android离线自定义基座

一 为什么要自定义基座 1&#xff0c;基座其实就是一个app&#xff0c;然后新开发的页面可以直接在手机上面显示&#xff0c;查看效果。 2&#xff0c;默认的基座就是uniapp帮我们打包好的基座app&#xff0c;然后我们可以进行页面的调试。 3&#xff0c;自定义基座主要用来…...

【AWS】实操-保护 Amazon S3 VPC 终端节点通信

文章目录 实验概览目标实验环境任务 1&#xff1a;探索并启动实验环境任务 1.1&#xff1a;探索 Amazon VPC 资源任务 1.2&#xff1a;探索 Amazon EC2 资源任务 1.3&#xff1a;创建 Amazon VPC 终端节点任务 1.4&#xff1a;连接私有 EC2 实例任务 1.5&#xff1a;探索 Amazo…...

C# Color颜色RGB对照表

序号Color色系颜色RGB图例1Color.AliceBlue蓝色艾丽丝蓝240,248,2552Color.AntiqueWhite白色古典白色250,235,2153Color.Aqua&#xff0c;Color.Cyan青色浅蓝色&#xff0c;蓝绿色&#xff0c;青色0,255,255 C# Color颜色RGB对照表_旭东怪的博客-CSDN博客 C#颜色和名称样式对照…...

告别软路由?实测ARM架构MT7981硬路由刷OpenWrt:性能、功耗与稳定性深度对比

ARM硬路由 vs x86软路由&#xff1a;2024年高性能网络设备终极对决 在家庭与企业网络设备的选择上&#xff0c;x86架构软路由长期占据着性能王座&#xff0c;而传统硬路由则因扩展性不足被极客们视为"玩具"。但2023年MTK发布的MT7981芯片组彻底改变了这一格局——这颗…...

chromedp实战:如何用JavaScript绕过iframe内容获取难题(附完整代码)

chromedp实战&#xff1a;突破iframe内容获取的JavaScript高阶技巧 在电商数据抓取和动态内容监控场景中&#xff0c;iframe始终是爬虫开发者最头疼的障碍之一。传统DOM操作方法在iframe嵌套页面面前往往束手无策&#xff0c;而chromedp提供的Evaluate系列方法则打开了新世界的…...

告别逐行阅读:这个终端工具让你的阅读速度提升200%

告别逐行阅读&#xff1a;这个终端工具让你的阅读速度提升200% 【免费下载链接】speedread A simple terminal-based open source Spritz-alike (per-word RSVP aligned on optimal reading points) 项目地址: https://gitcode.com/gh_mirrors/sp/speedread 在信息爆炸的…...

OpenClaw自动化测试实践:GLM-4.7-Flash驱动脚本执行与结果分析

OpenClaw自动化测试实践&#xff1a;GLM-4.7-Flash驱动脚本执行与结果分析 1. 为什么选择OpenClaw做测试自动化&#xff1f; 上个月接手一个新项目时&#xff0c;我遇到了一个典型的技术矛盾&#xff1a;作为独立开发者&#xff0c;既需要保证代码质量&#xff0c;又没精力手…...

实战指南:Whisper 的 `prompt` 与 `initial_prompt` 参数在语音转文字中的高效应用

1. Whisper 语音转文字的核心参数解析 第一次用 Whisper 做语音转文字时&#xff0c;我发现同样的音频文件&#xff0c;同事转出来的结果总比我的准确率高。后来才发现&#xff0c;原来他偷偷用了一个叫 prompt 的秘密武器。这就像考试时的"小抄"&#xff0c;给模型…...

ATOM-PRINTER嵌入式热敏打印固件深度解析

1. ATOM-PRINTER 嵌入式打印库深度解析与工程实践指南ATOM-PRINTER 是 M5Stack 推出的面向 ESP32 平台的轻量级嵌入式热敏打印固件库&#xff0c;专为 M5Stack Atom 系列微型主控模块&#xff08;搭载 ESP32-WROVER-B&#xff09;设计。该库并非传统意义上的“驱动层”C/C 库&a…...

从 0 手写一个巡检调度系统(五):接入大模型实现巡检问题解读与修复建议

摘要&#xff1a;在既有「架构巡检 → 问题落库」链路中&#xff0c;第一次引入大模型能力&#xff1a;对单条 issue 做「解读 修复建议」&#xff0c;要求输出可解析的结构化 JSON 并落库可追溯。本文记录选型、配置、HTTP 客户端、Prompt 约束与踩坑&#xff0c;便于同类业务…...

InstructPix2Pix在.NET平台的应用开发实战

InstructPix2Pix在.NET平台的应用开发实战 1. 引言&#xff1a;当AI修图遇上.NET开发 想象一下这样的场景&#xff1a;电商平台的商品图片需要批量调整风格&#xff0c;摄影工作室想要快速实现创意效果&#xff0c;或者内容创作者需要即时编辑社交媒体图片。传统图像处理方式…...

深度学习项目训练环境多场景落地:自动驾驶小车图像识别项目快速启动

深度学习项目训练环境多场景落地&#xff1a;自动驾驶小车图像识别项目快速启动 你是不是也遇到过这样的问题&#xff1f;想跑一个深度学习项目&#xff0c;光是配环境就花了大半天&#xff0c;各种版本冲突、依赖报错&#xff0c;好不容易装好了&#xff0c;一运行又提示缺这…...

【全场景优化】WaveTools鸣潮性能调校指南:从卡顿到流畅的完整解决方案

【全场景优化】WaveTools鸣潮性能调校指南&#xff1a;从卡顿到流畅的完整解决方案 【免费下载链接】WaveTools &#x1f9f0;鸣潮工具箱 项目地址: https://gitcode.com/gh_mirrors/wa/WaveTools 问题定位&#xff1a;硬件与软件的兼容性挑战 当代游戏性能优化面临的核…...