当前位置: 首页 > news >正文

Kafka 位移提交

Kafka 位移提交

  • 自动提交
  • 手动提交

Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移

  • 如 : Consumer 已消费 5 条消息 (位移: 0 - 4) , 此时 Consumer 位移 = 5 : 指向下一条消息的位移

提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据

  • Consumer 能同时消费多个分区的数据,Consumer 要维护每个分区提交各自的位移数据
  • 当 Consumer 重启后,能从之前位移继续消费,避免重新消费整个消息

Consumer API 的提交位移的方法 :

  • 从用户分 : 自动提交 , 手动提交
  • 从 Consumer 分 : 同步提交 , 异步提交
  • 自动提交 : Consumer 在后台提交位移,用户无需操作
  • 手动提交 : 用户提交位移,Consumer 不管
提交位移自动提交配置enable.auto.commit = true
手动提交同步提交KafkaConsumer.commitSync
异步提交KafkaConsumer.commitAsync
细化位移提交commitSync(Map<TopicPartition, OffsetAndMetadata>)
commitAsync(Map<TopicPartition, OffsetAndMetadata>)

自动提交

Consumer 参数 :

  • enable.auto.commit = true : 自动提交位移
  • auto.commit.interval.ms (默认值是 5 秒) : Kafka 每 5 秒自动提交一次位移

自动提交位移 :

  • 可能出现重复消费
  • 例子:Consumer 每 5 秒自动提交一次位移。提交位移 3 秒后出现 Rebalance。在 Rebalance 后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据,在 Rebalance 发生前 3 秒消费的所有数据都会重新消费

设置自动提交位移 :

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}

手动提交

enable.auto.commit = false : 手动提交位移

手动提交位移 :

  • 好处 : 更灵活,能把控位移提交的时机和频率
  • 缺点 : 用 commitSync() 时,Consumer 处于阻塞状态,直到 Broker 返回提交结果,影响整个应用程序的 TPS

commitSync() :

while (true) {// 返回最新位移。一直等位移提交后才返回 (同步操作)ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitAsync() :

  • 异步操作,会立即返回,不会阻塞,不影响 Consumer 的 TPS
  • 用回调函数 (callback) 实现提交后的逻辑,如 : 记录日志或处理异常
  • 无法自动失败重试
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}

异步无阻塞式 :

  • 用 commitSync 自动重试避免瞬时错误,如 : 网络的瞬时抖动,Broker 端 GC
  • 异步处理,不影响 TPS
// 实现异步无阻塞式的位移管理,保证 Consumer 位移的正确性
try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch (Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}

更精细的位移管理 :

  • commitSync(Map<TopicPartition, OffsetAndMetadata>)
  • commitAsync(Map<TopicPartition, OffsetAndMetadata>)
  • 参数 : Map 对象 : 键 = TopicPartition (消费的分区),值 = OffsetAndMetadata 对象 (位移数据)
// 创建 Map 对象,保存 Consumer 消费要提交的分区位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
//...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record: records) {process(record);  // 处理消息// 构造要提交的位移值offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);// 每 100 条消息提交一次位移if(count % 100 == 0{consumer.commitAsync(offsets, null); // 回调处理逻辑是 null}count++;}
}

相关文章:

Kafka 位移提交

Kafka 位移提交自动提交手动提交Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移 如 : Consumer 已消费 5 条消息 (位移: 0 - 4) , 此时 Consumer 位移 5 : 指向下一条消息的位移 提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据 Consumer 能同…...

kubernetes--监控容器运行时:Falco

目录 Falco介绍 Falco架构 Falco的安装 告警规则示列 威胁场景测试&#xff1a; 监控容器创建的不可信任进程&#xff08;自定义规则&#xff09; Falco支持五种输出告警方式falco.yaml&#xff1a; Falco告警集中化展示&#xff1a; Falco介绍 Falco是一个Linux安全工具…...

HTTP协议详解(上)

