当前位置: 首页 > news >正文

为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

目录

  • 一、为什么需要带有 subscribe 的 group.id
  • 二、我们需要使用commitSync手动提交偏移量吗?
  • 三、如果我想手动提交偏移量,该怎么做?

一、为什么需要带有 subscribe 的 group.id

  • 消费概念:
    Kafka 使用消费者组的概念来实现主题的并行消费 - 每条消息都将在每个消费者组中传递一次,无论该组中实际有多少个消费者。所以 group 参数是强制性的,如果没有组,Kafka 将不知道如何对待订阅同一主题的其他消费者。
  • 偏移量
    每当我们启动一个消费者时,它都会加入一个消费者组,然后根据该消费者组中的其他消费者数量,为其分配要读取的分区。对于这些分区,它会检查列表读取偏移量是否已知,如果找到,它将从这一点开始读取消息。如果没有找到偏移量,则参数 auto.offset.reset 控制是从分区中最早的消息还是从最新的消息开始读取。

二、我们需要使用commitSync手动提交偏移量吗?

  • 是否需要手动提交偏移?
    是否需要提交偏移量取决于作为参数 enable.auto.commit 选择的值。默认情况下,此设置为 true,这意味着消费者将定期自动提交其偏移量(由auto.commit.interval.ms 决定提交的频率)。如果将其设置为 false,那么将需要自己提交偏移量。这种默认行为可能也是导致很多发现 kafka 总是从最新的开始消费的原因,由于偏移量是自动提交的,因此它将使用该偏移量。

  • 有没有办法从头开始重播消息?
    如果想每次都从头开始读取,可以调用seekToBeginning,如果不带参数调用,它将重置为所有订阅分区中的第一条消息,或者仅重置您传入的那些分区。

  • seekToBeginning
    查找每个给定分区的第一个偏移量。poll(long) 该函数延迟计算,仅在调用或时才查找所有分区中的第一个偏移量position(TopicPartition)。如果未提供分区,则查找所有当前分配的分区的第一个偏移量。

    public class MyListener implements ConsumerSeekAware {...@Overridepublic void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {callback.seekToBeginning(assignments.keySet());}}
    
  • 有没有办法从最后开始重播消息?
    有的,可以使用 seekToEnd() 查找所有分配的分区到最后。或者使用 seekToTimestamp(long time)- 查找所有分配的分区到该时间戳表示的偏移量。

    public class MyListener extends AbstractConsumerSeekAware {@KafkaListener(...)void listn(...) {...}
    }public class SomeOtherBean {MyListener listener;...void someMethod() {this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);}}
    

三、如果我想手动提交偏移量,该怎么做?

  • 1、禁用自动提交

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
  • 提交方法
    对于手动提交,KafkaConsumers提供了两种方法,即 commitSync() 和 commitAsync()。commitSync()是一个阻塞调用,在偏移量成功提交后返回,commitAsync()则立即返回。如果想知道提交是否成功,可以为回调处理程序 ( OffsetCommitCallback) 提供一个方法参数。请注意,在两次提交调用中,消费者都会提交最新poll()调用的偏移量。
    举个例子:假设一个分区主题有一个消费者并且最后一次调用poll()返回偏移量为 4、5、6 的消息。提交时,偏移量 6 将被提交,因为这是消费者客户端跟踪的最新偏移量。
    同时,commitSync() 和 commitAsync() 都允许更多地控制我们想要提交的偏移量:如果你使用允许你指定的相应重载,那么Map<TopicPartition, OffsetAndMetadata>消费者将仅提交指定的偏移量(即,映射可以包含分配的分区的任何子集) ,并且指定的偏移量可以为任意值)。

  • 同步提交:
    阻塞线程,直到提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)

    while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitSync();}
    }
    

    对于 for 循环中的每次迭代,只有在consumer.commitSync()成功返回或因抛出异常而中断后,代码才会移至下一次迭代。

  • 异步提交:
    是一种非阻塞方法。调用它不会阻塞线程。相反,它将继续处理以下指令,无论最终是成功还是失败。

    while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());consumer.commitAsync(callback);}
    }
    

    对于 for 循环中的每次迭代,无论consumer.commitAsync()最终会发生什么,代码都会移至下一次迭代。并且,提交的结果将由定义的回调函数处理。

  • 权衡:延迟与数据一致性
    1、如果必须确保数据一致性,请选择commitSync(),因为它将确保在执行任何进一步操作之前,你将知道偏移量提交是成功还是失败。但由于它是同步和阻塞的,你将花费更多的时间来等待提交完成,这会导致高延迟。
    2、如果可以接受某些数据不一致并希望具有低延迟,请选择commitAsync(),因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时代码将继续执行。

