当前位置: 首页 > news >正文

分布式 - 消息队列Kafka:Kafka 消费者的消费位移

文章目录

      • 01. Kafka 分区位移
      • 02. Kafka 消费位移
      • 03. kafka 消费位移的作用
      • 04. Kafka 消费位移的提交
      • 05. kafka 消费位移的存储位置
      • 06. Kafka 消费位移与消费者提交的位移
      • 07. kafka 消费位移的提交时机
      • 08. Kafka 维护消费状态跟踪的方法

01. Kafka 分区位移

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。偏移量从0开始,每个新消息的偏移量比前一个消息的偏移量大1。

每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从 0 开始,假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、2、…、9。

02. Kafka 消费位移

对于kafka中的消费者而言,也有一个offset的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

消费位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。消费者会定期地将已经消费的消息的位移提交到Kafka集群中,以便在下一次启动时从上次消费的位置继续消费。

每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。注意,这和分区位移完全不是一个概念。“分区位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器嘛。另外每个消费者有着自己的消费者位移。

03. kafka 消费位移的作用

消费者位移(偏移量)是指消费者在消费分区中的消息时,记录的已经消费的消息的位移。它的作用主要有以下几个方面:

① 消费者可以通过记录偏移量来实现断点续传。当消费者下线或者重启时,它可以通过记录的偏移量来恢复之前的消费状态,从而避免重复消费已经处理过的消息。

② Kafka 通过偏移量来保证消息的顺序性。在同一个分区中,消息的顺序是有序的,消费者可以通过记录偏移量来保证消费的顺序性。

③ Kafka 还可以通过偏移量来实现消息的回溯。消费者可以通过指定偏移量来重新消费之前的消息,这在某些场景下非常有用,比如重新处理之前出现的错误。

总之,Kafka 消息偏移量是非常重要的一个概念,它可以帮助消费者实现断点续传、保证消息的顺序性以及实现消息的回溯等功能。

04. Kafka 消费位移的提交

消费者可以通过订阅一个或多个主题来拉取消息。当消费者调用 poll() 方法时,它会从 Kafka 集群中拉取一批消息,这些消息会被缓存在消费者的本地缓存中,等待消费者进一步处理。在消费者处理完这批消息后,它可以再次调用 poll() 方法来拉取下一批消息。如果消费者在处理消息时发生了错误,那么这批消息将会被重新拉取,直到消费者成功地处理它们为止。

因此每次调用poll()方法,它总是会返回还没有被消费者读取过的记录,这意味着我们可以追踪哪些记录是被群组里的哪个消费者读取过的。要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题 __consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

05. kafka 消费位移的存储位置

消费者默认将 offset 保存在Kafka一个内置的 topic 中,该 topic 为 __consumer_offsets。

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --list
__consumer_offsets

消费者会向一个叫作 __consumer_offset 的内置主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么实际作用。但是,如果消费者发生崩溃或有新的消费者加入群组,则会触发再均衡。再均衡完成之后,每个消费者可能会被分配新的分区,而不是之前读取的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的位置继续读取消息。

消费 offset 案例:

① __consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。但是需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。

② 创建主题 haha

[root@master01 kafka01]# bin/kafka-topics.sh --zookeeper localhost:2183 --create --partitions 3 --replication-factor 2  --topic haha
Created topic test1.

③ 启动生产者生产数据:

[root@master01 kafka01]# bin/kafka-console-producer.sh --broker-list 10.65.132.2:9093 --topic haha
>hello,haha!
>你好,haha!
>

④ 启动消费者消费数据:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic haha --group group-haha --from-beginning
hello,haha!
你好,haha!

⑤ 查看消费者消费主题__consumer_offsets:

[root@master01 kafka01]# bin/kafka-console-consumer.sh --bootstrap-server 10.65.132.2:9093 --topic __consumer_offsets --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning# key是[消费者组,消费的主题,消费的分区],value中已经消费的消息在当前分区的offset+1
[group-haha,haha,2]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,1]::OffsetAndMetadata(offset=1, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)
[group-haha,haha,0]::OffsetAndMetadata(offset=0, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1692000487851, expireTimestamp=None)

06. Kafka 消费位移与消费者提交的位移

