当前位置: 首页 > news >正文

【kafka实践】11|消费位移提交

消费者位移

消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了记录Consumer 的消费进度,这样当 Consumer 发生重启之后,就能够从 Kafka 中读取之前提交的位移,从而继续消费,避免以避免重复消费,或消息丢失等。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

因为位移提交非常灵活,你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲就丢失了10条数据;相反地,如果你提交的位移值是 5,那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。

位移提交

从使用角度来说位移提交分为自动提交和手动提交;从 Consumer 的角度来说,位移提交分为同步提交和异步提交。

自动提交

默认情况下就是自动提交,你根本无需关心位移提交的事情,Consumer 端有个参数 enable.auto.commit默认值是 true,即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms,默认值是 5 秒,即每 5 秒会为你自动提交一次位移。

这里我们用一段简单的代码来看看这两个参数怎么使用

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "kafka_test");// 自动提交props.put("enable.auto.commit", "true");// 间隔2秒  props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// process}}
手动提交

设置 enable.auto.commit 为 false,还需要调用相应的 API 手动提交位移,KafkaConsumer.commitSync()。

// props.put("enable.auto.commit", "false");
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitSync()有一个缺陷,提交时Consumer 程序会处于阻塞状态,在生产系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。鉴于这个问题,Kafka 提供了另一个 异步API 方法:KafkaConsumer.commitAsync()。

不过commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题:

   try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); }
} catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close();
}
}

同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,如果你自行编写代码开发一套 Kafka Consumer 应用,可以尝试使用上面的代码范例来实现手动的位移提交。

其实还有一种更高级的提交方式,就是分批量提交,就不再这里展开,留给大家查资料学习,也欢迎各位同学在评论区交流讨论!

相关文章:

【kafka实践】11|消费位移提交

消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式&#xff0c;本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据&#xff0c;这个汇报过程被称为提交位移&#xff08;Committing Offsets&#xff09;。因为 Consumer 能够同时消费…...

Mac卸载、安装Python

卸载 说明 对于删除 Python&#xff0c;我们首先要知道其具体都安装了什么&#xff0c;实际上&#xff0c;在安装 Python 时&#xff0c;其自动生成&#xff1a; Python framework&#xff0c;即 Python 框架&#xff1b;Python 应用目录&#xff1b;指向 Python 的连接。 …...

算法——滑动窗口

滑动窗口大致分为两类&#xff1a;一类是窗口长度固定的&#xff0c;即left和right可以一起移动&#xff1b;另一种是窗口的长度变化&#xff08;例如前五道题&#xff09;&#xff0c;即right疯狂移动&#xff0c;left没怎么动&#xff0c;这类题需要观察单调性(即指针)等各方…...

带头双向循环链表:一种高效的数据结构

&#x1f493; 博客主页&#xff1a;江池俊的博客⏩ 收录专栏&#xff1a;数据结构探索&#x1f449;专栏推荐&#xff1a;✅cpolar ✅C语言进阶之路&#x1f4bb;代码仓库&#xff1a;江池俊的代码仓库&#x1f525;编译环境&#xff1a;Visual Studio 2022&#x1f389;欢迎大…...

C++基础 -34- 输入输出运算符重载

输出运算符重载格式 ostream & operator<<(ostream &out,person a) {cout << a.a << endl;return out; }举例输出运算符重载 #include "iostream"using namespace std;class person {public:person(int a):a(a){}int a; };ostream &…...

MimicGen论文分析与资料汇总

MimicGen论文分析与资料汇总 前言论文分析相关资料汇总 前言 论文分析 相关资料汇总 Paper:MimicGen: A Data Generation System for Scalable Robot Learning using Human Demonstrations mimicgen.github 破局利刃&#xff01;英伟达合成数据新成果&#xff1a;为机器人造…...

JAVA-每一页PDF转图片

结论&#xff1a;1、iText几乎找不到如何PDF转图片的信息&#xff0c;但能找到获取到PDF里面的图片并保存下来的信息&#xff1b;2、PDF box满大街都是参考代码&#xff08;下面会附上一个作为参考&#xff09;&#xff1b;3、收费的库使用起来更简单&#xff0c;但就是要收费&…...

VS安装QT VS Tools编译无法通过

场景&#xff1a; 项目拷贝到虚拟机内部后&#xff0c;配置好相关环境后无法编译&#xff0c;安装QT VS Tools后依旧无法编译&#xff0c;查找资料网上说的是QT工具版本不一致导致的&#xff0c;但反复试了几个版本后依旧无法编译通过。错误信息如下&#xff1a; C:\Users\Ad…...

【C语言之 CJson】学CJson看这一篇就够了

