Kafka之消费者客户端
1、历史上的二个版本
与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:
- 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端。
- 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端。
2、必要的参数配置
-
bootstrap.servers
用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。
不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。
建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。
-
group.id
消费者隶属消费组的名称。
-
key.deserializer 和 value.deserializer
与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。
用来将字节数组中的key和value反序列化还原为原来的对象格式。
3、订阅主题与分区
一个消费者可以订阅一个或多个主题。
Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。
这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。
其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。
综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
4、消费消息
消息消费一般有两种方式:
- 推模式:服务器主动将消息推送给消费者。
- 拉模式:消费者主动向服务器发起请求来来取信息。
Kafka采用的消息消费模式是拉模式。
在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。
5、位移提交
在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。
对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。
对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”。
通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
通过上图我们可以了解到如下信息:
- 正在消费的消息下标为3。
- 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
- 对于分区来说,下标4则作为下一个消息要写入的位置。
- 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。
Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。
根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。
-
消息丢失
假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。
-
重复消费
假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。
通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。
6、指定位移消息
首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?
为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:
- latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
- earliest:意为消费者会从起始处也就是0开始消费。
- none:直接抛出NoOffsetForPartitionException异常。
7、再均衡
所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。
再均衡的过程中,消费组内的消费者无法读取消息。
再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。
8、消费者拦截器
我们可以通过消费者拦截器在poll返回消息之前和消费位移提交之后进行一些特定的处理。
9、多线程实现
为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。
有三种多线程的实现方式:
- 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图:
- 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
- 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图:
在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
基于这种实现方式提供以下两种实现方案:- 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
- 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图:
窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。
上一篇:Kafka之消费组与消费者
相关文章:

Kafka之消费者客户端
1、历史上的二个版本 与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本: 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端。新消费…...

使用Python进行数据分析入门
文章目录 Python环境搭建安装Anaconda验证安装 必备库介绍NumPyPandasMatplotlibSciPy 数据导入与清洗导入数据清洗数据 数据探索与分析描述性统计相关性分析 数据可视化绘制直方图 高级主题机器学习深度学习 总结 随着大数据时代的到来,数据分析变得越来越重要。Py…...
ubuntu20 从源码编译升级到版本5.15.263
author: hjjdebug date: 2024年 10月 25日 星期五 15:38:48 CST description: ubuntu20 从源码编译升级到版本5.15.263 我的内核是 5.15.105, 用apt 下载源码后其版本是5.15.263 为什么要从源码编译内核. 升级内核? 目的: 练练手. 消除内核神秘性. 还可以裁减内核,也是调试内核…...
php 程序开发分层与验证思想
在PHP程序开发中,合理的层级设计可以提高代码的可维护性、可扩展性和可测试性。以下是常见的层级设计模式及建议: 1. 分层架构 通常可以将PHP应用分为以下几层: 表示层(Presentation Layer): 负责与用户交…...
关于InternVL2的单卡、多卡推理
关于InternVL2的单卡、多卡推理 前言单卡推理多卡推理总结前言 本章节将介绍如何使用上一章节微调后的模型进行推理。推理又分为单卡和多卡,这里介绍的两种方式都是Hugging Face的transformers方法进行推理。模型的话可以使用上一章微调的任意一个非lora模型进行测试。 单卡推…...
Go语言设计Web框架
如何设计一个Web框架 项目规划 在开始设计Web框架之前,我们需要对整个项目进行规划。主要包括以下几个方面: 项目结构依赖管理路由设计控制器设计日志和配置管理 项目结构 首先,我们定义项目的目录结构: ├── cmd/ │ └…...

2024年10月28日练习(双指针算法)
一.11. 盛最多水的容器 - 力扣(LeetCode) 1.题目描述: 这个题目代表的意思就是数组上每个对应的值就相当于每条垂直线的高度,就相当于短板效应,两 个高度的线会取最短的长度因为那样水才不会漏。而两条线的数组的下标…...