目录 前言&#xff1a; 认识URL HTTP协议方法 通过Fiddler抓包 GET和POST之间典型区别 header详解 HTTP响应状态码 常见状态码解释 状态码分类 HTTP协议报文格式 小结&#xff1a; 前言&#xff1a; HTTP协议属于应用层协议&#xff0c;称为超文本传输协议&#xff…...

java性能-原生内存-内存分析

原生内存最佳实践 内存占用 jVM使用的原生内存和堆内存总和就是一个应用程序的总内存——操作系统角度 jvm启动时候加载的类路径下的jar文件相关的内存和系统其他进程共享资源的可能 测量内存占用 线程是个例外——每当创建一个线程操作系统都会分配一些原生内存存储线程栈…...

c++类与对象

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章 &#x1f525;座右铭&#xff1a;“不要等到什么都没有了&#xff0c;才下定决心去做” &#x1…...

Java并发编程与API详解

文章目录前言操作系统——进程和线程进程进程组成进程状态进程控制进程创建进程终止进程阻塞和唤醒进程通信线程线程组成线程状态线程控制线程的实现方式用户线程内核线程混合方式CPU调度调度的层次调度的实现调度器调度的时机、切换与过程进程调度的方式闲逛进程两种线程的调度…...

【冲刺蓝桥杯的最后30天】day5

大家好&#x1f603;&#xff0c;我是想要慢慢变得优秀的向阳&#x1f31e;同学&#x1f468;‍&#x1f4bb;&#xff0c;断更了整整一年&#xff0c;又开始恢复CSDN更新&#xff0c;从今天开始更新备战蓝桥30天系列&#xff0c;一共30天&#xff0c;如果对你有帮助或者正在备…...

大厂与小厂招人的区别,看完多少有点不敢相信

前两天在头条发了一条招人的感慨&#xff0c;关于大厂招人和小公司招人的区别。 大厂&#xff1a;有影响力&#xff0c;有钱&#xff0c;能够吸引了大量的应聘者。因此&#xff0c;也就有了筛选的资格&#xff0c;比如必须985名校毕业&#xff0c;必须35岁以下&#xff0c;不能…...

前端ES5对象特性

ES5对象特性 对象和函数的原型 JS中每一个对象都有一个特殊的内置属性&#xff0c;这个特殊的对象可以指向其他的对象 我们通过引用对象的属性key来获取一个value时&#xff0c;它会触发 Get 的操作首先检查该对象是否有对应的属性&#xff0c;如果有的话就使用对象内的如果…...

Linux入门介绍及Linux文件与目录结构

前言 本文小新为大家带来 Linux 入门介绍及Linux 文件与目录结构 相关知识&#xff0c;具体内容包括Linux入门介绍&#xff08;包括&#xff1a;Linux概述&#xff0c;Linux与Windows区别&#xff0c;CentOS 下载地址&#xff09;&#xff0c;Linux文件与目录结构等进行详尽介绍…...

超赞,用python实现流媒体服务器功能,寥寥几句搞定。

步骤&#xff1a; 要使用Python将实时摄像机传送流写入H5页面&#xff0c;可以使用以下步骤。 1、安装必要的软件包。您需要安装OpenCV和Flask以及gunicorn 与 gevent 。您可以通过在终端中运行以下命令来执行此操作。 pip install opencv-python pip install Flask pip ins…...

冥想第七百二十一天

1.3.3周五&#xff0c;又是周五了。今天又运动了5公里&#xff0c;很舒服轻松。 2.还是往常的生活&#xff0c;休息的也很好&#xff0c;开春后跑的一直很好。 3.早上30分钟健康操。中午转了圈&#xff0c; 给大哥说下周去上海。 4.感谢父母&#xff0c;感谢朋友&#xff0c;感…...

06-Oracle表空间与用户管理

