当前位置: 首页 > news >正文

【kafka实践】11|消费位移提交

消费者位移

消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了记录Consumer 的消费进度,这样当 Consumer 发生重启之后,就能够从 Kafka 中读取之前提交的位移,从而继续消费,避免以避免重复消费,或消息丢失等。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

因为位移提交非常灵活,你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲就丢失了10条数据;相反地,如果你提交的位移值是 5,那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。

位移提交

从使用角度来说位移提交分为自动提交和手动提交;从 Consumer 的角度来说,位移提交分为同步提交和异步提交。

自动提交

默认情况下就是自动提交,你根本无需关心位移提交的事情,Consumer 端有个参数 enable.auto.commit默认值是 true,即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms,默认值是 5 秒,即每 5 秒会为你自动提交一次位移。

这里我们用一段简单的代码来看看这两个参数怎么使用

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "kafka_test");// 自动提交props.put("enable.auto.commit", "true");// 间隔2秒  props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// process}}
手动提交

设置 enable.auto.commit 为 false,还需要调用相应的 API 手动提交位移,KafkaConsumer.commitSync()。

// props.put("enable.auto.commit", "false");
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitSync()有一个缺陷,提交时Consumer 程序会处于阻塞状态,在生产系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。鉴于这个问题,Kafka 提供了另一个 异步API 方法:KafkaConsumer.commitAsync()。

不过commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题:

   try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); }
} catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close();
}
}

同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,如果你自行编写代码开发一套 Kafka Consumer 应用,可以尝试使用上面的代码范例来实现手动的位移提交。

其实还有一种更高级的提交方式,就是分批量提交,就不再这里展开,留给大家查资料学习,也欢迎各位同学在评论区交流讨论!

相关文章:

【kafka实践】11|消费位移提交

消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式&#xff0c;本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据&#xff0c;这个汇报过程被称为提交位移&#xff08;Committing Offsets&#xff09;。因为 Consumer 能够同时消费…...

Mac卸载、安装Python

卸载 说明 对于删除 Python&#xff0c;我们首先要知道其具体都安装了什么&#xff0c;实际上&#xff0c;在安装 Python 时&#xff0c;其自动生成&#xff1a; Python framework&#xff0c;即 Python 框架&#xff1b;Python 应用目录&#xff1b;指向 Python 的连接。 …...

算法——滑动窗口

滑动窗口大致分为两类&#xff1a;一类是窗口长度固定的&#xff0c;即left和right可以一起移动&#xff1b;另一种是窗口的长度变化&#xff08;例如前五道题&#xff09;&#xff0c;即right疯狂移动&#xff0c;left没怎么动&#xff0c;这类题需要观察单调性(即指针)等各方…...

带头双向循环链表:一种高效的数据结构

&#x1f493; 博客主页&#xff1a;江池俊的博客⏩ 收录专栏&#xff1a;数据结构探索&#x1f449;专栏推荐&#xff1a;✅cpolar ✅C语言进阶之路&#x1f4bb;代码仓库&#xff1a;江池俊的代码仓库&#x1f525;编译环境&#xff1a;Visual Studio 2022&#x1f389;欢迎大…...

C++基础 -34- 输入输出运算符重载

输出运算符重载格式 ostream & operator<<(ostream &out,person a) {cout << a.a << endl;return out; }举例输出运算符重载 #include "iostream"using namespace std;class person {public:person(int a):a(a){}int a; };ostream &…...

MimicGen论文分析与资料汇总

MimicGen论文分析与资料汇总 前言论文分析相关资料汇总 前言 论文分析 相关资料汇总 Paper:MimicGen: A Data Generation System for Scalable Robot Learning using Human Demonstrations mimicgen.github 破局利刃&#xff01;英伟达合成数据新成果&#xff1a;为机器人造…...

JAVA-每一页PDF转图片

结论&#xff1a;1、iText几乎找不到如何PDF转图片的信息&#xff0c;但能找到获取到PDF里面的图片并保存下来的信息&#xff1b;2、PDF box满大街都是参考代码&#xff08;下面会附上一个作为参考&#xff09;&#xff1b;3、收费的库使用起来更简单&#xff0c;但就是要收费&…...

VS安装QT VS Tools编译无法通过

场景&#xff1a; 项目拷贝到虚拟机内部后&#xff0c;配置好相关环境后无法编译&#xff0c;安装QT VS Tools后依旧无法编译&#xff0c;查找资料网上说的是QT工具版本不一致导致的&#xff0c;但反复试了几个版本后依旧无法编译通过。错误信息如下&#xff1a; C:\Users\Ad…...

【C语言之 CJson】学CJson看这一篇就够了