如下图,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,不过当前消费者需要提交的消费位移并不是 x,而是 x+1,它表示下一条需要拉取的消息的位置。
在这里插入图片描述
如果使用自动提交或不指定提交的偏移量,那么将默认提交poll()返回的最后一个位置之后的偏移量,即提交比客户端从poll()返回的最后一个位置大1的偏移量。在进行手动提交或需要提交特定的偏移量时,一定要记住这一点。

07. kafka 消费位移的提交时机

当前一次 poll() 操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。
在这里插入图片描述
① 如果最后一次提交的偏移量大于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会丢失:

如图,如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。

② 如果最后一次提交的偏移量小于客户端处理的最后一条消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理:

如图,如果消费完所有拉取到的消息之后才进行位移提交,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

08. Kafka 维护消费状态跟踪的方法

在Kafka中,消费者组可以通过消费者偏移量(consumer offset)来跟踪它们在分区中消费的消息。消费者偏移量是一个整数,表示消费者已经成功读取的消息的位置。当消费者读取消息时,它会将偏移量保存在内存中,以便在下一次读取消息时能够从正确的位置开始读取。

相关文章:

分布式 - 消息队列Kafka:Kafka 消费者的消费位移

文章目录 01. Kafka 分区位移02. Kafka 消费位移03. kafka 消费位移的作用04. Kafka 消费位移的提交05. kafka 消费位移的存储位置06. Kafka 消费位移与消费者提交的位移07. kafka 消费位移的提交时机08. Kafka 维护消费状态跟踪的方法 01. Kafka 分区位移 对于Kafka中的分区而…...

H3C QoS打标签和限速配置案例

EF:快速转发 AF:确保转发 CS:给各种协议用的 BE:默认标记(尽力而为) VSR-88-2 出口路由配置: [H3C]dis current-configuration version 7.1.075, ESS 8305 vlan 1 traffic classifier vlan10 operator and if-match a…...

带curl的docker镜像image

带curl的docker镜像,便于k8s中查找问题,确认容器内部是否可用。 用于测试网络的工具,带有curl nslookup等命令 镜像名docker.io/appropriate/curl 测试命令docker run --rm -it docker.io/appropriate/curl /bin/sh 已测试可用 用于测试网…...

Hadoop数据迁移distcp

Hadoop数据迁移distcp 准备工作 确认源集群(a),目标集群(b)确认a集群的主节点和b集群的主节点确认两个集群的网络相通确认迁移模式(全量迁移还是增量迁移),这里选择全量迁移 迁移文件 迁移t…...

QT-Mysql数据库图形化接口

