当前位置: 首页 > news >正文

【kafka实践】11|消费位移提交

消费者位移

消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了记录Consumer 的消费进度,这样当 Consumer 发生重启之后,就能够从 Kafka 中读取之前提交的位移,从而继续消费,避免以避免重复消费,或消息丢失等。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

因为位移提交非常灵活,你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲就丢失了10条数据;相反地,如果你提交的位移值是 5,那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。

位移提交

从使用角度来说位移提交分为自动提交和手动提交;从 Consumer 的角度来说,位移提交分为同步提交和异步提交。

自动提交

默认情况下就是自动提交,你根本无需关心位移提交的事情,Consumer 端有个参数 enable.auto.commit默认值是 true,即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms,默认值是 5 秒,即每 5 秒会为你自动提交一次位移。

这里我们用一段简单的代码来看看这两个参数怎么使用

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "kafka_test");// 自动提交props.put("enable.auto.commit", "true");// 间隔2秒  props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// process}}
手动提交

设置 enable.auto.commit 为 false,还需要调用相应的 API 手动提交位移,KafkaConsumer.commitSync()。

// props.put("enable.auto.commit", "false");
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitSync()有一个缺陷,提交时Consumer 程序会处于阻塞状态,在生产系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。鉴于这个问题,Kafka 提供了另一个 异步API 方法:KafkaConsumer.commitAsync()。

不过commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题:

   try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); }
} catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close();
}
}

同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,如果你自行编写代码开发一套 Kafka Consumer 应用,可以尝试使用上面的代码范例来实现手动的位移提交。

其实还有一种更高级的提交方式,就是分批量提交,就不再这里展开,留给大家查资料学习,也欢迎各位同学在评论区交流讨论!

相关文章:

【kafka实践】11|消费位移提交

消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式&#xff0c;本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据&#xff0c;这个汇报过程被称为提交位移&#xff08;Committing Offsets&#xff09;。因为 Consumer 能够同时消费…...

Mac卸载、安装Python

卸载 说明 对于删除 Python&#xff0c;我们首先要知道其具体都安装了什么&#xff0c;实际上&#xff0c;在安装 Python 时&#xff0c;其自动生成&#xff1a; Python framework&#xff0c;即 Python 框架&#xff1b;Python 应用目录&#xff1b;指向 Python 的连接。 …...

算法——滑动窗口

滑动窗口大致分为两类&#xff1a;一类是窗口长度固定的&#xff0c;即left和right可以一起移动&#xff1b;另一种是窗口的长度变化&#xff08;例如前五道题&#xff09;&#xff0c;即right疯狂移动&#xff0c;left没怎么动&#xff0c;这类题需要观察单调性(即指针)等各方…...

带头双向循环链表:一种高效的数据结构

&#x1f493; 博客主页&#xff1a;江池俊的博客⏩ 收录专栏&#xff1a;数据结构探索&#x1f449;专栏推荐&#xff1a;✅cpolar ✅C语言进阶之路&#x1f4bb;代码仓库&#xff1a;江池俊的代码仓库&#x1f525;编译环境&#xff1a;Visual Studio 2022&#x1f389;欢迎大…...

C++基础 -34- 输入输出运算符重载

输出运算符重载格式 ostream & operator<<(ostream &out,person a) {cout << a.a << endl;return out; }举例输出运算符重载 #include "iostream"using namespace std;class person {public:person(int a):a(a){}int a; };ostream &…...

MimicGen论文分析与资料汇总

MimicGen论文分析与资料汇总 前言论文分析相关资料汇总 前言 论文分析 相关资料汇总 Paper:MimicGen: A Data Generation System for Scalable Robot Learning using Human Demonstrations mimicgen.github 破局利刃&#xff01;英伟达合成数据新成果&#xff1a;为机器人造…...

JAVA-每一页PDF转图片

结论&#xff1a;1、iText几乎找不到如何PDF转图片的信息&#xff0c;但能找到获取到PDF里面的图片并保存下来的信息&#xff1b;2、PDF box满大街都是参考代码&#xff08;下面会附上一个作为参考&#xff09;&#xff1b;3、收费的库使用起来更简单&#xff0c;但就是要收费&…...

VS安装QT VS Tools编译无法通过

场景&#xff1a; 项目拷贝到虚拟机内部后&#xff0c;配置好相关环境后无法编译&#xff0c;安装QT VS Tools后依旧无法编译&#xff0c;查找资料网上说的是QT工具版本不一致导致的&#xff0c;但反复试了几个版本后依旧无法编译通过。错误信息如下&#xff1a; C:\Users\Ad…...

【C语言之 CJson】学CJson看这一篇就够了

