Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)
目录
- 一、Range分区分配策略原理
- 1.1、Range分区分配策略原理的示例一
- 1.2、Range分区分配策略原理的示例二
- 1.3、Range分区分配策略原理的示例注意事项
- 二、Range 分区分配策略代码案例
- 2.1、创建带有4个分区的fiveTopic主题
- 2.2、创建三个消费者 组成 消费者组
- 2.3、创建生产者
- 2.4、测试
- 2.5、Range 分区分配策略代码案例说明
- 三、Range 分区分配再平衡案例
- 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
- 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
- 3.3、Range 分区分配再平衡案例说明
一、Range分区分配策略原理
- Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
1.1、Range分区分配策略原理的示例一
假如现在有 4 个分区,3 个消费者,排序后的分区将会是0,1,2,3;消费者排序完之后将会是C0,C1,C2。
- 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
- 例如:4/3 = 1 余 1 ,除不尽,那么消费者C0便会多消费1个分区。
1.2、Range分区分配策略原理的示例二
假如现在有 5 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4;消费者排序完之后将会是C0,C1,C2。
- 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
- 例如:5/3 = 1 余 2 ,除不尽,那么消费者么C0和C1分别多消费一个分区。
1.3、Range分区分配策略原理的示例注意事项
- 如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!
二、Range 分区分配策略代码案例
2.1、创建带有4个分区的fiveTopic主题
-
在 Kafka 集群控制台,创建带有4个分区的fiveTopic主题
bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic
2.2、创建三个消费者 组成 消费者组
-
复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test”。
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者 "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("fiveTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}} }
2.3、创建生产者
-
创建CustomProducer生产者。
package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("fiveTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();} }
2.4、测试
-
首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
-
然后,在 IDEA中分别启动生产者代码
-
在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:
2.5、Range 分区分配策略代码案例说明
- 由上述测试输出结果截图可知: 消费者1消费2分区的数据;消费者2消费0和3分区的数据;消费者3消费2分区的数据。
- 说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。
三、Range 分区分配再平衡案例
3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
- 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。
- 由下图控制台输出可知:3号消费者 消费到 1号分区数据。
3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
- 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。
- 由下图控制台输出可知:3号消费者 消费到 1、2号分区数据。
3.3、Range 分区分配再平衡案例说明
- 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
- 消费者1 已经被踢出消费者组,所以重新按照 range 方式分配。
相关文章:

Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)
目录 一、Range分区分配策略原理1.1、Range分区分配策略原理的示例一1.2、Range分区分配策略原理的示例二1.3、Range分区分配策略原理的示例注意事项 二、Range 分区分配策略代码案例2.1、创建带有4个分区的fiveTopic主题2.2、创建三个消费者 组成 消费者组2.3、创建生产者2.4、…...
WeiTools
目录 1.1 WeiTools 1.2 getTime 1.3 getImageView 1.4 StringEncode 1.4.1 // TODO Auto-generated catch block WeiTools package com.shrimp.xiaoweirobot.tools;...
目标检测数据集:医学图像检测数据集(自己标注)
1.专栏介绍 ✨✨✨✨✨✨目标检测数据集✨✨✨✨✨✨ 本专栏提供各种场景的数据集,主要聚焦:工业缺陷检测数据集、小目标数据集、遥感数据集、红外小目标数据集,该专栏的数据集会在多个专栏进行验证,在多个数据集进行验证mAP涨点明显,尤其是小目标、遮挡物精度提升明显的…...

【系统设计系列】数据库
系统设计系列初衷 System Design Primer: 英文文档 GitHub - donnemartin/system-design-primer: Learn how to design large-scale systems. Prep for the system design interview. Includes Anki flashcards. 中文版: https://github.com/donnemarti…...

mp4压缩视频不改变画质?跟我这样压缩视频大小
在当今数字化时代,视频文件变得越来越普遍,然而,这些文件通常都很大,给存储和传输带来了困难,为了解决这个问题,许多人都希望将视频压缩得更小,而又不牺牲画质,下面就来看看具体应该…...