文章目录 前言一、下载CJson二、创建一个json2.1 创建json对象cJSON类型详解 2.2 创建键值对2.3 添加嵌套的 JSON 对象2.4 添加数组创建数组添加元素到数组添加数组到obj 2.5 将 JSON 对象转为字符串2.6 释放内存2.7 示例代码 三、解析json3.1 解析json root3.2 把一个key解析出…...

使用Java语言实现字母之间的大小写转换

这个类的作用为实现字母之间的大小写转换&#xff0c;通过加减32来完成。 输入的代码 import java.util.Scanner; public class WordChangeDemo {public static void main(String[] args){try (Scanner in new Scanner(System.in)) {System.out.println("请输入您要进…...

Docker的数据持久化;Docker网络;Dockerfile编写

Docker的数据持久化&#xff1b;Docker网络&#xff1b;Dockerfile编写&#xff1b; 文章目录 Docker的数据持久化&#xff1b;Docker网络&#xff1b;Dockerfile编写&#xff1b;**Docker的数据持久化**1&#xff09;将本地目录映射到容器里2&#xff09;数据卷3&#xff09;将…...

OpenHarmony亮相MTSC 2023 | 质量效率共进,赋能应用生态发展

11月25日&#xff0c;MTSC 2023第十二届中国互联网测试开发大会在深圳登喜路国际大酒店圆满举行。大会以“软件质量保障体系和测试研发技术交流”为主要目的&#xff0c;旨在为行业搭建一个深入探讨和交流的桥梁和平台。OpenAtom OpenHarmony&#xff08;简称“OpenHarmony”&a…...

windows11 调整鼠标灵敏度方法

首先 我们打开电脑设置 或者在 此电脑/此计算机/我的电脑 右击选择属性 然后 有的电脑 左侧菜单中 直接就有 设备 然后在设备中直接就可以找到 鼠标 选项 调整光标速度即可 如果操作系统和我的一样 可以直接搜索鼠标 然后 选择 鼠标设置 然后 调整上面的鼠标指针速度即可...

贪心算法个人见解

目录 基本思想&#xff1a; 贪心算法的步骤&#xff1a; 示例&#xff1a; 贪心算法&#xff08;Greedy Algorithm&#xff09;是一种基于贪心策略的算法范式&#xff0c;它在每一步选择中都采取当前状态下的最优选择&#xff0c;而不考虑全局最优解。贪心算法通常适用于那些…...

Win中Redis部署与配置

1.下载msi版本 下载传送门 2.双击next-->next安装安装 3.密码配置以及开机自启 在配置文件中配置相应配置进行配置密码以及端口和ip port 6379指定 Redis 监听端口&#xff0c;默认端口为 6379&#xff0c;作者在自己的一篇博文中解释了为什么选用 6379 作为默认端口&…...

vue el-button 封装及使用

使用了 Element UI 中的 el-button 组件&#xff0c;并对其进行了封装和定制。 创建组件index.vue (src/common-ui/button/index.vue) <template><el-buttonclass"h-button":type"type":icon"hIcon":disabled"disabled"clic…...

QT之QMediaPlayer的用法

QT之QMediaPlayer的用法 成员函数例程 成员函数 1)setMedia(const QMediaContent &media, QIODevice *stream nullptr) 设置要播放的媒体内容&#xff0c;其中参数media指定了媒体内容&#xff0c;stream参数指定了用于读取媒体的输入设备&#xff08;如文件流&#xff0…...

TCP_报文格式解读

报文格式 header部分字段含义解析 固定字段 对于header中固定部分字段含义&#xff0c;见之前的blog《TCP报文分析》&#xff1b; 对部分字段含义补充说明 Data Offset&#xff1a;4bit&#xff0c;tcp header的长度&#xff0c;单位&#xff1a;32bit&#xff08;4字节&…...

C语言面试之旅:掌握基础,探索深度(面试实战之c语言关键词下篇)

一.枚举&#xff08; enum&#xff09; 枚举是 C 语言中的一种基本数据类型&#xff0c;用于定义一组具有离散值的常量&#xff0c;它可以让数据更简洁&#xff0c;更易读。枚举类型通常用于为程序中的一组相关的常量取名字&#xff0c;以便于程序的可读性和维护性。定义一个枚…...

Java学习第十三天

Java多态 多态是同一个行为具有多个不同表现形式或形态的能力。 多态就是同一个接口&#xff0c;使用不同的实例而执行不同操作 多态性是对象多种表现形式的体现。 多态的优点 1. 消除类型之间的耦合关系2. 可替换性3. 可扩充性4. 接口性5. 灵活性6. 简化性 多态存在的三个…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

spring Security对RBAC及其ABAC的支持使用

RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型&#xff0c;它将权限分配给角色&#xff0c;再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...