文章目录 前言一、下载CJson二、创建一个json2.1 创建json对象cJSON类型详解 2.2 创建键值对2.3 添加嵌套的 JSON 对象2.4 添加数组创建数组添加元素到数组添加数组到obj 2.5 将 JSON 对象转为字符串2.6 释放内存2.7 示例代码 三、解析json3.1 解析json root3.2 把一个key解析出…...

使用Java语言实现字母之间的大小写转换

这个类的作用为实现字母之间的大小写转换&#xff0c;通过加减32来完成。 输入的代码 import java.util.Scanner; public class WordChangeDemo {public static void main(String[] args){try (Scanner in new Scanner(System.in)) {System.out.println("请输入您要进…...

Docker的数据持久化;Docker网络;Dockerfile编写

Docker的数据持久化&#xff1b;Docker网络&#xff1b;Dockerfile编写&#xff1b; 文章目录 Docker的数据持久化&#xff1b;Docker网络&#xff1b;Dockerfile编写&#xff1b;**Docker的数据持久化**1&#xff09;将本地目录映射到容器里2&#xff09;数据卷3&#xff09;将…...

OpenHarmony亮相MTSC 2023 | 质量效率共进,赋能应用生态发展

11月25日&#xff0c;MTSC 2023第十二届中国互联网测试开发大会在深圳登喜路国际大酒店圆满举行。大会以“软件质量保障体系和测试研发技术交流”为主要目的&#xff0c;旨在为行业搭建一个深入探讨和交流的桥梁和平台。OpenAtom OpenHarmony&#xff08;简称“OpenHarmony”&a…...

windows11 调整鼠标灵敏度方法

首先 我们打开电脑设置 或者在 此电脑/此计算机/我的电脑 右击选择属性 然后 有的电脑 左侧菜单中 直接就有 设备 然后在设备中直接就可以找到 鼠标 选项 调整光标速度即可 如果操作系统和我的一样 可以直接搜索鼠标 然后 选择 鼠标设置 然后 调整上面的鼠标指针速度即可...

贪心算法个人见解

目录 基本思想&#xff1a; 贪心算法的步骤&#xff1a; 示例&#xff1a; 贪心算法&#xff08;Greedy Algorithm&#xff09;是一种基于贪心策略的算法范式&#xff0c;它在每一步选择中都采取当前状态下的最优选择&#xff0c;而不考虑全局最优解。贪心算法通常适用于那些…...

Win中Redis部署与配置

1.下载msi版本 下载传送门 2.双击next-->next安装安装 3.密码配置以及开机自启 在配置文件中配置相应配置进行配置密码以及端口和ip port 6379指定 Redis 监听端口&#xff0c;默认端口为 6379&#xff0c;作者在自己的一篇博文中解释了为什么选用 6379 作为默认端口&…...

vue el-button 封装及使用

使用了 Element UI 中的 el-button 组件&#xff0c;并对其进行了封装和定制。 创建组件index.vue (src/common-ui/button/index.vue) <template><el-buttonclass"h-button":type"type":icon"hIcon":disabled"disabled"clic…...

QT之QMediaPlayer的用法

QT之QMediaPlayer的用法 成员函数例程 成员函数 1)setMedia(const QMediaContent &media, QIODevice *stream nullptr) 设置要播放的媒体内容&#xff0c;其中参数media指定了媒体内容&#xff0c;stream参数指定了用于读取媒体的输入设备&#xff08;如文件流&#xff0…...

TCP_报文格式解读

报文格式 header部分字段含义解析 固定字段 对于header中固定部分字段含义&#xff0c;见之前的blog《TCP报文分析》&#xff1b; 对部分字段含义补充说明 Data Offset&#xff1a;4bit&#xff0c;tcp header的长度&#xff0c;单位&#xff1a;32bit&#xff08;4字节&…...

C语言面试之旅:掌握基础,探索深度(面试实战之c语言关键词下篇)

一.枚举&#xff08; enum&#xff09; 枚举是 C 语言中的一种基本数据类型&#xff0c;用于定义一组具有离散值的常量&#xff0c;它可以让数据更简洁&#xff0c;更易读。枚举类型通常用于为程序中的一组相关的常量取名字&#xff0c;以便于程序的可读性和维护性。定义一个枚…...

Java学习第十三天

Java多态 多态是同一个行为具有多个不同表现形式或形态的能力。 多态就是同一个接口&#xff0c;使用不同的实例而执行不同操作 多态性是对象多种表现形式的体现。 多态的优点 1. 消除类型之间的耦合关系2. 可替换性3. 可扩充性4. 接口性5. 灵活性6. 简化性 多态存在的三个…...

