谈谈Flink消费kafka的偏移量
offset配置:
flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromTimestamp(…):从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。
flinkKafkaConsumer.setStartFromGroupOffsets():默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。
消费者offset提交配置:
配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。5000单位毫秒,代表每5秒进行依次checkpoint
关闭checkpoint:如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。
开启checkpoint:如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。
总结:Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前,因为0.8之前offset是维护在zookeeper中的)的配置 ;关闭checkpoint的话,flink消费kafka数据 offset取决于kafka客户端的配置;开启checkpoint的话,flink消费kafka offset由jobmanager中的checkpoint维护,并同步到kafka中保持一置,注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况。
使用checkpoint + 两阶段提交来保证仅消费一次kafka中的数据:
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。
checkpoint中包含:
当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink连接外部系统数据仅一次处理
Flink checkpoint机制: 这种机制是在Flink应用内部实现仅一次处理数据的基础。
当谈及“exactly-once semantics”仅一次处理数据时,指的是每条数据只会影响最终结果一次。Flink可以保证当机器出现故障或者程序出现错误时,也没有重复的数据或者未被处理的数据出现,实现仅一次处理的语义。
checkpoint中包含:
当前应用的状态;
当前消费流数据的位置;
注意:checkpoint机制仅限于Flink架构内部保证仅一次处理数据;
使用两阶段提交协议保证flink连接外部系统数据仅一次处理;
当Flink处理完的数据需要写入外部系统时,不保证仅一次处理数据。为了提供端到端的仅一次处理数据,在将数据写入外部系统时也要保证仅一次处理数据,这些外部系统必须提供一种手段来允许程序提交或者回滚写入操作,同时还要保证与Flink的checkpoint机制协调使用,在分布式系统中协调提交和回滚的常见方法就是两阶段提交协议。下面给出一个实例了解Flink如何使用两阶段提交协议来实现数据仅一次处理语义。
该实例是从kafka中读取数据,经过处理数据之后将结果再写回kafka。kafka0.11版本之后支持事务,这也是Flink与kafka交互时仅一次处理的必要条件。【注意:当Flink处理完的数据写入kafka时,即当sink为kafka时,自动封装了两阶段提交协议】。Flink支持仅一次处理数据不仅仅限于和Kafka的结合,只要sink提供了必要的两阶段协调实现,可以对任何sink都能实现仅一次处理数据语义。
其原理如下:
上图Flink程序包含以下组件:
一个从kafka中读取数据的source
一个窗口聚合操作
一个将结果写往kafka的sink。
要使sink支持仅一次处理数据语义,必须以事务的方式将数据写往kafka,将两次checkpoint之间的操作当做一个事务提交,确保出现故障时操作能够被回滚。假设出现故障,在分布式多并发执行sink的应用程序中,仅仅执行单次提交或回滚事务是不够的,因为分布式中的各个sink程序都必须对这些提交或者回滚达成共识,这样才能保证两次checkpoint之间的数据得到一个一致性的结果。Flink使用两阶段提交协议(pre-commit+commit)来实现这个问题。
Filnk checkpointing开始时就进入到pre-commit阶段,具体来说,一旦checkpoint开始,Flink的JobManager向输入流中写入一个checkpoint barrier将流中所有消息分隔成属于本次checkpoint的消息以及属于下次checkpoint的消息,barrier也会在操作算子间流转,对于每个operator来说,该barrier会触发operator的State Backend来为当前的operator来打快照。如下图示:

Flink DataSource中存储着Kafka消费的offset,当完成快照保存后,将chechkpoint barrier传递给下一个operator。这种方式只有在Flink内部状态的场景是可行的,内部状态指的是由Flink的State Backend管理状态,例如上面的window的状态就是内部状态管理。只有当内部状态时,pre-commit阶段无需执行额外的操作,仅仅是写入一些定义好的状态变量即可,checkpoint成功时Flink负责提交这些状态写入,否则就不写入当前状态。
但是,一旦operator操作包含外部状态,事情就不一样了。我们不能像处理内部状态一样处理外部状态,因为外部状态涉及到与外部系统的交互。这种情况下,外部系统必须要支持可以与两阶段提交协议绑定的事务才能保证仅一次处理数据。
本例中的data sink是将数据写往kafka,因为写往kafka是有外部状态的,这种情况下,pre-commit阶段下data sink 在保存状态到State Backend的同时,还必须pre-commit外部的事务。如下图:
当checkpoint barrier在所有的operator都传递一遍切对应的快照都成功完成之后,pre-commit阶段才算完成。这个过程中所有创建的快照都被视为checkpoint的一部分,checkpoint中保存着整个应用的全局状态,当然也包含pre-commit阶段提交的外部状态。当程序出现崩溃时,我们可以回滚状态到最新已经完成快照的时间点。
下一步就是通知所有的operator,告诉它们checkpoint已经完成,这便是两阶段提交的第二个阶段:commit阶段。这个阶段中JobManager会为应用中的每个operator发起checkpoint已经完成的回调逻辑。本例中,DataSource和Winow操作都没有外部状态,因此在该阶段,这两个operator无需执行任何逻辑,但是Data Sink是有外部状态的,因此此时我们需要提交外部事务。如下图示:
汇总以上信息,总结得出:
一旦所有的operator完成各自的pre-commit,他们会发起一个commit操作。
如果一个operator的pre-commit失败,所有其他的operator 的pre-commit必须被终止,并且Flink会回滚到最近成功完成的checkpoint位置。
一旦pre-commit完成,必须要确保commit也要成功,内部的operator和外部的系统都要对此进行保证。假设commit失败【网络故障原因】,Flink程序就会崩溃,然后根据用户重启策略执行重启逻辑,重启之后会再次commit。
因此,所有的operator必须对checkpoint最终结果达成共识,即所有的operator都必须认定数据提交要么成功执行,要么被终止然后回滚。
参考:https://blog.51cto.com/u_16213620/7923389
相关文章:
谈谈Flink消费kafka的偏移量
offset配置: flinkKafkaConsumer.setStartFromEarliest():从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。 flinkKafkaConsumer.setStartFromLatest():从topic的最新offset位置开始处理数据,如果kafka中保存有消费…...
MySQL 高级SQL高级语句(二)
一.CREATE VIEW 视图 可以被当作是虚拟表或存储查询。 视图跟表格的不同是,表格中有实际储存数据记录,而视图是建立在表格之上的一个架构,它本身并不实际储存数据记录。 临时表在用户退出或同数据库的连接断开后就自动消失了,而…...
MySQL之高可用性(四)
高可用性 故障转移和故障恢复 冗余是很好的技术,但实际上只有在遇到故障需要恢复时才会用到。(见鬼,这可以用备份来实现)。冗余一点儿也不会增加可用性或减少宕机。在故障转移的过程中,高可用性是建立在冗余的基础上。当有一个组件失效&…...
招聘智能管理系统设计
设计一个招聘智能管理系统,需要从多个维度考虑,包括但不限于用户界面、功能模块、数据安全、算法模型等。以下是一个基本的设计框架: 1. 系统架构: 前端:提供直观的用户界面,包括应聘者和招聘者的登录/注册…...
达梦数据库系列—15. 表的备份和还原
目录 1、表备份 2、表还原 1、表备份 表备份和表还原恢复,都必须在联机状态下进行。 与备份数据库与表空间不同,不需要备份归档日志,不存在增量备份之说。 CREATE TABLE TAB_FOR_RES_02(C1 INT);CREATE INDEX I_TAB_FOR_RES_02 ON TAB_F…...
无线领夹麦克风哪个品牌音质最好,直播用领夹麦克风还是声卡麦
随着社交媒体的兴起,直播和Vlog已经成为内容创作的新趋势,这些变化不仅改变了人们分享生活的方式,也带动了音频设备市场的增长。无线领夹麦克风,以其便携性和卓越的录音品质,迅速成为视频制作者的重要工具。它们在直播…...
《Windows API每日一练》6.2 客户区鼠标消息
第五章已经讲到,Windows只会把键盘消息发送到当前具有输入焦点的窗口。鼠标消息则不同:当鼠标经过窗口或在窗口内被单击,则即使该窗口是非活动窗口或不带输入焦点, 窗口过程还是会收到鼠标消息。Windows定义了 21种鼠标消息。不过…...
体验升级:扫描全能王智能高清滤镜2.0全面测评
🤵♂️ 个人主页:艾派森的个人主页 ✍🏻作者简介:Python学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞Ǵ…...
【JVM系列】JVM调优
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
Linux基础 - Postfix 与 Dovecot 部署邮件系统
目录 零. 简介 一. 部署 二. 设置用户别名信箱 三. Linux 邮件客户端 零. 简介 Postfix 和 Dovecot 是在 Linux 系统中常用于部署邮件系统的两个重要组件。 Postfix 是一种邮件传输代理(MTA),主要负责接收、转发和发送邮件。它具有高性能…...
Qt的安装
一、Qt安装 下载地址:https://download.qt.io/archive/qt/ opencv下载安装 下载地址:https://opencv.org/releases/ 陈年旧文,没有下文,以此纪念。。。。。...
ThreeJS-3D教学十二:ShaderMaterial
一、首先 Shader 是做什么的 Shader 可以自定义每个顶点、每个片元/像素如何显示,而控制顶点和片元显示是通过设置 vertexShader 顶点着色器和 fragmentShader 片元着色器,这两个着色器用在 ShaderMaterial 和 RawShaderMaterial 材质上。 我们先看一个例…...
计算机网络面试TCP篇之TCP三次握手与四次挥手
TCP 三次握手与四次挥手面试题 任 TCP 虐我千百遍,我仍待 TCP 如初恋。 巨巨巨巨长的提纲,发车!发车! PS:本次文章不涉及 TCP 流量控制、拥塞控制、可靠性传输等方面知识,这些知识在这篇: TCP …...
Python-数据分析组合可视化实例图【附完整源码】
数据分析组合可视化实例图 开篇:应女朋友的要求,于是写下了这篇详细的数据可视化代码及完整注释 一:柱状图、折线图横向组合网格布局 本段代码使用了pyecharts库来创建一个包含多个图表(柱状图、折线图)和网格布局的…...
【JavaEE】Spring Web MVC详解
一.基本概念. 1.什么是Spring Web MVC? 官方链接: https://docs.spring.io/spring-framework/reference/web/webmvc.html Spring Web MVC is the original web framework built on the Servlet API and has been included in the Spring Framework from the very beginning…...
docker安装rocketMq5x以上的版本
1.背景 安装RocketMQ 5.x以上的版本主要是因为新版本引入了许多性能优化、新功能以及对已有特性的增强,这些改进可以帮助提升消息队列系统的稳定性和效率。 1.性能提升:RocketMQ 5.x版本通常包括了对消息处理速度、吞吐量和延迟的优化,使得系…...
【Spring】DAO 和 Repository 的区别
DAO 和 Repository 的区别 1.概述2.DAO 模式2.1 User2.2 UserDao2.3 UserDaoImpl 3.Repository 模式3.1 UserRepository3.2 UserRepositoryImpl 4.具有多个 DAO 的 Repository 模式4.1 Tweet4.2 TweetDao 和 TweetDaoImpl4.3 增强 User 域4.4 UserRepositoryImpl 5.比较两种模式…...
高阶面试-秒杀系统的设计
场景 特价商品如茅台,在8月1日22点10分0秒开始秒杀 平台用户量:几千万,预计几十万用户感兴趣 需求 临时性的活动,不要太大技术改动 原则 商品不能超卖下单成功的订单不能丢失服务器和数据库不能崩溃尽量不让机器人抢走商品 …...
四十五、 证券基金业数据出境有无特别规范需要注意?
证券基金业数据合规除应遵守本《实务问答》前述的各项通用规定外,还应注意中国证券监督管理委员会等其他机构发布的相关规范。其中,与数据出境相关的主要包括《证券期货业数据分类分级指引》(JR/T 0158—2018,2018年 9月 27日实施…...
02.Linux下安装FFmpeg
目录 一、下载FFmpeg的编译源码 二、编译源码 三、ffmpeg工具结构解析 1、bin目录 2、include库 3、lib库 四、注意事项 五、可能出现的一些问题 1、某些工具未安装/版本过久 2、缺少pkg-config工具 3、缺少ffmplay FFmpeg 是一个开源的跨平台音视频处理工具集&…...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...