相关文章:

为什么kafka 需要 subscribe 的 group.id?我们是否需要使用 commitSync 手动提交偏移量?

目录 一、为什么需要带有 subscribe 的 group.id二、我们需要使用commitSync手动提交偏移量吗&#xff1f;三、如果我想手动提交偏移量&#xff0c;该怎么做&#xff1f; 一、为什么需要带有 subscribe 的 group.id 消费概念&#xff1a; Kafka 使用消费者组的概念来实现主题的…...

什么是Web应用程序防火墙,WAF与其他网络安全工具差异在哪?

一、什么是Web 应用程序防火墙 (WAF) &#xff1f; WAF软件产品被广泛应用于保护Web应用程序和网站免受威胁或攻击&#xff0c;它通过监控用户、应用程序和其他互联网来源之间的流量&#xff0c;有效防御跨站点伪造、跨站点脚本&#xff08;XSS攻击&#xff09;、SQL注入、DDo…...

打家劫舍 II——力扣213

动规 int robrange(vector<int>& nums, int start, int end){int first=nums[start]...

动手学深度学习—卷积神经网络LeNet(代码详解)

1. LeNet LeNet由两个部分组成&#xff1a; 卷积编码器&#xff1a;由两个卷积层组成&#xff1b;全连接层密集块&#xff1a;由三个全连接层组成。 每个卷积块中的基本单元是一个卷积层、一个sigmoid激活函数和平均汇聚层&#xff1b;每个卷积层使用55卷积核和一个sigmoid激…...

腾讯面经总结

最近在准备面试&#xff0c;看了很多大厂的面经&#xff0c;抽空将腾讯面试的题目整理了一下&#xff0c;希望对大家有所帮助~ 一面 1、mysql索引结构&#xff1f; 2、redis持久化策略&#xff1f; 3、zookeeper节点类型说一下&#xff1b; 4、zookeeper选举机制&#xff…...

matlab机器人工具箱基础使用

资料&#xff1a;https://blog.csdn.net/huangjunsheng123/article/details/110630665 用vscode直接看工具箱api代码比较方便&#xff0c;代码说明很多 一、模型设置 1、基础效果 %采用机器人工具箱进行正逆运动学验证 a[0,-0.3,-0.3,0,0,0];%DH参数 d[0.05,0,0,0.06,0.05,…...

利用WonderLeak进行内存泄露检测【一】

1、下载地址&#xff1a; WonderLeak - Visual Studio Marketplace https://www.relyze.com/ 2、WonderLeak支持vs2017 2019扩展&#xff0c;或者单独启动 3、https://www.relyze.com/docs/wonderleak/help/w/overview/msvc_extension1.png 4、对于二进制程序来说支持以下…...

二刷LeetCode--155. 最小栈(C++版本),思维题

思路:本题需要使用两个栈,一个就是正常栈,执行出入操作,另一个栈只负责将对应的最小值进行保存即可.每次入栈的时候,最小值栈的栈顶也需要入栈元素,不过这个元素是最小值,那么就需要进行比较,因此在getmin()的时候只需要将最小值栈的栈顶元素弹出即可.初始化的时候只需要将最小…...

进程的状态与转换

进程在其生命周期内&#xff0c;由于系统中各进程之间的相互制约及系统的运行环境的变化&#xff0c;使得进程的状态也在不断地发生变化。通常进程有以下5种状态&#xff0c;前三种是基础讷航的基本状态 1&#xff09;运行态。进程正在处理机上运行。在单处理机机中&#xff0…...

用MariaDB创建数据库,SQL练习,MarialDB安装和使用

前言&#xff1a;MariaDB数据库管理系统是MySQL的一个分支&#xff0c;主要由开源社区在维护&#xff0c;采用GPL授权许可 MariaDB的目的是完全兼容MySQL&#xff0c;包括API和命令行&#xff0c;使之能轻松成为MySQL的代替品。在存储引擎方面&#xff0c;使用XtraDB来代替MySQ…...

【Docker】 使用Docker-Compose 搭建基于 WordPress 的博客网站

引 本文将使用流行的博客搭建工具 WordPress 搭建一个私人博客站点。部署过程中使用到了 Docker 、MySQL 。站点搭建完成后经行了发布文章的体验。 WordPress WordPress 是一个广泛使用的开源内容管理系统&#xff08;CMS&#xff09;&#xff0c;用于构建和管理网站、博客和…...

Hlang社区-前端社区宣传首页实现

文章目录 前言页面结构固定钉头部轮播JS特效完整代码总结前言 这里的话,博主其实也是今年参与考研的大军之一,所以的话,是抽空去完成这个项目的,当然这个项目的肯定是可以在较短的时间内完成的。 那么废话不多说,昨天也是干到1点多,把这个首页写出来了。先看看看效果吧:…...

【LeetCode-Medium】833. 字符串中的查找与替换

题目链接 833. 字符串中的查找与替换 标签 字符串 步骤 Step1. 初始化 ans[]&#xff1a; for (int i 0; i < s.length(); i) { // 初始化ansans[i] s[i]; }Step2. 根据 index, source, target 查找&#xff1b;如果找到&#xff0c;那么将 ans[i] 更改为 target&am…...

数据结构中公式前中后缀表达式-二叉树应用

目录 数据结构中公式前中后缀表达式-二叉树应用 数据结构中公式前中后缀表达式-二叉树应用 什么是前缀表达式、中缀表达式、后缀表达式 前缀表达式、中缀表达式、后缀表达式&#xff0c;是通过树来存储和计算表达式的三种不同方式 以如下公式为例 通过树来存储该公式&#x…...

Visual Studio 2022连接远程系统进行C/C++开发

Visual Studio被称为是宇宙最强IDE&#xff0c;以前开发Linux C/C服务器程序&#xff0c;基本上都是在Windows上使用VS编写跨平台的C/C代码&#xff0c;然后先在VS中编译、链接、调试&#xff0c;然后在Linux下编译、链接&#xff0c;再针对Linux下的特定代码进行调试。后面Vis…...

TiDB数据库从入门到精通系列之二:TiDB数据库的简介

TiDB数据库从入门到精通系列之二&#xff1a;TiDB数据库的简介 一、TiDB数据库的简介二、五大核心特性三、四大核心应用场景四、TiDB数据库与MySQL数据库的兼容性 一、TiDB数据库的简介 TiDB是开源分布式关系型数据库&#xff0c;是一款同时支持在线事务处理与在线分析处理 (H…...

opencv视频截取每一帧并保存为图片python代码CV2实现练习

当涉及到视频处理时&#xff0c;Python中的OpenCV库提供了强大的功能&#xff0c;可以方便地从视频中截取每一帧并将其保存为图片。这是一个很有趣的练习&#xff0c;可以让你更深入地了解图像处理和多媒体操作。 使用OpenCV库&#xff0c;你可以轻松地读取视频文件&#xff0…...

虹科方案 | 汽车总线协议转换解决方案(二)

上期说到&#xff0c;虹科的PCAN-LIN网关在CAN、LIN总线转换方面有显著的作用&#xff0c;尤其是为BMS电池通信的测试提供了优秀的解决方案。假如您感兴趣&#xff0c;可以点击文末相关链接进行回顾&#xff01; 而今天&#xff0c;虹科将继续给大家带来Router系列在各个领域的…...

[Android] 通过JNI 让 JAVA 调用 android native 接口

前言&#xff1a; JNI (java native interface) 是一个库&#xff0c;可以让 java 代码和其他语言互动&#xff0c;比如 java 通过 JNI 调用融合了 jni库的 c/c 代码&#xff0c;注意&#xff0c;这里要求 c/c代码中必须通过链接 jni 库并按照 JNI 规范定义一套可供 JAVA 调用…...

MySQL高可用MHA

目录 前言 一、概述 二、配置免密、组从复制 三、MHA配置 四、测试 总结 前言 MySQL高可用管理工具&#xff08;MHA&#xff0c;Master High Availability&#xff09;是一个用于自动管理MySQL主从复制的工具&#xff0c;它可以提供高可用性和自动故障转移。MHA由原版的MHA工具…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

质量体系的重要

质量体系是为确保产品、服务或过程质量满足规定要求&#xff0c;由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面&#xff1a; &#x1f3db;️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限&#xff0c;形成层级清晰的管理网络&#xf…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...