手把手教你为全志Tina Linux添加新SPI屏驱动:以GC9306和HX8357C为例

全志Tina Linux SPI屏驱动移植实战&#xff1a;从裸机到内核框架的完整指南 在嵌入式Linux开发中&#xff0c;LCD显示屏的驱动移植是一个常见但颇具挑战性的任务。不同于裸机环境下的直接寄存器操作&#xff0c;Linux内核要求驱动程序遵循特定的框架和规范。本文将深入探讨如何…...

Adafruit Bluefruit Playground:iOS与蓝牙开发板的物联网交互实战

1. 项目概述与核心价值 如果你手头有一块Adafruit的Circuit Playground Bluefruit或者CLUE开发板&#xff0c;想让它在你的iPhone或iPad上“活”起来&#xff0c;变成一个能远程控制彩灯、读取传感器数据甚至演奏音乐的智能玩具&#xff0c;那么Adafruit Bluefruit Playground …...

东方博宜OJ入门题解:从A+B到高精度算法的实战解析

1. 东方博宜OJ平台入门指南 第一次接触在线评测系统(OJ)时&#xff0c;很多人都会被各种题目搞得晕头转向。东方博宜OJ作为国内知名的编程练习平台&#xff0c;特别适合编程新手从零开始系统学习。我刚开始刷题时也走过不少弯路&#xff0c;今天就和大家分享一些实战经验。 这…...

Banana Pi BPI-M2S边缘AI开发板:双千兆网口与5TOPS NPU实战指南

1. 项目概述&#xff1a;一块为边缘AI与网络应用而生的全能型单板计算机 最近在捣鼓一些边缘计算和轻量级网络服务的项目&#xff0c;一直在寻找一块性能足够、接口丰富&#xff0c;同时性价比又不错的开发板。市面上常见的树莓派4B固然经典&#xff0c;但在面对需要一定AI推理…...

别再硬啃英文文档了!手把手教你给Vue2项目里的DHTMLX Gantt甘特图做中文汉化

Vue2项目深度汉化DHTMLX Gantt甘特图实战指南 在项目管理工具中&#xff0c;甘特图因其直观的时间轴展示方式而备受青睐。DHTMLX Gantt作为一款功能强大的甘特图组件&#xff0c;却在中文环境下存在明显的本地化短板。本文将彻底解决这一问题&#xff0c;从界面文本到日期格式…...

Motorola LS2208条码扫描器USB接口模式解析与Python数据采集实战

1. 项目概述&#xff1a;从“扫码枪”到数据采集终端在仓库、快递站或者超市收银台&#xff0c;我们每天都能看到工作人员拿着一个像手枪一样的东西&#xff0c;“嘀”一声&#xff0c;商品信息就录入了系统。这个设备就是条码扫描器&#xff0c;很多人习惯叫它“扫码枪”。你可…...

国密SM2的P7格式签名,和PKCS#7到底有啥区别?一张图讲清楚

国密SM2的P7格式签名与PKCS#7核心差异解析&#xff1a;从结构到实战 在密码学应用开发中&#xff0c;数字签名格式的标准化是实现安全通信的基础。当开发者从国际通用的PKCS#7标准转向中国自主研发的国密SM2算法体系时&#xff0c;P7签名格式的差异往往成为第一个需要跨越的技术…...

基于Arduino与IRLib2的万能遥控器DIY:从红外解码到蓝牙HID的嵌入式实践

1. 项目概述与核心价值如果你和我一样&#xff0c;家里电视、机顶盒、音响、空调的遥控器堆满了茶几&#xff0c;每次想用都得翻找半天&#xff0c;或者你正在为一位行动不便的亲友寻找一种更便捷的控制家电的方式&#xff0c;那么这个基于Arduino和IRLib2的万能遥控器DIY项目&…...

连接池失效——高并发下的隐形杀手

连接池失效——高并发下的隐形杀手 系统挂了 现象&#xff1a;用户打开页面&#xff0c;一直转圈。5分钟后&#xff0c;页面报错。 错误日志&#xff1a; org.apache.tomcat.jdbc.pool.PoolExhaustedException: [http-nio-8080-exec-72] Timeout: Pool empty. Unable to fetch …...

水凝膜、钢化膜、护景贴大对决:一张表看懂该买谁

水凝膜、钢化膜、护景贴大对决&#xff1a;一张表看懂该买谁手机屏幕保护膜主要有三种&#xff1a;水凝膜、普通钢化膜和护景贴&#xff08;悟赫德为代表&#xff09;。很多人不知道它们到底有什么区别&#xff0c;我们从六个维度给你讲清楚。材料结构。水凝膜是单层软塑料&…...