QT sql mysqloper.h qsqlrelationaltablemodelview.h /************************************************************************* 接口描述:Mysql数据库图形化接口 拟制: 接口版本:V1.0 时间:20230727 说明:支…...

LeetCode150道面试经典题-- 合并两个有序链表(简单)

1.题目 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 2.示例 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4] 示例 2: 输入:l1 [], l2 [] 输…...

GitHub 如何部署写好的H5静态页面

感谢粉皮zu的私信,又有素材写笔记了。(●’◡’●) 刚好记录一下我示例代码的GitHub部署配置,以便于后期追加仓库。 效果 环境 gitwin 步骤 第一步 新建仓库 第二步 拉取代码 将仓库clone到本地 git clone 地址第三步 部署文件 新建.github\workflo…...

SharkTeam:Worldcoin运营数据及业务安全分析

Worldcoin的白皮书中声明,Worldcoin旨在构建一个连接全球人类的新型数字经济系统,由OpenAI创始人Sam Altman于2020年发起。通过区块链技术在Web3世界中实现更加公平、开放和包容的经济体系,并将所有权赋予每个人。并且希望让全世界每一个人都…...

C语言编程练习

考点:【字符串】【数组】 题目1. 打印X 题目描述 输入一个正整数N, 你需要按样例的方式返回一个字符串列表。 1≤N≤15。 样例 1: 输入:1 输出:[“X”] X样例 2: 输入:2 [“XX”, “XX”] …...

vue入门(增查改!)

<template><div><!-- 搜索栏 --><el-card id"search"><el-row><el-col :span"20"><el-input v-model"searchModel.name" placeholder"根据名字查询"></el-input><el-input v-mode…...

移动端身份证识别技术的应用,告别手动录入证件信息

随着移动互联网的的发展&#xff0c;越来越多的公司都推出了自己的移动APP&#xff0c;这些APP多数都涉及到个人身份证信息的输入认证&#xff08;即实名认证&#xff09;&#xff0c;如果手动去输入身份证号码和姓名&#xff0c;速度非常慢&#xff0c;且用户体验非常差。为了…...

网络通信原理TCP字段解析(第四十七课)

字段含义Source Port(源端口号)源端口,标识哪...

uniapp微信小程序消息订阅快速上手

一、微信公众平台小程序开通消息订阅并设置模板 这边的模板id和详细内容后续前后端需要使用 二、uniapp前端 需要是一个button触发 js&#xff1a; wx.getSetting({success(res){console.log(res)if(res.authSetting[scope.subscribeMessage]){// 业务逻辑}else{uni.request…...

MySQL 根据多字段查询重复数据

MySQL 根据多字段查询重复数据 在实际的数据库应用中&#xff0c;我们经常需要根据多个字段来查询重复的数据。MySQL 提供了一些方法来实现这个功能&#xff0c;让我们能够快速准确地找到和处理重复数据。本文将介绍如何使用 MySQL 来根据多字段查询重复数据&#xff0c;并提供…...

Markdown编辑器 Mac版Typora功能介绍

Typora mac是一款跨平台的Markdown编辑器&#xff0c;支持Windows、MacOS和Linux操作系统。它具有实时预览功能&#xff0c;能够自动将Markdown文本转换为漂亮的排版效果&#xff0c;让用户专注于写作内容而不必关心格式调整。 Typora Mac版除了支持常见的Markdown语法外&#…...

el-form自定义校验规则

Vue 的 el-form 组件可以使用自定义校验规则进行表单验证。自定义校验规则可以通过传递一个函数来实现&#xff0c;该函数接受要校验的字段的值作为参数&#xff0c;并返回一个布尔值或一个 Promise 对象。 下面是一个示例&#xff0c;演示如何在 el-form 中使用自定义校验规则…...

xml对象与字符串互换

很多老系统&#xff0c;特别是C的系统&#xff0c;可能数据结构采用的xml。xml对java来说没有什么&#xff0c;但是C来说&#xff0c;可能还有个顺序问题&#xff0c;毕竟c没有那么多通用类库。 2 xstream 先说依赖&#xff0c;我本来不想升级&#xff0c;但是有个问题卡者就给…...

单例模式和多例模式和工厂模式

1单例设计模式 学习目标 能够使用单例设计模式设计代码 内容讲解 正常情况下一个类可以创建多个对象 public static void main(String[] args) {// 正常情况下一个类可以创建多个对象Person p1 new Person();Person p2 new Person();Person p3 new Person(); }如果说有…...

【网络架构】华为hw交换机网络高可用网络架构拓扑图以及配置

一、网络拓扑 1.网络架构 核心层:接入网络----路由器 汇聚层:vlan间通信 创建vlan ---什么是vlan:虚拟局域网,在大型平面网络中,为了实现广播控制引入了vlan,可以根据功能或者部门等创建vlan,再把相关的端口加入到vlan.为了实现不用交换机上的相同vlan通信,需要配置中继,为了…...

信也科技一面凉经

1.在项目经历里挑一个详细介绍一下 项目的应用场景 2.项目里用到多线程是怎么用的&#xff1f;回答&#xff1a;线程池 用通过 ThreadPoolExecutor 构造函数的方式创建的线程池 3.线程池有哪些重要参数&#xff1f;回答&#xff1a;核心线程数、最大线程数、阻塞队列类型、…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局&#xff1a;PCB行业的时代之问 在数字经济蓬勃发展的浪潮中&#xff0c;PCB&#xff08;印制电路板&#xff09;作为 “电子产品之母”&#xff0c;其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透&#xff0c;PCB行业面临着前所未有的挑战与机遇。产品迭代…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

OkHttp 中实现断点续传 demo

在 OkHttp 中实现断点续传主要通过以下步骤完成&#xff0c;核心是利用 HTTP 协议的 Range 请求头指定下载范围&#xff1a; 实现原理 Range 请求头&#xff1a;向服务器请求文件的特定字节范围&#xff08;如 Range: bytes1024-&#xff09; 本地文件记录&#xff1a;保存已…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...