消费者Rebalance机制
优质博文:IT-BLOG-CN
一、消费者Rebalance机制
在Apache Kafka中,消费者组
Consumer Group会在以下几种情况下发生重新平衡Rebalance:
【1】消费者加入或离开消费者组: 当一个新的消费者加入消费者组或一个现有的消费者离开消费者组时,Kafka会触发重新平衡,以重新分配分区给消费者。
【2】消费者崩溃或失去连接: 如果Kafka检测到某个消费者崩溃或失去连接(例如,由于网络问题或消费者进程被终止),它会触发重新平衡。
【3】主题的分区数量发生变化: 如果一个主题的分区数量增加或减少,Kafka会触发重新平衡,以确保新的分区被分配给消费者组中的消费者。
【4】消费者组协调器变更: 消费者组协调器是负责管理消费者组的一个Kafka Broker。如果消费者组协调器发生变更(例如,协调器所在的Broker崩溃),也会触发重新平衡。
【5】消费者组成员发送心跳失败: 消费者需要定期向消费者组协调器发送心跳heartbeat以表明它们仍然活跃。如果心跳失败,协调器会认为该消费者已经失去连接,从而触发重新平衡。
rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance。
Kafka在高峰期重平衡rebalancing会导致消费者组的停顿,影响系统的性能和稳定性。为了避免在高峰期发生重平衡,可以采取以下几种策略:
【1】优化分区分配策略: 使用RangeAssignor或StickyAssignor等分区分配策略来减少重平衡的频率和影响。
RangeAssignor 是Kafka默认的分区分配策略之一,它将分区按范围分配给消费者。
我们通过一个具体的例子来说明RangeAssignor如何分配分区。
假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。
初始分配:假设初始分配如下:
C1: P0, P1
C2: P2, P3
C3: P4, P5
消费者组成员变化:现在假设C2离开了消费者组,那么RangeAssignor会重新分配分区,以确保分区尽量按顺序和均匀地分配给剩余的消费者。新的分配可能如下:
C1: P0, P1, P2
C3: P3, P4, P5
在这个过程中,RangeAssignor将分区按顺序重新分配给剩余的消费者,确保每个消费者分配到的分区尽量连续。
新消费者加入:现在假设有一个新消费者C4加入了消费者组,RangeAssignor会再次按顺序和均匀地分配分区。新的分配可能如下:
C1: P0, P1
C3: P2, P3
C4: P4, P5
在这个过程中,RangeAssignor将分区重新分配,以确保每个消费者分配到的分区尽量连续和均匀。
通过这个例子,我们可以看到RangeAssignor的分配策略:
1、将分区按顺序分配给消费者。
2、当消费者组成员变化时,重新分配分区,以确保分区尽量按顺序和均匀地分配给所有消费者。
3、分区分配尽量保持连续性。
这种策略的好处是分区分配简单且稳定,减少了分区在消费者组成员变化时的重新分配范围,从而减少了重平衡的频率和影响。
以下是配置RangeAssignor的代码示例:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class RangeAssignorExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置分区分配策略为 RangeAssignorprops.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(List.of("example-topic"));// 消费消息的逻辑// ...}
}
StickyAssignor 是Kafka 2.4及以上版本引入的一种分区分配策略,它的目标是尽量保持分区分配的稳定性,减少重平衡的频率。
我们通过一个具体的例子来说明StickyAssignor如何分配分区。
假设我们有一个Kafka主题my-topic,它有6个分区P0, P1, P2, P3, P4, P5,并且我们有3个消费者C1, C2, C3在一个消费者组中。
初始分配:假设初始分配如下:
C1: P0, P1
C2: P2, P3
C3: P4, P5
消费者组成员变化:现在假设C2离开了消费者组,那么StickyAssignor会尽量保持现有的分区分配不变,并重新分配C2的分区。新的分配可能如下:
C1: P0, P1, P2
C3: P3, P4, P5
在这个过程中,StickyAssignor尽量保持C1和C3的分区分配不变,只是将C2的分区重新分配给其他消费者。
新消费者加入:现在假设有一个新消费者C4加入了消费者组,StickyAssignor会尝试保持现有的分区分配不变,并将分区尽量均匀地分配给所有消费者。新的分配可能如下:
C1: P0, P1
C3: P4, P5
C4: P2, P3
在这个过程中,StickyAssignor保持了C1和C3的分区不变,并将C2的分区重新分配给C4。
通过这个例子,我们可以看到StickyAssignor的分配策略:
1、尽量保持现有的分区分配不变。
2、当消费者组成员变化时,尽量最小化分区在消费者之间的移动。
3、尽量保持分区分配的平衡性。
这种策略的好处是减少了重平衡带来的影响,提高了分区分配的稳定性,减少了因分区移动带来的数据重新加载和处理的开销。
以下是配置StickyAssignor的代码示例:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class StickyAssignorExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置分区分配策略为 StickyAssignorprops.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(List.of("example-topic"));// 消费消息的逻辑// ...}
}
或者在配置中进行指定
group.id=my-consumer-group
partition.assignment.strategy=org.apache.kafka.clients.consumer.StickyAssignor
【2】增加session.timeout.ms和heartbeat.interval.ms:增加session.timeout.ms和heartbeat.interval.ms的值,这样可以减少消费者因为心跳超时而被认为失效,从而触发重平衡。
1、session.timeout.ms是消费者与Kafka broker之间的会话超时时间。如果在这个时间内Kafka broker没有收到某个消费者的心跳,broker就会认为该消费者已经失效,并触发重平衡。
2、heartbeat.interval.ms是消费者发送心跳给Kafka broker的时间间隔。心跳是消费者向broker表示自己仍然活跃的方式。
session.timeout.ms=30000
heartbeat.interval.ms=3000
3、heartbeat.interval.ms的值通常要远小于session.timeout.ms的值。这样可以确保在会话超时之前,消费者有多次机会发送心跳。一般建议session.timeout.ms至少是heartbeat.interval.ms的10倍,以确保有足够的时间进行多次心跳尝试。
【3】合理配置消费者组:确保消费者组中的消费者数量稳定,避免频繁地增加或减少消费者。尽量在低峰期进行消费者的添加或移除操作。
【4】优化消费者性能:提高消费者的处理能力,确保消费者能够及时处理消息,避免因为处理延迟导致的重平衡。使用异步处理或批量处理来提高消费者的吞吐量。
【5】监控和报警:实时监控Kafka集群和消费者组的状态,设置报警机制,当检测到重平衡风险时,及时采取措施。
【6】使用静态成员Static Membership:Kafka 2.3及以上版本支持静态成员功能,可以通过配置group.instance.id来减少重平衡的频率。
group.instance.id是Kafka 2.4.0引入的一个配置项,用于为每个消费者实例指定一个唯一的标识符。当消费者组中的消费者具有唯一的group.instance.id时,Kafka可以更智能地处理消费者组成员的变化,从而减少不必要的重平衡。
静态成员:通过配置group.instance.id,消费者实例变成了“静态成员”,即使它们暂时断开连接,Kafka也会保留它们的成员身份。这与传统的动态成员(没有group.instance.id)不同,动态成员在断开连接后会被移除,从而触发重平衡。
group.id=my-consumer-group
group.instance.id=consumer-instance-1
【7】调整rebalance.timeout.ms:增加rebalance.timeout.ms的值,确保消费者有足够的时间完成重平衡过程,避免因超时导致的频繁重平衡。
消费者Rebalance分区分配策略
主要包含四种relalance策略:RangeAssignor(范围分配策略),RoundRobinAssignor(轮询分配策略),StickyAssignor(粘性分配策略),CooperativeStickyAssignor(协作粘性分配策略),之前已经讲过两个,这里聊聊剩下的两个
RoundRobinAssignor(轮询分配策略)
RoundRobinAssignor采用轮询的方式将分区分配给消费者。它会将所有分区和消费者按照字典顺序排序,然后依次将每个分区分配给下一个消费者,直到所有分区都被分配完毕。
CooperativeStickyAssignor(协作粘性分配策略)
CooperativeStickyAssignor是StickyAssignor的改进版本,它引入了协作重平衡的概念,使得重平衡过程更加平滑,减少了重平衡期间的停顿时间。
二、Rebalance 过程