AQS同步队列和等待队列的同步机制
理解AQS必须要理解同步队列和等待队列之间的同步机制,简单来说流程是: 获取锁失败的线程进入同步队列,成功的占用锁,占锁线程调用await方法进入条件等待队列,其他占锁线程调用signal方法,条件等待队列线程进…...
vue3实现无限循环滚动的方法;el-table内容无限循环滚动的实现
需求:vue3实现一个div内的内容无限循环滚动 方法一: <template><div idcontainer><div class"item" v-foritem in 5>测试内容{{{ item }}</div></div> </template><script setup> //封装一个方法…...

Windows 安装 MariaDB 数据库
之前一直使用 MySQL,使用 MySQL8.0 时候,占用内存比较大,储存空间好像也稍微有点大,看到 MariaDB 是用来代替 MySQL 的方案,之前用着也挺得劲,MySQL8.0 以上好像不能去导入低版本的 sql,或者需要…...

RK3568-mpp(Media Process Platform)媒体处理软件平台
第一章 MPP 介绍 1.1 概述 瑞芯微提供的媒体处理软件平台(Media Process Platform,简称 MPP)是适用于瑞芯微芯片系列的通用媒体处理软件平台。 该平台对应用软件屏蔽了芯片相关的复杂底层处理,其目的是为了屏蔽不同芯片的差异,为使用者提供统一的视频媒体处理接口(Medi…...
【ModelSim】使用终端命令行来编译、运行Verilog程序,创建脚本教程
▚ 01 ModelSim命令解说 📢 这些命令是 ModelSim 中常用的命令,用于创建库、编译源代码和启动仿真。 🔔 在使用这些命令之前,你需要在 ModelSim 的命令行界面或脚本中执行 vlib 命令来创建一个库,然后使用 vlog 命令…...

腾讯云网站备案详细流程_审核时间说明
腾讯云网站备案流程先填写基础信息、主体信息和网站信息,然后提交备案后等待腾讯云初审,初审通过后进行短信核验,最后等待各省管局审核,前面腾讯云初审时间1到2天左右,最长时间是等待管局审核时间,网站备案…...

HTTP介绍:一文了解什么是HTTP
前言: 在当今数字时代,互联网已经成为人们生活中不可或缺的一部分。无论是浏览网页、发送电子邮件还是在线购物,我们都离不开超文本传输协议(HTTP)。HTTP作为一种通信协议,扮演着连接客户端和服务器的重要角…...

动态规划之子数组系列
子数组系列 1. 环形⼦数组的最⼤和2. 乘积最大子数组3. 等差数列划分4. 最长湍流子数组5. 单词拆分6. 环绕字符串中唯⼀的子字符串 1. 环形⼦数组的最⼤和 1.题目链接:环形⼦数组的最⼤和 2.题目描述:给定一个长度为 n 的环形整数数组 nums ,…...

LeetCode(力扣)332.重新安排行程Python
LeetCode332.重新安排行程 题目链接代码 题目链接 https://leetcode.cn/problems/reconstruct-itinerary/ 代码 class Solution:def backtracking(self, tickets, used, cur, result, path):if len(path) len(tickets) 1:result.append(path[:])return Truefor i, ticket…...
Pytho 从列表中创建字典 (dict.fromkeys()的问题)
问题起因:想在代码中通过已有的列表创建一个字典,但是又不想写循环,更不想手动填,所以用到了字典对象的fromkeys()方法 。 先以一个简单的例子介绍一下该方法: a ["A", "B", "C", &qu…...
第14节-PhotoShop基础课程-图框工具
文章目录 前言1.矩形画框2.椭圆画框 前言 图框 上面两张图,生成下面一幅图,这个就是图框工具的作用 图框工具ICON 1.矩形画框 2.椭圆画框...

使用 Nacos 在 Spring Boot 项目中实现服务注册与配置管理
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🦄 博客首页——🐅🐾猫头虎的博客🎐 🐳 《面试题大全专栏》 🦕 文章图文…...
package.json中workspaces详解与monorepo
参考package.json配置详解,让你一看就会(下) - 掘金...
Spring Boot + Vue的网上商城之商品信息展示
Spring Boot Vue的网上商城之商品信息展示 当实现一个Spring Boot Vue的网上商城的商品信息展示时,可以按照以下步骤进行: 后端实现: 创建一个Spring Boot项目,并添加所需的依赖,包括Spring Web和Spring Data JPA。…...
深度优先搜索遍历与广度优先搜索遍历
目录 一.深度优先搜索遍历 1.深度优先遍历的方法 2.采用邻接矩阵表示图的深度优先搜索遍历 3.非连通图的遍历 二.广度优先搜索遍历 1.广度优先搜索遍历的方法 2.非连通图的广度遍历 3.广度优先搜索遍历的实现 4.按广度优先非递归遍历连通图 一.深度优先搜索遍历 1.深…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...