大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
- 消费组测试,消费者变动对消费的影响
- 消费者的心跳机制
- 消费者的相关配置参数

主题和分区
- Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库
- Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍
- ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。
- Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。

反序列化
- Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
- 消费者的反序列化器包括Key和Value。
自定义反序列化
如果要实现自定义的反序列化器,需要实现 Deserializer 接口:
public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {Deserializer.super.configure(configs, isKey);}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer buffer = ByteBuffer.allocate(data.length);buffer.put(data);buffer.flip();int userId = buffer.getInt();int usernameLen = buffer.getInt();String username = new String(data, 8, usernameLen);int passwordLen = buffer.getInt();String password = new String(data, 8 + usernameLen, passwordLen);int age = buffer.getInt();User user = new User();user.setUserId(userId);user.setUsername(username);user.setPassword(password);user.setAge(age);return user;}@Overridepublic User deserialize(String topic, Headers headers, byte[] data) {return Deserializer.super.deserialize(topic, headers, data);}@Overridepublic void close() {Deserializer.super.close();}
}
消费者拦截器
消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。
消费端定义消息拦截器,要实现 ConsumerInterceptor接口:
- 一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等
- 该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
- ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。
- ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("=== 消费者拦截器 01 onConsume ===");return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {System.out.println("=== 消费者拦截器 01 onCommit ===");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("消费者设置的参数");configs.forEach((k, v) -> {System.out.println(k + ", " + v);});}
}
位移提交
相关概念
- Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
- 位移提交:自动提交和手动提交
- 位移提交:同步提交和异步提交
自动提交
Kafka Consumer后台提交
- 开启自动提交 enable.auto.commit=true
- 配置启动提交间隔:auto.commit.interval.ms,默认是5秒
位移顺序
自动提交位移的顺序:
- 配置 enable.auto.commit=true
- Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的
- 因此自动提交不会出现消息丢失,但是会重复消费
重复消费
重复消费的场景:
- Consumer设置5秒提交offset
- 假设提交offset后3秒发生了Rebalance
- Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费
- 因为Rebalance发生前3秒的内的提交就丢失了
异步提交
- 使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset
- 该方法为同步操作 等待直到 offset 被成功提交才返回
- 手动同步提交可以控制offset提交的时机和频率
位移管理
Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。



