Kafka 位移提交
Kafka 位移提交
- 自动提交
- 手动提交
Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移
- 如 : Consumer 已消费 5 条消息 (位移: 0 - 4) , 此时 Consumer 位移 = 5 : 指向下一条消息的位移
提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据
- Consumer 能同时消费多个分区的数据,Consumer 要维护每个分区提交各自的位移数据
- 当 Consumer 重启后,能从之前位移继续消费,避免重新消费整个消息
Consumer API 的提交位移的方法 :
- 从用户分 : 自动提交 , 手动提交
- 从 Consumer 分 : 同步提交 , 异步提交
- 自动提交 : Consumer 在后台提交位移,用户无需操作
- 手动提交 : 用户提交位移,Consumer 不管
| 提交位移 | 自动提交 | 配置 | enable.auto.commit = true |
|---|---|---|---|
| 手动提交 | 同步提交 | KafkaConsumer.commitSync | |
| 异步提交 | KafkaConsumer.commitAsync | ||
| 细化位移提交 | commitSync(Map<TopicPartition, OffsetAndMetadata>) | ||
commitAsync(Map<TopicPartition, OffsetAndMetadata>) |
自动提交
Consumer 参数 :
enable.auto.commit = true: 自动提交位移auto.commit.interval.ms(默认值是 5 秒) : Kafka 每 5 秒自动提交一次位移
自动提交位移 :
- 可能出现重复消费
- 例子:Consumer 每 5 秒自动提交一次位移。提交位移 3 秒后出现 Rebalance。在 Rebalance 后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据,在 Rebalance 发生前 3 秒消费的所有数据都会重新消费
设置自动提交位移 :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}
手动提交
enable.auto.commit = false : 手动提交位移
手动提交位移 :
- 好处 : 更灵活,能把控位移提交的时机和频率
- 缺点 : 用 commitSync() 时,Consumer 处于阻塞状态,直到 Broker 返回提交结果,影响整个应用程序的 TPS
commitSync() :
while (true) {// 返回最新位移。一直等位移提交后才返回 (同步操作)ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}
commitAsync() :
- 异步操作,会立即返回,不会阻塞,不影响 Consumer 的 TPS
- 用回调函数 (callback) 实现提交后的逻辑,如 : 记录日志或处理异常
- 无法自动失败重试
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}
异步无阻塞式 :
- 用 commitSync 自动重试避免瞬时错误,如 : 网络的瞬时抖动,Broker 端 GC
- 异步处理,不影响 TPS
// 实现异步无阻塞式的位移管理,保证 Consumer 位移的正确性
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch (Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}
更精细的位移管理 :
commitSync(Map<TopicPartition, OffsetAndMetadata>)commitAsync(Map<TopicPartition, OffsetAndMetadata>)- 参数 : Map 对象 : 键 = TopicPartition (消费的分区),值 = OffsetAndMetadata 对象 (位移数据)
// 创建 Map 对象,保存 Consumer 消费要提交的分区位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
//...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record); // 处理消息// 构造要提交的位移值offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);// 每 100 条消息提交一次位移if(count % 100 == 0){consumer.commitAsync(offsets, null); // 回调处理逻辑是 null}count++;}
}
相关文章:
Kafka 位移提交
Kafka 位移提交自动提交手动提交Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移 如 : Consumer 已消费 5 条消息 (位移: 0 - 4) , 此时 Consumer 位移 5 : 指向下一条消息的位移 提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据 Consumer 能同…...
kubernetes--监控容器运行时:Falco
目录 Falco介绍 Falco架构 Falco的安装 告警规则示列 威胁场景测试: 监控容器创建的不可信任进程(自定义规则) Falco支持五种输出告警方式falco.yaml: Falco告警集中化展示: Falco介绍 Falco是一个Linux安全工具…...
HTTP协议详解(上)
目录 前言: 认识URL HTTP协议方法 通过Fiddler抓包 GET和POST之间典型区别 header详解 HTTP响应状态码 常见状态码解释 状态码分类 HTTP协议报文格式 小结: 前言: HTTP协议属于应用层协议,称为超文本传输协议ÿ…...
java性能-原生内存-内存分析
原生内存最佳实践 内存占用 jVM使用的原生内存和堆内存总和就是一个应用程序的总内存——操作系统角度 jvm启动时候加载的类路径下的jar文件相关的内存和系统其他进程共享资源的可能 测量内存占用 线程是个例外——每当创建一个线程操作系统都会分配一些原生内存存储线程栈…...
c++类与对象
🐶博主主页:ᰔᩚ. 一怀明月ꦿ ❤️🔥专栏系列:线性代数,C初学者入门训练,题解C,C的使用文章 🔥座右铭:“不要等到什么都没有了,才下定决心去做” …...
Java并发编程与API详解
文章目录前言操作系统——进程和线程进程进程组成进程状态进程控制进程创建进程终止进程阻塞和唤醒进程通信线程线程组成线程状态线程控制线程的实现方式用户线程内核线程混合方式CPU调度调度的层次调度的实现调度器调度的时机、切换与过程进程调度的方式闲逛进程两种线程的调度…...
【冲刺蓝桥杯的最后30天】day5
大家好😃,我是想要慢慢变得优秀的向阳🌞同学👨💻,断更了整整一年,又开始恢复CSDN更新,从今天开始更新备战蓝桥30天系列,一共30天,如果对你有帮助或者正在备…...
大厂与小厂招人的区别,看完多少有点不敢相信
前两天在头条发了一条招人的感慨,关于大厂招人和小公司招人的区别。 大厂:有影响力,有钱,能够吸引了大量的应聘者。因此,也就有了筛选的资格,比如必须985名校毕业,必须35岁以下,不能…...
前端ES5对象特性
ES5对象特性 对象和函数的原型 JS中每一个对象都有一个特殊的内置属性,这个特殊的对象可以指向其他的对象 我们通过引用对象的属性key来获取一个value时,它会触发 Get 的操作首先检查该对象是否有对应的属性,如果有的话就使用对象内的如果…...
Linux入门介绍及Linux文件与目录结构
前言 本文小新为大家带来 Linux 入门介绍及Linux 文件与目录结构 相关知识,具体内容包括Linux入门介绍(包括:Linux概述,Linux与Windows区别,CentOS 下载地址),Linux文件与目录结构等进行详尽介绍…...
超赞,用python实现流媒体服务器功能,寥寥几句搞定。
步骤: 要使用Python将实时摄像机传送流写入H5页面,可以使用以下步骤。 1、安装必要的软件包。您需要安装OpenCV和Flask以及gunicorn 与 gevent 。您可以通过在终端中运行以下命令来执行此操作。 pip install opencv-python pip install Flask pip ins…...
冥想第七百二十一天
1.3.3周五,又是周五了。今天又运动了5公里,很舒服轻松。 2.还是往常的生活,休息的也很好,开春后跑的一直很好。 3.早上30分钟健康操。中午转了圈, 给大哥说下周去上海。 4.感谢父母,感谢朋友,感…...
06-Oracle表空间与用户管理
本讲主要内容: 1.表空间管理:表空间的作用,创建,修改,删除及管理; 2.用户管理:创建用户,修改用户,删除用户,修改密码,解锁; 3.用户…...
Mysql 索引特点
承接上文Mysql Server原理简介聚簇索引、二级索引、联合索引分别具备什么样的特点?聚簇索引数据跟索引放在一起的叫聚簇索引;数据和索引分开存储的叫非聚簇索引;innodb存储引擎,数据和文件都放在ibd文件中,实际的数据是…...
读书笔记-终身学习
前言人需要终身成长,也需要终身学习,以下是记录个人读书学习的笔记总结,希望能给大家一点借鉴,仅供参考。笔记1、《匠人精神》秋山利辉是日本神奈川县横滨市都筑区“秋山木工”的经营者,从事订制家具制作业务。是一家小…...
了解栈Stack一篇文章就够了
什么是栈栈是一种特殊的线性表,只允许一端进行数据的插入和删除,即先进后出原则。类似于弹夹先装进去的子弹后面出,后放入的子弹先出。栈的底层原理栈是一种线性结构,所以既能使用数组实现,也能使用链表实现࿰…...
CNStack 助推龙源电力扛起“双碳”大旗
作者:CNStack 容器平台、龙源电力:张悦超 、党旗 龙源电力容器云项目背景 龙源电力集团是世界第一大风电运营商, 随着国家西部大开发战略推进,龙源电力已经把风力发电场铺设到全国各地,甚至是交通极不便利的偏远地区&…...
ruoyi-vue-plus1(控制台相关的输出日志)(p6spy插件)(jackson全局配置)(StopWatch)
Jackson配置在启动项目时,我们发现日志打印出这样几行字,初始化了jacdson配置,我们去查看一下来源找。我们找到了一个全局序列化配置类,其中重写了BigNumberSerializer.INSTANCE进去查看发现了这里对于部分范围的数字进行了转为为…...
【Mybatis】| 如何创建MyBatis的工具类
目录🌟更多专栏请点击👇一、前言二、实现过程1. 创建一个ThreadLocal对象2. 初始化SqlSessionFactory3. 获取并存储sqlSession对象4. 关闭sqlSession对象三、 总代码🌟更多专栏请点击👇 专栏名字🔥Elasticsearch专栏e…...
【Java】DT怎么写?
几个重要的注解 怎么用mockito写单元测试? package Biz;import Client.FileIOClient; import Req.FileRequest; import Res.FileResponse; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks;…...
千问3.5-2B多场景落地:电商商品图识别、医疗报告图释义、工业缺陷初筛
千问3.5-2B多场景落地:电商商品图识别、医疗报告图释义、工业缺陷初筛 1. 开箱即用的视觉理解工具 千问3.5-2B是Qwen系列中的小型视觉语言模型,它能够理解图片内容并生成相关文本描述。这个工具特别适合需要快速处理图片信息的场景,比如电商…...
GB28181视频监控平台EasyCVR助力景区数字化转型,打造一体化视频监控解决方案
随着文旅行业数字化转型进程持续加速,旅游景区的安全管理、服务优化与运营效率提升已成为行业发展的核心诉求。景区场景普遍具有面积广阔、人员流动性强等特点,传统监控方案存在设备兼容性差、可视化管控能力不足等诸多短板,难以满足当前景区…...
千问3.5-2B在办公提效场景:会议白板照片文字提取+要点总结实战
千问3.5-2B在办公提效场景:会议白板照片文字提取要点总结实战 1. 办公场景的痛点与解决方案 1.1 会议记录的传统困境 每次开完会,最让人头疼的就是整理会议记录了。特别是那些在白板上写满讨论要点的会议,你需要: 对着白板照片…...
新手最值得入的一款ai音乐工具
2026年,ai音乐爆发的一年。国内国外各种AI音乐工具层出不穷。想要尝试AI音乐的新手宝宝该怎么去选择呢?市面上大大小小的ai音乐创作软件我基本都尝试过。我觉得只有一款工具是最值得推荐的,也是我使用的最多的。那就是蘑兔AI,你们…...
AI写论文超厉害!4款AI论文生成工具,解决毕业论文写作难题!
还在为撰写期刊论文而烦恼吗?面对成堆的文献、复杂的格式要求以及无休止的修改,许多学术人员常常感到效率低下。这并不奇怪!不过,不必太担心,以下将推荐4款实测有效的AI论文写作工具,它们能帮助你在论文文献…...
【Git】深入解析 ‘.git/index.lock‘ 文件冲突:从报错到彻底解决
1. 当Git突然罢工:index.lock报错现场还原 那天下午我正忙着切换分支部署新功能,突然终端弹出红字警告:fatal: Unable to create .git/index.lock: File exists。这就像你急着上厕所却发现门被反锁,更糟的是你不知道里面到底有没有…...
3D打印雕塑与玻璃钢雕塑的区别、工艺详解及定制雕塑相关疑问解答
3D打印雕塑与玻璃钢雕塑的区别、工艺详解及定制雕塑相关疑问解答3D打印雕塑与玻璃钢雕塑是当代主流雕塑工艺,核心差异在于成型逻辑与材料特性:3D打印以数字化建模为核心,遵循“分层叠加”的增材逻辑;玻璃钢以复合材料为基础&#…...
[Python3高阶编程] - 异步编程深度学习指南二(补充1): 什么是 Barrier 原语 【异步!!!】
asyncio.Barrier 是 Python 3.11(2022 年 10 月)新增的高级同步原语,用于解决特定并发协作场景。一、Barrier 产生的背景:为什么需要它?核心问题:“多协程阶段对齐”在并发编程中,经常遇到这样的…...
002:RAG 入门-LangChain 读取文本
正文 异步/等待解决了什么问题? 在传统同步I/O操作中(如文件读取或Web API调用),调用线程会被阻塞直到操作完成。这在UI应用中会导致界面冻结,在服务器应用中则造成线程资源的浪费。async/await通过非阻塞的异步操作解…...
3个维度玩转League-Toolkit:从入门到精通的实战指南
3个维度玩转League-Toolkit:从入门到精通的实战指南 【免费下载链接】League-Toolkit 兴趣使然的、简单易用的英雄联盟工具集。支持战绩查询、自动秒选等功能。基于 LCU API。 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit League-Toolkit是…...