Objective-C 音频爬虫:实时接收数据的 didReceiveData_ 方法
在互联网技术领域,数据的获取和处理是至关重要的。尤其是对于音频内容的获取,实时性和效率是衡量一个爬虫性能的重要指标。本文将深入探讨在Objective-C中实现音频爬虫时,如何高效地使用didReceiveData:方法来实时接收数据,并通过…...

提升网站流量和自然排名的SEO基本知识与策略分析
内容概要 在当今数字化时代,SEO(搜索引擎优化)成为加强网站可见度和提升流量的重要工具。SEO的基础知识包括理解搜索引擎的工作原理,以及如何通过优化网站内容和结构来提高自然排名。白帽SEO和黑帽SEO代表了两种截然不同的策略&a…...
雷池社区版compose文件配置讲解--fvm
在现代网络安全中,选择合适的 Web 应用防火墙至关重要。雷池(SafeLine)社区版免费切好用。为网站提供全面的保护,帮助网站抵御各种网络攻击。 docker-compose.yml 文件是 Docker Compose 的核心文件,用于定义和管理多…...

基于51单片机的智能断路器proteus仿真
地址: https://pan.baidu.com/s/16lfGgrgVr9V7JehonMNVQA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...

(N-154)基于springboot酒店预订管理系统
开发工具:IDEA 服务器:Tomcat9.0, jdk1.8 项目构建:maven 数据库:mysql5.7 前端技术:AdminLTEBootstrapLayUIHTMLjQuery 服务端技术:springbootmybatis-plusthymeleaf 本项目分前台和后台…...

elasticsearch 8.x 插件安装(三)之拼音插件
elasticsearch 8.x 插件安装(三)之拼音插件 elasticsearch插件安装合集 elasticsearch插件安装(一)之ik分词器安装(含MySQL更新) elasticsearch 8.x插件(二)之同义词安装如何解决…...

快速遍历包含合并单元格的Word表格
Word中的合并表格如下,现在需要根据子类(例如:果汁)查找对应的品类,如果这是Excel表格,那么即使包含合并单元格,也很容易处理,但是使用Word VBA进行查找,就需要一些技巧。…...
手机收银云进销存管理软件,商品档案Excel格式批量导入导出,一键导入Excel的商品档案
如果您有Excel的商品档案,那么就可以批量导入到我们的手机云进销存软件系统里,就不需要人工手工一个个商品的新建商品档案,大大提高工作效率。如果您看下面的步骤不会操作,可以联系我们技术支持,来帮您把商品档案导入。…...
html 中识别\n自动换行
CSS实现:white-space <div style"white-space: pre-wrap;" v-html"str"> </div>white-space: normal|nowrap|pre|pre-line|pre-wrap|initial|inherit;值描述换行符空格和制表符文字换行行尾空格normal默认。空白会被浏览器忽略。合…...
用QWebSocketServer写websocket服务端
1. 引入必要的头文件 #include <QCoreApplication> #include <QWebSocketServer> #include <QWebSocket> #include <QDebug> #include <QObject>QCoreApplication:用于创建控制台应用的事件循环。QWebSocketServer:提供 …...
云原生后端:现代应用架构的核心力量
云原生后端:现代应用架构的核心力量 云原生后端是基于云环境进行设计和开发的一种理念,利用云服务和云原生技术构建的服务端应用。它旨在提供灵活、高效、弹性和可扩展的解决方案,成为推动应用现代化的核心力量。本文将详细探讨云原生后端的…...

arcgis中dem转模型导入3dmax
文末分享素材 效果 1、准备数据 (1)DEM (2)DOM 2、打开arcscene软件 3、加载DEM、DOM数据 4、设置DOM的高度为DEM...

Python自动化测试中的Mock与单元测试实战
在软件开发过程中,自动化测试是确保代码质量和稳定性的关键一环。而Python作为一门灵活且强大的编程语言,提供了丰富的工具和库来支持自动化测试。本文将深入探讨如何结合Mock与单元测试,利用Python进行自动化测试,以提高代码的可…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

Android15默认授权浮窗权限
我们经常有那种需求,客户需要定制的apk集成在ROM中,并且默认授予其【显示在其他应用的上层】权限,也就是我们常说的浮窗权限,那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...