Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)
目录
- 一、Range分区分配策略原理
- 1.1、Range分区分配策略原理的示例一
- 1.2、Range分区分配策略原理的示例二
- 1.3、Range分区分配策略原理的示例注意事项
- 二、Range 分区分配策略代码案例
- 2.1、创建带有4个分区的fiveTopic主题
- 2.2、创建三个消费者 组成 消费者组
- 2.3、创建生产者
- 2.4、测试
- 2.5、Range 分区分配策略代码案例说明
- 三、Range 分区分配再平衡案例
- 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
- 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
- 3.3、Range 分区分配再平衡案例说明
一、Range分区分配策略原理
- Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
1.1、Range分区分配策略原理的示例一
假如现在有 4 个分区,3 个消费者,排序后的分区将会是0,1,2,3;消费者排序完之后将会是C0,C1,C2。
- 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
- 例如:4/3 = 1 余 1 ,除不尽,那么消费者C0便会多消费1个分区。

1.2、Range分区分配策略原理的示例二
假如现在有 5 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4;消费者排序完之后将会是C0,C1,C2。
- 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
- 例如:5/3 = 1 余 2 ,除不尽,那么消费者么C0和C1分别多消费一个分区。

1.3、Range分区分配策略原理的示例注意事项
- 如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!
二、Range 分区分配策略代码案例
2.1、创建带有4个分区的fiveTopic主题
-
在 Kafka 集群控制台,创建带有4个分区的fiveTopic主题
bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic
2.2、创建三个消费者 组成 消费者组
-
复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test”。
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者 "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("fiveTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}} }
2.3、创建生产者
-
创建CustomProducer生产者。
package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("fiveTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();} }
2.4、测试
-
首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码

-
然后,在 IDEA中分别启动生产者代码

-
在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:



2.5、Range 分区分配策略代码案例说明
- 由上述测试输出结果截图可知: 消费者1消费2分区的数据;消费者2消费0和3分区的数据;消费者3消费2分区的数据。
- 说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。
三、Range 分区分配再平衡案例
3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
- 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。

- 由下图控制台输出可知:3号消费者 消费到 1号分区数据。

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
- 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。

- 由下图控制台输出可知:3号消费者 消费到 1、2号分区数据。

3.3、Range 分区分配再平衡案例说明
- 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
- 消费者1 已经被踢出消费者组,所以重新按照 range 方式分配。
相关文章:
Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)
目录 一、Range分区分配策略原理1.1、Range分区分配策略原理的示例一1.2、Range分区分配策略原理的示例二1.3、Range分区分配策略原理的示例注意事项 二、Range 分区分配策略代码案例2.1、创建带有4个分区的fiveTopic主题2.2、创建三个消费者 组成 消费者组2.3、创建生产者2.4、…...
WeiTools
目录 1.1 WeiTools 1.2 getTime 1.3 getImageView 1.4 StringEncode 1.4.1 // TODO Auto-generated catch block WeiTools package com.shrimp.xiaoweirobot.tools;...
目标检测数据集:医学图像检测数据集(自己标注)
1.专栏介绍 ✨✨✨✨✨✨目标检测数据集✨✨✨✨✨✨ 本专栏提供各种场景的数据集,主要聚焦:工业缺陷检测数据集、小目标数据集、遥感数据集、红外小目标数据集,该专栏的数据集会在多个专栏进行验证,在多个数据集进行验证mAP涨点明显,尤其是小目标、遮挡物精度提升明显的…...
【系统设计系列】数据库
系统设计系列初衷 System Design Primer: 英文文档 GitHub - donnemartin/system-design-primer: Learn how to design large-scale systems. Prep for the system design interview. Includes Anki flashcards. 中文版: https://github.com/donnemarti…...
mp4压缩视频不改变画质?跟我这样压缩视频大小
在当今数字化时代,视频文件变得越来越普遍,然而,这些文件通常都很大,给存储和传输带来了困难,为了解决这个问题,许多人都希望将视频压缩得更小,而又不牺牲画质,下面就来看看具体应该…...
AQS同步队列和等待队列的同步机制
理解AQS必须要理解同步队列和等待队列之间的同步机制,简单来说流程是: 获取锁失败的线程进入同步队列,成功的占用锁,占锁线程调用await方法进入条件等待队列,其他占锁线程调用signal方法,条件等待队列线程进…...
vue3实现无限循环滚动的方法;el-table内容无限循环滚动的实现
需求:vue3实现一个div内的内容无限循环滚动 方法一: <template><div idcontainer><div class"item" v-foritem in 5>测试内容{{{ item }}</div></div> </template><script setup> //封装一个方法…...
Windows 安装 MariaDB 数据库
之前一直使用 MySQL,使用 MySQL8.0 时候,占用内存比较大,储存空间好像也稍微有点大,看到 MariaDB 是用来代替 MySQL 的方案,之前用着也挺得劲,MySQL8.0 以上好像不能去导入低版本的 sql,或者需要…...
RK3568-mpp(Media Process Platform)媒体处理软件平台
第一章 MPP 介绍 1.1 概述 瑞芯微提供的媒体处理软件平台(Media Process Platform,简称 MPP)是适用于瑞芯微芯片系列的通用媒体处理软件平台。 该平台对应用软件屏蔽了芯片相关的复杂底层处理,其目的是为了屏蔽不同芯片的差异,为使用者提供统一的视频媒体处理接口(Medi…...
【ModelSim】使用终端命令行来编译、运行Verilog程序,创建脚本教程
▚ 01 ModelSim命令解说 📢 这些命令是 ModelSim 中常用的命令,用于创建库、编译源代码和启动仿真。 🔔 在使用这些命令之前,你需要在 ModelSim 的命令行界面或脚本中执行 vlib 命令来创建一个库,然后使用 vlog 命令…...
腾讯云网站备案详细流程_审核时间说明
腾讯云网站备案流程先填写基础信息、主体信息和网站信息,然后提交备案后等待腾讯云初审,初审通过后进行短信核验,最后等待各省管局审核,前面腾讯云初审时间1到2天左右,最长时间是等待管局审核时间,网站备案…...
HTTP介绍:一文了解什么是HTTP
前言: 在当今数字时代,互联网已经成为人们生活中不可或缺的一部分。无论是浏览网页、发送电子邮件还是在线购物,我们都离不开超文本传输协议(HTTP)。HTTP作为一种通信协议,扮演着连接客户端和服务器的重要角…...
动态规划之子数组系列
子数组系列 1. 环形⼦数组的最⼤和2. 乘积最大子数组3. 等差数列划分4. 最长湍流子数组5. 单词拆分6. 环绕字符串中唯⼀的子字符串 1. 环形⼦数组的最⼤和 1.题目链接:环形⼦数组的最⼤和 2.题目描述:给定一个长度为 n 的环形整数数组 nums ,…...
LeetCode(力扣)332.重新安排行程Python
LeetCode332.重新安排行程 题目链接代码 题目链接 https://leetcode.cn/problems/reconstruct-itinerary/ 代码 class Solution:def backtracking(self, tickets, used, cur, result, path):if len(path) len(tickets) 1:result.append(path[:])return Truefor i, ticket…...
Pytho 从列表中创建字典 (dict.fromkeys()的问题)
问题起因:想在代码中通过已有的列表创建一个字典,但是又不想写循环,更不想手动填,所以用到了字典对象的fromkeys()方法 。 先以一个简单的例子介绍一下该方法: a ["A", "B", "C", &qu…...
第14节-PhotoShop基础课程-图框工具
文章目录 前言1.矩形画框2.椭圆画框 前言 图框 上面两张图,生成下面一幅图,这个就是图框工具的作用 图框工具ICON 1.矩形画框 2.椭圆画框...
使用 Nacos 在 Spring Boot 项目中实现服务注册与配置管理
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...
package.json中workspaces详解与monorepo
参考package.json配置详解,让你一看就会(下) - 掘金...
Spring Boot + Vue的网上商城之商品信息展示
Spring Boot Vue的网上商城之商品信息展示 当实现一个Spring Boot Vue的网上商城的商品信息展示时,可以按照以下步骤进行: 后端实现: 创建一个Spring Boot项目,并添加所需的依赖,包括Spring Web和Spring Data JPA。…...
深度优先搜索遍历与广度优先搜索遍历
目录 一.深度优先搜索遍历 1.深度优先遍历的方法 2.采用邻接矩阵表示图的深度优先搜索遍历 3.非连通图的遍历 二.广度优先搜索遍历 1.广度优先搜索遍历的方法 2.非连通图的广度遍历 3.广度优先搜索遍历的实现 4.按广度优先非递归遍历连通图 一.深度优先搜索遍历 1.深…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
前端中slice和splic的区别
1. slice slice 用于从数组中提取一部分元素,返回一个新的数组。 特点: 不修改原数组:slice 不会改变原数组,而是返回一个新的数组。提取数组的部分:slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...
动态规划-1035.不相交的线-力扣(LeetCode)
一、题目解析 光看题目要求和例图,感觉这题好麻烦,直线不能相交啊,每个数字只属于一条连线啊等等,但我们结合题目所给的信息和例图的内容,这不就是最长公共子序列吗?,我们把最长公共子序列连线起…...
Android屏幕刷新率与FPS(Frames Per Second) 120hz
Android屏幕刷新率与FPS(Frames Per Second) 120hz 屏幕刷新率是屏幕每秒钟刷新显示内容的次数,单位是赫兹(Hz)。 60Hz 屏幕:每秒刷新 60 次,每次刷新间隔约 16.67ms 90Hz 屏幕:每秒刷新 90 次,…...