重平衡
重平衡可以说是Kafka中诟病最厉害的一部分。
重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。
重平衡的出发条件主要有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
- 主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡
- 订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡
为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。
避免重平衡
要完全避免重平衡做不到,但是要尽量避免重平衡。
在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。
- session.timeout.ms 规定超时时间是多久
- heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源
- max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。
这里给出一些推荐参数的配置:
- session.timeout.ms 设置为6秒
- heaertbeat.interval.ms 设置2秒
- max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟
相关文章:
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...
Google Gemma2 2B:语言模型的“小时代”到来?
北京时间8月1日凌晨(当地时间7月31日下午),Google发布了其Gemma系列开源语言模型的更新,在AI领域引发了巨大的震动。Google Developer的官方博客宣布,与6月发布的27B和9B参数版本相比,新的2B参数模型在保持…...
三线程顺序打印1-100
三线程顺序打印1-100 题目 三个线程顺序打印1-100; 解题 基本思路 首先需要创建三个线程, 确定使用线程1打印 num % 3 1 的数, 线程2打印 num % 3 2 的数, 线程3打印 num % 3 0 的数;使用 synchronized 同步锁让每次只有一个线程进行打印, 每个线程打印前先判断当前数是…...
中央处理器CPU
中央处理器CPU cpu的组成(从功能方面来看)cpu的执行过程★.取指令阶段★.解码阶段★.执行阶段 重点: 1.cpu的组成 2.cpu怎么执行程序(命令) cpu的组成(从功能方面来看) 寄存器:用来临…...
用Python实现AI人脸识别
实现AI人脸识别通常涉及到使用深度学习库,如TensorFlow或PyTorch,配合预训练的人脸识别模型。以下是一个使用Python和TensorFlow框架中的tensorflow_hub模块来加载和使用一个预训练的人脸识别模型的简单示例。 步骤 1: 安装必要的库 首先,你…...
MSPM0G3507_2024电赛自动行驶小车(H题)_问题与感悟
这次电赛题目选的简单了,还规定不能使用到摄像头,这让我之前学习的Opencv 4与树莓派无用武之地了,但我当时对于三子棋题目饶有兴趣,但架不住队友想稳奖,只能选择这个H题了...... 之后我还想抽空将这个E题三子棋题目做…...
C语言:指针(2)
一.数组名 在了解数组名前我们先看一段代码 int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0]; 根据我们上一篇学习的知识,我们知道&arr[0]是数组第一个元素的地址,这时我们再看另一段代码的运行结果。 #include <stdio.h> int ma…...
数组——二维数组
数组(中) 二维数组 定义 二维数组本质上是一个行列式的组合,也就是说二维数组是有行和列两部分构成。二维数组数据是通过行列进行解读。 二维数组可被视为一个特殊的一维数组,相当于二维数组又是一个一维数组,只不过它的元素是一维数组。 …...
深入 Vue 组件与状态管理的教程
目录 深入 Vue 组件与状态管理的教程第一部分:深入组件1. 理解插槽(Slots)的使用1.1 基础插槽示例1.2 具名插槽1.3 作用域插槽 第二部分:Vue Router1. 学习 Vue Router 的基本配置1.1 基本路由配置1.2 嵌套路由1.3 路由参数 2. 导…...
Spring Boot 实现异步处理多个并行任务
在现代Web应用开发中,异步处理和多任务并行处理对于提高系统的响应性和吞吐量至关重要。Spring Boot 提供了多种机制来实现异步任务处理,本文将介绍如何利用这些机制来优化您的应用程序性能。 1. 引言 在高负载情况下,如果所有的请求都采用…...
TiDB系列之:使用Flink TiDB CDC Connector采集数据
TiDB系列之:使用Flink TiDB CDC Connector采集数据 一、依赖项二、Maven依赖三、SQL Client JAR四、如何创建 TiDB CDC 表五、连接器选项六、可用元数据七、特征一次性处理启动阅读位置多线程读取DataStream Source 八、数据类型映射 TiDB CDC 连接器允许从 TiDB 数…...
每日一道算法题 最接近的三数之和
题目 16. 最接近的三数之和 - 力扣(LeetCode) Python class Solution:def threeSumClosest(self, nums: List[int], target: int) -> int:nums.sort()nlen(nums)ans0min_diffinf # infinite 无穷for i in range(n-2):tmpnums[i]li1rn-1while l<…...
搭建自己的金融数据源和量化分析平台(六):下载并存储沪深两市上市公司财报
基于不依赖wind、某花顺等第三方平台数据的考虑,尝试直接从财报中解析三大报表进而计算ROE等财务指标,因此需要下载沪深两市的上市公司财报数据,便于后续从pdf中解析三大报表。 深市爬虫好做,先放深市爬虫: 根据时间段…...
C语言-常见关键字详解
一、const 关键字const用于声明常量,赋值后,其值不能再被修改。 示例: const int MAX_COUNT 100; 二、static static关键字在不同情境下有不同作用: 1.函数中的静态变量:保留变量状态,仅初始化一次&a…...
异步编程之std::future(一): 使用
目录 1.概述 2.std::future的基本用法 3.使用 std::shared_future 4.std::future的使用场景 5.总结 1.概述 在编程实践中,我们常常需要使用异步调用。通过异步调用,我们可以将一些耗时、阻塞的任务交给其他线程来执行,从而保证当前线程的…...
Vue3 + JS项目配置ESLint Pretter
前言 如果在开发大型项目 同时为多人协作开发 那么 ESLint 在项目中极为重要 在使用 ESLint 的同时 也需要使用 Pretter插件 统一对代码进行格式化 二者相辅相成 缺一不可 1. 安装 VsCode 插件 在 VsCode 插件市场搜索安装 ESLint 和 Pretter 2. 安装依赖 这里直接在 pac…...
JavaScript (十四)——JavaScript typeof和类型转换
目录 JavaScript typeof, null, 和 undefined typeof 操作符 null undefined undefined 和 null 的区别 JavaScript 类型转换 JavaScript 数据类型 JavaScript 类型转换 将数字转换为字符串 将布尔值转换为字符串 将日期转换为字符串 将字符串转换为数字 一元运算符…...
CTF-web 基础
网络协议 OSI七层参考模型:一个标准的参考模型 物理层 网线,网线接口等。 数据链路层 可以处理物理层传入的信息。 网络层 比如IP地址 传输层 控制传输的内容的传输,在传输的过程中将要传输的信息分块传输完成之后再进行合并。 应用…...
CP AUTOSAR标准之ChineseV2XNetwork(AUTOSAR_SWS_ChineseV2XNetwork)(更新中……)
1 简介和功能概述 本文档指定了AUTOSAR基础软件模块中国车辆对接网络(CnV2xNet)的功能、API和配置。 中国车联网网络(CnV2xNet)与中国车联网消息(CnV2xMsg)、中国车联网管理(CnV2xMgt)、中国车联网安全(CnV2xSec)以及AUTOSAR BSW模块以太网接口(EthIf)共同构成了AUTOSAR架构…...
【hloc】 项目流程
hloc 项目流程 1. 数据集准备2. 特征提取3. 匹配特征4. 三维重建5. 定位6. 结果评估7. 示例脚本 这个项目涉及到了视觉定位和三维重建的一系列步骤,从特征提取、匹配、三维重建到定位和结果评估。通过提供的脚本文件,用户可以方便地运行整个流程。 1. 数…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
