当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)

目录

    • 一、Range分区分配策略原理
      • 1.1、Range分区分配策略原理的示例一
      • 1.2、Range分区分配策略原理的示例二
      • 1.3、Range分区分配策略原理的示例注意事项
    • 二、Range 分区分配策略代码案例
      • 2.1、创建带有4个分区的fiveTopic主题
      • 2.2、创建三个消费者 组成 消费者组
      • 2.3、创建生产者
      • 2.4、测试
      • 2.5、Range 分区分配策略代码案例说明
    • 三、Range 分区分配再平衡案例
      • 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 3.3、Range 分区分配再平衡案例说明

一、Range分区分配策略原理

  • Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

1.1、Range分区分配策略原理的示例一

假如现在有 4 个分区,3 个消费者,排序后的分区将会是0,1,2,3;消费者排序完之后将会是C0,C1,C2。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:4/3 = 1 余 1 ,除不尽,那么消费者C0便会多消费1个分区。
    在这里插入图片描述

1.2、Range分区分配策略原理的示例二

假如现在有 5 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4;消费者排序完之后将会是C0,C1,C2。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:5/3 = 1 余 2 ,除不尽,那么消费者么C0和C1分别多消费一个分区。
    在这里插入图片描述

1.3、Range分区分配策略原理的示例注意事项

  • 如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!

二、Range 分区分配策略代码案例

2.1、创建带有4个分区的fiveTopic主题

  • 在 Kafka 集群控制台,创建带有4个分区的fiveTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic
    

    在这里插入图片描述

2.2、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test”。

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("fiveTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("fiveTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

2.5、Range 分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费2分区的数据;消费者2消费0和3分区的数据;消费者3消费2分区的数据。
  • 说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

三、Range 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1号分区数据。
    在这里插入图片描述

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。
    在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1、2号分区数据。
    在这里插入图片描述

3.3、Range 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者1 已经被踢出消费者组,所以重新按照 range 方式分配。

相关文章:

Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)

目录 一、Range分区分配策略原理1.1、Range分区分配策略原理的示例一1.2、Range分区分配策略原理的示例二1.3、Range分区分配策略原理的示例注意事项 二、Range 分区分配策略代码案例2.1、创建带有4个分区的fiveTopic主题2.2、创建三个消费者 组成 消费者组2.3、创建生产者2.4、…...

WeiTools

目录 1.1 WeiTools 1.2 getTime 1.3 getImageView 1.4 StringEncode 1.4.1 // TODO Auto-generated catch block WeiTools package com.shrimp.xiaoweirobot.tools;...

目标检测数据集:医学图像检测数据集(自己标注)

1.专栏介绍 ✨✨✨✨✨✨目标检测数据集✨✨✨✨✨✨ 本专栏提供各种场景的数据集,主要聚焦:工业缺陷检测数据集、小目标数据集、遥感数据集、红外小目标数据集,该专栏的数据集会在多个专栏进行验证,在多个数据集进行验证mAP涨点明显,尤其是小目标、遮挡物精度提升明显的…...

【系统设计系列】数据库

系统设计系列初衷 System Design Primer&#xff1a; 英文文档 GitHub - donnemartin/system-design-primer: Learn how to design large-scale systems. Prep for the system design interview. Includes Anki flashcards. 中文版&#xff1a; https://github.com/donnemarti…...

mp4压缩视频不改变画质?跟我这样压缩视频大小

在当今数字化时代&#xff0c;视频文件变得越来越普遍&#xff0c;然而&#xff0c;这些文件通常都很大&#xff0c;给存储和传输带来了困难&#xff0c;为了解决这个问题&#xff0c;许多人都希望将视频压缩得更小&#xff0c;而又不牺牲画质&#xff0c;下面就来看看具体应该…...

AQS同步队列和等待队列的同步机制

理解AQS必须要理解同步队列和等待队列之间的同步机制&#xff0c;简单来说流程是&#xff1a; 获取锁失败的线程进入同步队列&#xff0c;成功的占用锁&#xff0c;占锁线程调用await方法进入条件等待队列&#xff0c;其他占锁线程调用signal方法&#xff0c;条件等待队列线程进…...

vue3实现无限循环滚动的方法;el-table内容无限循环滚动的实现

需求&#xff1a;vue3实现一个div内的内容无限循环滚动 方法一&#xff1a; <template><div idcontainer><div class"item" v-foritem in 5>测试内容{{{ item }}</div></div> </template><script setup> //封装一个方法…...

Windows 安装 MariaDB 数据库