第一阶段:选择"组协调器"
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。
consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接。
组协调器选择方式:consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer group的coordinator
第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。然后GroupCoordinator从一个consumer group中选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案。
第三阶段:SYNC GROUP
consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。
相关文章:
消费者Rebalance机制
优质博文:IT-BLOG-CN 一、消费者Rebalance机制 在Apache Kafka中,消费者组 Consumer Group会在以下几种情况下发生重新平衡Rebalance: 【1】消费者加入或离开消费者组: 当一个新的消费者加入消费者组或一个现有的消费者离开消费…...
消息队列介绍
一、ActiveMQ 优点:性能单台(6000)成熟,已经在很多公司得到应用。各种协议支持好,有多个语言的成熟客户端 缺点:性能较弱,缺乏大规模吞吐的场景的应用,有江河日下之感 二、RabbitMQ…...
告别@Value,Spring Boot 3.3更优雅的配置注入方案
在Spring Boot的早期版本中,我们常使用Value注解来注入配置文件中的属性值。然而,这种方式虽然简单直接,却存在一些局限,比如它只能注入基本类型的值,并且需要显式地在每个需要注入的字段上使用注解。随着Spring Boot的…...
甲虫身体图像分割系统源码&数据集分享
甲虫身体图像分割系统源码&数据集分享 [yolov8-seg-EfficientRepBiPAN&yolov8-seg-C2f-FocusedLinearAttention等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challen…...
Qt - QMenu
QMenu 1、menu转string输出 //GlobalEnum.h #include <QObject> #include <QMetaEnum> class GlobalEnum : public QObject {Q_OBJECT public:EnumTest();enum Enum_Test{ZhangSan 0,WangWu,};Q_ENUM(Enum_Test) };#define EnumToString(e) \ QMetaEnum::fromTy…...
舵机驱动详解(模拟/数字 STM32)
目录 一、介绍 二、模块原理 1.舵机驱动原理 2.引脚描述 三、程序设计 main.c文件 servo.h文件 servo.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 舵机(Servo)是在程序的控制下,在一定范围内连续改变输出轴角度并保持的电机系统。即舵机只支持…...
dvwa:文件包含、文件上传
文件包含 本地文件包含(敏感信息泄露)和远程文件包含(命令执行) 本地文件包含一般包含一些本地的敏感文件,如:/etc/passwd或/etc/shadow等 远程文件包含能使得服务器代码执行,如包含黑客vps的…...
基于 C# .NET Framework 4.0 开发实现 WCF 服务实例详解(二)——实现Windows服务内嵌WCF服务
目录 引言 1. 创建一个新的Windows服务项目 2. 添加WCF服务 2.1 添加服务接口和实现 2.2 添加服务配置 3. 实现Windows服务 3.1 修改Service1类 3.2 在项目中添加ServiceInstaller 4. 安装和运行Windows服务 4.1 编译项目 4.2 使用InstallUtil.exe安装服务 …...
【ArcGIS/C#】调用控制台处理代码
代码示例 private static string[] run_conda_process(string command, Action<string> on_msg, CancellationTokenSource cancel){if (string.IsNullOrEmpty(command)){return new string[]{null,ArcGIS.Desktop.Internal.Core.Conda.Resources.Error_Unexpected + &qu…...
06_23 种设计模式之《适配器模式》
文章目录 一、适配器模式基础知识实例 一、适配器模式基础知识 适配器模式定义:将一个类的接口转换成客户希望的另一个接 口。适配器模式使得原本由于接口不兼容而不能一起工作的那些类可 以一起工作。 Client:客户端,调用自已需要的领域接口…...
Go语言--快速入门
Go语言特点 我们先看一下简单的Go语言程序 package mainimport "fmt"func main() { // 错误,{ 不能在单独的行上fmt.Println("Hello, World!") }我们可以看到外型非常像我们的JAVA,但是不需要;作为结尾,…...
京东云主机怎么用?使用京东云服务器建网站(图文教程)
京东云主机怎么用?非常简单,本文京东云服务器网jdyfwq.com使用以使用京东云服务器搭建WordPress博客网站为例,来详细说下京东云主机的使用方法。使用京东云服务器快速搭建WordPress网站教程,3分钟基于应用镜像一键搞定,…...
Linux 基础入门操作-实验七 进程的介绍
实验七 进程的介绍 7.1 进程基础概念 Linux进程在内存中包含三部分数据:码段、堆栈段和数据段。代码段存放了程序的代码。代码段可以为机器中运行同一程序的数个进程共享。堆栈段存放的是子程序的返回地址、子程序的参数及程序的局部变量。而数据段则存放程序的全…...
SQL进阶技巧:SQL中的正则表达式应用?
目录 0 引言 1. 正则表达式函数 1.1 regexp_extract 1.2 regexp_replace 1.3 regexp_like 2. 在WHERE子句中使用正则表达式 3. 在GROUP BY中使用正则表达式 4. 性能考虑 5. 高级正则表达式技巧 5.1 使用正则表达式进行数据清洗 5.2 使用正则表达式处理JSON 6. 正则…...
算法数组面试理论
数组是存放在连续内存空间内的相同类型数据的集合 (所以在删除添加元素的时候需要移动其他的元素的地址) 数组的元素是不能删除的,只能覆盖。(因为内存地址是连续的,所以不能删除。或者可以这么理解:在一…...
ASP.NET Zero是什么?适合哪些业务场景?
一、ASP.NET Zero是什么? ASP.NET Zero 是一个基于 ASP.NET Boilerplate (ABP) 框架的模板项目,它提供了预建的页面和强大的基础设施架构,以便开发者能够快速开发应用层。它的特点包括但不限于: 多合一解决方案:提供多…...
获取期货股票分钟级别数据以及均线策略
【数据获取】 银河金融数据库(yinhedata.com) 能够获取国内外金融股票、期货历史行情数据,包含各分钟级别。 【搭建策略】 均线策略作为一种广泛应用于股票、期货等市场的技术分析方法,凭借其简单易懂、操作性强等特点…...
入门篇-1 数据结构简介
数据结构简介 在计算机科学中,数据结构是指组织、存储和管理数据的方式,它使得数据可以被高效地访问和修改。数据结构是计算机程序设计和算法分析中的一个重要概念,因为它们直接影响到程序的执行效率和内存使用。 1. 什么是数据结构&#x…...
Anaconda安装
1.进入Anaconda官网 2.填写邮箱信息 3.在邮箱消息中获取下载链接 4.进入下载页面,选择合适版本下载 5.进入Anaconda安装界面 6.点击“I agree” 7.选择个人即可“Just Me” 8.选择文件安装路径 9.允许创建快捷方式 10.等待下载 11.完成安装...
Elasticsearch学习笔记(六)使用集群令牌将新加点加入集群
随着业务的增长,陆续会有新的节点需要加入集群。当我们在集群中的某个节点上使用命令生成令牌时会出现报错信息。 # 生成令牌 /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s node出现报错信息: Unable to create enrollment…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