本讲主要内容&#xff1a; 1.表空间管理&#xff1a;表空间的作用&#xff0c;创建&#xff0c;修改&#xff0c;删除及管理&#xff1b; 2.用户管理&#xff1a;创建用户&#xff0c;修改用户&#xff0c;删除用户&#xff0c;修改密码&#xff0c;解锁&#xff1b; 3.用户…...

Mysql 索引特点

承接上文Mysql Server原理简介聚簇索引、二级索引、联合索引分别具备什么样的特点&#xff1f;聚簇索引数据跟索引放在一起的叫聚簇索引&#xff1b;数据和索引分开存储的叫非聚簇索引&#xff1b;innodb存储引擎&#xff0c;数据和文件都放在ibd文件中&#xff0c;实际的数据是…...

读书笔记-终身学习

前言人需要终身成长&#xff0c;也需要终身学习&#xff0c;以下是记录个人读书学习的笔记总结&#xff0c;希望能给大家一点借鉴&#xff0c;仅供参考。笔记1、《匠人精神》秋山利辉是日本神奈川县横滨市都筑区“秋山木工”的经营者&#xff0c;从事订制家具制作业务。是一家小…...

了解栈Stack一篇文章就够了

什么是栈栈是一种特殊的线性表&#xff0c;只允许一端进行数据的插入和删除&#xff0c;即先进后出原则。类似于弹夹先装进去的子弹后面出&#xff0c;后放入的子弹先出。栈的底层原理栈是一种线性结构&#xff0c;所以既能使用数组实现&#xff0c;也能使用链表实现&#xff0…...

CNStack 助推龙源电力扛起“双碳”大旗

作者&#xff1a;CNStack 容器平台、龙源电力&#xff1a;张悦超 、党旗 龙源电力容器云项目背景 龙源电力集团是世界第一大风电运营商&#xff0c; 随着国家西部大开发战略推进&#xff0c;龙源电力已经把风力发电场铺设到全国各地&#xff0c;甚至是交通极不便利的偏远地区&…...

ruoyi-vue-plus1(控制台相关的输出日志)(p6spy插件)(jackson全局配置)(StopWatch)

Jackson配置在启动项目时&#xff0c;我们发现日志打印出这样几行字&#xff0c;初始化了jacdson配置&#xff0c;我们去查看一下来源找。我们找到了一个全局序列化配置类&#xff0c;其中重写了BigNumberSerializer.INSTANCE进去查看发现了这里对于部分范围的数字进行了转为为…...

【Mybatis】| 如何创建MyBatis的工具类

目录&#x1f31f;更多专栏请点击&#x1f447;一、前言二、实现过程1. 创建一个ThreadLocal对象2. 初始化SqlSessionFactory3. 获取并存储sqlSession对象4. 关闭sqlSession对象三、 总代码&#x1f31f;更多专栏请点击&#x1f447; 专栏名字&#x1f525;Elasticsearch专栏e…...

【Java】DT怎么写?

几个重要的注解 怎么用mockito写单元测试&#xff1f; package Biz;import Client.FileIOClient; import Req.FileRequest; import Res.FileResponse; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks;…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

Opencv中的addweighted函数

一.addweighted函数作用 addweighted&#xff08;&#xff09;是OpenCV库中用于图像处理的函数&#xff0c;主要功能是将两个输入图像&#xff08;尺寸和类型相同&#xff09;按照指定的权重进行加权叠加&#xff08;图像融合&#xff09;&#xff0c;并添加一个标量值&#x…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

uniapp 实现腾讯云IM群文件上传下载功能

UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中&#xff0c;群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS&#xff0c;在uniapp中实现&#xff1a; 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...

恶补电源:1.电桥

一、元器件的选择 搜索并选择电桥&#xff0c;再multisim中选择FWB&#xff0c;就有各种型号的电桥: 电桥是用来干嘛的呢&#xff1f; 它是一个由四个二极管搭成的“桥梁”形状的电路&#xff0c;用来把交流电&#xff08;AC&#xff09;变成直流电&#xff08;DC&#xff09;。…...