之前一直使用 MySQL&#xff0c;使用 MySQL8.0 时候&#xff0c;占用内存比较大&#xff0c;储存空间好像也稍微有点大&#xff0c;看到 MariaDB 是用来代替 MySQL 的方案&#xff0c;之前用着也挺得劲&#xff0c;MySQL8.0 以上好像不能去导入低版本的 sql&#xff0c;或者需要…...

RK3568-mpp(Media Process Platform)媒体处理软件平台

第一章 MPP 介绍 1.1 概述 瑞芯微提供的媒体处理软件平台(Media Process Platform,简称 MPP)是适用于瑞芯微芯片系列的通用媒体处理软件平台。 该平台对应用软件屏蔽了芯片相关的复杂底层处理,其目的是为了屏蔽不同芯片的差异,为使用者提供统一的视频媒体处理接口(Medi…...

【ModelSim】使用终端命令行来编译、运行Verilog程序,创建脚本教程

▚ 01 ModelSim命令解说 &#x1f4e2; 这些命令是 ModelSim 中常用的命令&#xff0c;用于创建库、编译源代码和启动仿真。 &#x1f514; 在使用这些命令之前&#xff0c;你需要在 ModelSim 的命令行界面或脚本中执行 vlib 命令来创建一个库&#xff0c;然后使用 vlog 命令…...

腾讯云网站备案详细流程_审核时间说明

腾讯云网站备案流程先填写基础信息、主体信息和网站信息&#xff0c;然后提交备案后等待腾讯云初审&#xff0c;初审通过后进行短信核验&#xff0c;最后等待各省管局审核&#xff0c;前面腾讯云初审时间1到2天左右&#xff0c;最长时间是等待管局审核时间&#xff0c;网站备案…...

HTTP介绍:一文了解什么是HTTP

前言&#xff1a; 在当今数字时代&#xff0c;互联网已经成为人们生活中不可或缺的一部分。无论是浏览网页、发送电子邮件还是在线购物&#xff0c;我们都离不开超文本传输协议&#xff08;HTTP&#xff09;。HTTP作为一种通信协议&#xff0c;扮演着连接客户端和服务器的重要角…...

动态规划之子数组系列

子数组系列 1. 环形⼦数组的最⼤和2. 乘积最大子数组3. 等差数列划分4. 最长湍流子数组5. 单词拆分6. 环绕字符串中唯⼀的子字符串 1. 环形⼦数组的最⼤和 1.题目链接&#xff1a;环形⼦数组的最⼤和 2.题目描述&#xff1a;给定一个长度为 n 的环形整数数组 nums &#xff0c…...

LeetCode(力扣)332.重新安排行程Python

LeetCode332.重新安排行程 题目链接代码 题目链接 https://leetcode.cn/problems/reconstruct-itinerary/ 代码 class Solution:def backtracking(self, tickets, used, cur, result, path):if len(path) len(tickets) 1:result.append(path[:])return Truefor i, ticket…...

Pytho 从列表中创建字典 (dict.fromkeys()的问题)

问题起因&#xff1a;想在代码中通过已有的列表创建一个字典&#xff0c;但是又不想写循环&#xff0c;更不想手动填&#xff0c;所以用到了字典对象的fromkeys()方法 。 先以一个简单的例子介绍一下该方法&#xff1a; a ["A", "B", "C", &qu…...

第14节-PhotoShop基础课程-图框工具

文章目录 前言1.矩形画框2.椭圆画框 前言 图框 上面两张图&#xff0c;生成下面一幅图&#xff0c;这个就是图框工具的作用 图框工具ICON 1.矩形画框 2.椭圆画框...

使用 Nacos 在 Spring Boot 项目中实现服务注册与配置管理

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…...

package.json中workspaces详解与monorepo

参考package.json配置详解&#xff0c;让你一看就会&#xff08;下&#xff09; - 掘金...

Spring Boot + Vue的网上商城之商品信息展示

Spring Boot Vue的网上商城之商品信息展示 当实现一个Spring Boot Vue的网上商城的商品信息展示时&#xff0c;可以按照以下步骤进行&#xff1a; 后端实现&#xff1a; 创建一个Spring Boot项目&#xff0c;并添加所需的依赖&#xff0c;包括Spring Web和Spring Data JPA。…...

深度优先搜索遍历与广度优先搜索遍历

目录 一.深度优先搜索遍历 1.深度优先遍历的方法 2.采用邻接矩阵表示图的深度优先搜索遍历 3.非连通图的遍历 二.广度优先搜索遍历 1.广度优先搜索遍历的方法 2.非连通图的广度遍历 3.广度优先搜索遍历的实现 4.按广度优先非递归遍历连通图 一.深度优先搜索遍历 1.深…...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法

树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作&#xff0c;无需更改相机配置。但是&#xff0c;一…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...