当前位置: 首页 > news >正文

大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:

  • 消费组测试,消费者变动对消费的影响
  • 消费者的心跳机制
  • 消费者的相关配置参数

在这里插入图片描述

主题和分区

  • Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库
  • Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍
  • ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。
  • Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。
    在这里插入图片描述

反序列化

  • Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
  • 消费者的反序列化器包括Key和Value。

自定义反序列化

如果要实现自定义的反序列化器,需要实现 Deserializer 接口:

public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {Deserializer.super.configure(configs, isKey);}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer buffer = ByteBuffer.allocate(data.length);buffer.put(data);buffer.flip();int userId = buffer.getInt();int usernameLen = buffer.getInt();String username = new String(data, 8, usernameLen);int passwordLen = buffer.getInt();String password = new String(data, 8 + usernameLen, passwordLen);int age = buffer.getInt();User user = new User();user.setUserId(userId);user.setUsername(username);user.setPassword(password);user.setAge(age);return user;}@Overridepublic User deserialize(String topic, Headers headers, byte[] data) {return Deserializer.super.deserialize(topic, headers, data);}@Overridepublic void close() {Deserializer.super.close();}
}

消费者拦截器

消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。
消费端定义消息拦截器,要实现 ConsumerInterceptor接口:

  • 一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等
  • 该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
  • ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。
  • ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("=== 消费者拦截器 01 onConsume ===");return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {System.out.println("=== 消费者拦截器 01 onCommit ===");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("消费者设置的参数");configs.forEach((k, v) -> {System.out.println(k + ", " + v);});}
}

位移提交

相关概念

  • Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
  • Consumer 需要为分配给它的每个分区提交各自的位移数据
  • 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
  • 位移提交:自动提交和手动提交
  • 位移提交:同步提交和异步提交

自动提交

Kafka Consumer后台提交

  • 开启自动提交 enable.auto.commit=true
  • 配置启动提交间隔:auto.commit.interval.ms,默认是5秒

位移顺序

自动提交位移的顺序:

  • 配置 enable.auto.commit=true
  • Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的
  • 因此自动提交不会出现消息丢失,但是会重复消费

重复消费

重复消费的场景:

  • Consumer设置5秒提交offset
  • 假设提交offset后3秒发生了Rebalance
  • Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费
  • 因为Rebalance发生前3秒的内的提交就丢失了

异步提交

  • 使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset
  • 该方法为同步操作 等待直到 offset 被成功提交才返回
  • 手动同步提交可以控制offset提交的时机和频率

位移管理

Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

重平衡

重平衡可以说是Kafka中诟病最厉害的一部分。
重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。

重平衡的出发条件主要有三个:

  • 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
  • 主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡
  • 订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡

为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。

避免重平衡

要完全避免重平衡做不到,但是要尽量避免重平衡。
在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。

  • session.timeout.ms 规定超时时间是多久
  • heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源
  • max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。

这里给出一些推荐参数的配置:

  • session.timeout.ms 设置为6秒
  • heaertbeat.interval.ms 设置2秒
  • max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟

相关文章:

大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

Google Gemma2 2B:语言模型的“小时代”到来?

北京时间8月1日凌晨&#xff08;当地时间7月31日下午&#xff09;&#xff0c;Google发布了其Gemma系列开源语言模型的更新&#xff0c;在AI领域引发了巨大的震动。Google Developer的官方博客宣布&#xff0c;与6月发布的27B和9B参数版本相比&#xff0c;新的2B参数模型在保持…...

三线程顺序打印1-100

三线程顺序打印1-100 题目 三个线程顺序打印1-100; 解题 基本思路 首先需要创建三个线程, 确定使用线程1打印 num % 3 1 的数, 线程2打印 num % 3 2 的数, 线程3打印 num % 3 0 的数;使用 synchronized 同步锁让每次只有一个线程进行打印, 每个线程打印前先判断当前数是…...

中央处理器CPU

中央处理器CPU cpu的组成&#xff08;从功能方面来看&#xff09;cpu的执行过程★.取指令阶段★.解码阶段★.执行阶段 重点&#xff1a; 1.cpu的组成 2.cpu怎么执行程序&#xff08;命令&#xff09; cpu的组成&#xff08;从功能方面来看&#xff09; 寄存器&#xff1a;用来临…...

用Python实现AI人脸识别

实现AI人脸识别通常涉及到使用深度学习库&#xff0c;如TensorFlow或PyTorch&#xff0c;配合预训练的人脸识别模型。以下是一个使用Python和TensorFlow框架中的tensorflow_hub模块来加载和使用一个预训练的人脸识别模型的简单示例。 步骤 1: 安装必要的库 首先&#xff0c;你…...

MSPM0G3507_2024电赛自动行驶小车(H题)_问题与感悟

这次电赛题目选的简单了&#xff0c;还规定不能使用到摄像头&#xff0c;这让我之前学习的Opencv 4与树莓派无用武之地了&#xff0c;但我当时对于三子棋题目饶有兴趣&#xff0c;但架不住队友想稳奖&#xff0c;只能选择这个H题了...... 之后我还想抽空将这个E题三子棋题目做…...

C语言:指针(2)

一.数组名 在了解数组名前我们先看一段代码 int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0]; 根据我们上一篇学习的知识&#xff0c;我们知道&arr[0]是数组第一个元素的地址&#xff0c;这时我们再看另一段代码的运行结果。 #include <stdio.h> int ma…...

数组——二维数组