文章目录 前言一、下载CJson二、创建一个json2.1 创建json对象cJSON类型详解 2.2 创建键值对2.3 添加嵌套的 JSON 对象2.4 添加数组创建数组添加元素到数组添加数组到obj 2.5 将 JSON 对象转为字符串2.6 释放内存2.7 示例代码 三、解析json3.1 解析json root3.2 把一个key解析出…...

使用Java语言实现字母之间的大小写转换

这个类的作用为实现字母之间的大小写转换&#xff0c;通过加减32来完成。 输入的代码 import java.util.Scanner; public class WordChangeDemo {public static void main(String[] args){try (Scanner in new Scanner(System.in)) {System.out.println("请输入您要进…...

Docker的数据持久化;Docker网络;Dockerfile编写

Docker的数据持久化&#xff1b;Docker网络&#xff1b;Dockerfile编写&#xff1b; 文章目录 Docker的数据持久化&#xff1b;Docker网络&#xff1b;Dockerfile编写&#xff1b;**Docker的数据持久化**1&#xff09;将本地目录映射到容器里2&#xff09;数据卷3&#xff09;将…...

OpenHarmony亮相MTSC 2023 | 质量效率共进,赋能应用生态发展

11月25日&#xff0c;MTSC 2023第十二届中国互联网测试开发大会在深圳登喜路国际大酒店圆满举行。大会以“软件质量保障体系和测试研发技术交流”为主要目的&#xff0c;旨在为行业搭建一个深入探讨和交流的桥梁和平台。OpenAtom OpenHarmony&#xff08;简称“OpenHarmony”&a…...

windows11 调整鼠标灵敏度方法

首先 我们打开电脑设置 或者在 此电脑/此计算机/我的电脑 右击选择属性 然后 有的电脑 左侧菜单中 直接就有 设备 然后在设备中直接就可以找到 鼠标 选项 调整光标速度即可 如果操作系统和我的一样 可以直接搜索鼠标 然后 选择 鼠标设置 然后 调整上面的鼠标指针速度即可...

贪心算法个人见解

目录 基本思想&#xff1a; 贪心算法的步骤&#xff1a; 示例&#xff1a; 贪心算法&#xff08;Greedy Algorithm&#xff09;是一种基于贪心策略的算法范式&#xff0c;它在每一步选择中都采取当前状态下的最优选择&#xff0c;而不考虑全局最优解。贪心算法通常适用于那些…...

Win中Redis部署与配置

1.下载msi版本 下载传送门 2.双击next-->next安装安装 3.密码配置以及开机自启 在配置文件中配置相应配置进行配置密码以及端口和ip port 6379指定 Redis 监听端口&#xff0c;默认端口为 6379&#xff0c;作者在自己的一篇博文中解释了为什么选用 6379 作为默认端口&…...

vue el-button 封装及使用

使用了 Element UI 中的 el-button 组件&#xff0c;并对其进行了封装和定制。 创建组件index.vue (src/common-ui/button/index.vue) <template><el-buttonclass"h-button":type"type":icon"hIcon":disabled"disabled"clic…...

QT之QMediaPlayer的用法

QT之QMediaPlayer的用法 成员函数例程 成员函数 1)setMedia(const QMediaContent &media, QIODevice *stream nullptr) 设置要播放的媒体内容&#xff0c;其中参数media指定了媒体内容&#xff0c;stream参数指定了用于读取媒体的输入设备&#xff08;如文件流&#xff0…...

TCP_报文格式解读

报文格式 header部分字段含义解析 固定字段 对于header中固定部分字段含义&#xff0c;见之前的blog《TCP报文分析》&#xff1b; 对部分字段含义补充说明 Data Offset&#xff1a;4bit&#xff0c;tcp header的长度&#xff0c;单位&#xff1a;32bit&#xff08;4字节&…...

C语言面试之旅:掌握基础,探索深度(面试实战之c语言关键词下篇)

一.枚举&#xff08; enum&#xff09; 枚举是 C 语言中的一种基本数据类型&#xff0c;用于定义一组具有离散值的常量&#xff0c;它可以让数据更简洁&#xff0c;更易读。枚举类型通常用于为程序中的一组相关的常量取名字&#xff0c;以便于程序的可读性和维护性。定义一个枚…...

Java学习第十三天

Java多态 多态是同一个行为具有多个不同表现形式或形态的能力。 多态就是同一个接口&#xff0c;使用不同的实例而执行不同操作 多态性是对象多种表现形式的体现。 多态的优点 1. 消除类型之间的耦合关系2. 可替换性3. 可扩充性4. 接口性5. 灵活性6. 简化性 多态存在的三个…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

React hook之useRef

React useRef 详解 useRef 是 React 提供的一个 Hook&#xff0c;用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途&#xff0c;下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务

通过akshare库&#xff0c;获取股票数据&#xff0c;并生成TabPFN这个模型 可以识别、处理的格式&#xff0c;写一个完整的预处理示例&#xff0c;并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务&#xff0c;进行预测并输…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...