数组(中) 二维数组 定义 二维数组本质上是一个行列式的组合&#xff0c;也就是说二维数组是有行和列两部分构成。二维数组数据是通过行列进行解读。 二维数组可被视为一个特殊的一维数组&#xff0c;相当于二维数组又是一个一维数组&#xff0c;只不过它的元素是一维数组。 …...

深入 Vue 组件与状态管理的教程

目录 深入 Vue 组件与状态管理的教程第一部分&#xff1a;深入组件1. 理解插槽&#xff08;Slots&#xff09;的使用1.1 基础插槽示例1.2 具名插槽1.3 作用域插槽 第二部分&#xff1a;Vue Router1. 学习 Vue Router 的基本配置1.1 基本路由配置1.2 嵌套路由1.3 路由参数 2. 导…...

Spring Boot 实现异步处理多个并行任务

在现代Web应用开发中&#xff0c;异步处理和多任务并行处理对于提高系统的响应性和吞吐量至关重要。Spring Boot 提供了多种机制来实现异步任务处理&#xff0c;本文将介绍如何利用这些机制来优化您的应用程序性能。 1. 引言 在高负载情况下&#xff0c;如果所有的请求都采用…...

TiDB系列之:使用Flink TiDB CDC Connector采集数据

TiDB系列之&#xff1a;使用Flink TiDB CDC Connector采集数据 一、依赖项二、Maven依赖三、SQL Client JAR四、如何创建 TiDB CDC 表五、连接器选项六、可用元数据七、特征一次性处理启动阅读位置多线程读取DataStream Source 八、数据类型映射 TiDB CDC 连接器允许从 TiDB 数…...

每日一道算法题 最接近的三数之和

题目 16. 最接近的三数之和 - 力扣&#xff08;LeetCode&#xff09; Python class Solution:def threeSumClosest(self, nums: List[int], target: int) -> int:nums.sort()nlen(nums)ans0min_diffinf # infinite 无穷for i in range(n-2):tmpnums[i]li1rn-1while l<…...

搭建自己的金融数据源和量化分析平台(六):下载并存储沪深两市上市公司财报

基于不依赖wind、某花顺等第三方平台数据的考虑&#xff0c;尝试直接从财报中解析三大报表进而计算ROE等财务指标&#xff0c;因此需要下载沪深两市的上市公司财报数据&#xff0c;便于后续从pdf中解析三大报表。 深市爬虫好做&#xff0c;先放深市爬虫&#xff1a; 根据时间段…...

C语言-常见关键字详解

一、const 关键字const用于声明常量&#xff0c;赋值后&#xff0c;其值不能再被修改。 示例&#xff1a; const int MAX_COUNT 100; 二、static static关键字在不同情境下有不同作用&#xff1a; 1.函数中的静态变量&#xff1a;保留变量状态&#xff0c;仅初始化一次&a…...

异步编程之std::future(一): 使用

目录 1.概述 2.std::future的基本用法 3.使用 std::shared_future 4.std::future的使用场景 5.总结 1.概述 在编程实践中&#xff0c;我们常常需要使用异步调用。通过异步调用&#xff0c;我们可以将一些耗时、阻塞的任务交给其他线程来执行&#xff0c;从而保证当前线程的…...

Vue3 + JS项目配置ESLint Pretter

前言 如果在开发大型项目 同时为多人协作开发 那么 ESLint 在项目中极为重要 在使用 ESLint 的同时 也需要使用 Pretter插件 统一对代码进行格式化 二者相辅相成 缺一不可 1. 安装 VsCode 插件 在 VsCode 插件市场搜索安装 ESLint 和 Pretter 2. 安装依赖 这里直接在 pac…...

JavaScript (十四)——JavaScript typeof和类型转换

目录 JavaScript typeof, null, 和 undefined typeof 操作符 null undefined undefined 和 null 的区别 JavaScript 类型转换 JavaScript 数据类型 JavaScript 类型转换 将数字转换为字符串 将布尔值转换为字符串 将日期转换为字符串 将字符串转换为数字 一元运算符…...

CTF-web 基础

网络协议 OSI七层参考模型&#xff1a;一个标准的参考模型 物理层 网线&#xff0c;网线接口等。 数据链路层 可以处理物理层传入的信息。 网络层 比如IP地址 传输层 控制传输的内容的传输&#xff0c;在传输的过程中将要传输的信息分块传输完成之后再进行合并。 应用…...

CP AUTOSAR标准之ChineseV2XNetwork(AUTOSAR_SWS_ChineseV2XNetwork)(更新中……)

1 简介和功能概述 本文档指定了AUTOSAR基础软件模块中国车辆对接网络(CnV2xNet)的功能、API和配置。   中国车联网网络(CnV2xNet)与中国车联网消息(CnV2xMsg)、中国车联网管理(CnV2xMgt)、中国车联网安全(CnV2xSec)以及AUTOSAR BSW模块以太网接口(EthIf)共同构成了AUTOSAR架构…...

【hloc】 项目流程

hloc 项目流程 1. 数据集准备2. 特征提取3. 匹配特征4. 三维重建5. 定位6. 结果评估7. 示例脚本 这个项目涉及到了视觉定位和三维重建的一系列步骤&#xff0c;从特征提取、匹配、三维重建到定位和结果评估。通过提供的脚本文件&#xff0c;用户可以方便地运行整个流程。 1. 数…...

JavaSec-RCE

简介 RCE(Remote Code Execution)&#xff0c;可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景&#xff1a;Groovy代码注入 Groovy是一种基于JVM的动态语言&#xff0c;语法简洁&#xff0c;支持闭包、动态类型和Java互操作性&#xff0c…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

Linux nano命令的基本使用

参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时&#xff0c;显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...