当前位置: 首页 > news >正文

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。

  • 启动服务、创建主题、查看主题
  • 修改分区副本因子(不允许)、修改分区副本因子(成功)
  • 查看结果

在这里插入图片描述

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):

  • RangeAssignor
  • RoundRobinAssignor
  • StickAssignor

在这里插入图片描述

RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
  • 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
  • Kafka默认采用的是RangeAssignor的分配算法。

在这里插入图片描述

  • RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
  • 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

在这里插入图片描述

RoundRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
  • 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

在这里插入图片描述

  • 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

在这里插入图片描述

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:

  • 分区的分配尽量的均衡
  • 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。

假设当前有如下内容:

  • 3个Consumer C0、C1、C2
  • 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
  • 所有Consumer都订阅了4个分区

在这里插入图片描述
如果 C1 宕机,此时 StickyAssignor 的结果:
在这里插入图片描述

自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:

  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
  • PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;import java.nio.ByteBuffer;
import java.util.*;public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {@Overridepublic Subscription subscription(Set<String> topics) {// 在这里添加权重信息到 userDataByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(getWeight());buffer.flip();return new Subscription(new ArrayList<>(topics), buffer);}@Overridepublic Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {Map<String, Assignment> assignments = new HashMap<>();Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();// 遍历所有订阅的topicsfor (String topic : metadata.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {partitionConsumers.putIfAbsent(partition, new ArrayList<>());}}// 根据权重分配分区for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {String consumerId = subscriptionEntry.getKey();Subscription subscription = subscriptionEntry.getValue();int weight = subscription.userData().getInt();for (String topic : subscription.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {List<String> consumers = partitionConsumers.get(partition);for (int i = 0; i < weight; i++) {consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会}}}}// 随机分配分区给消费者Random random = new Random();for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {List<String> consumers = entry.getValue();String assignedConsumer = consumers.get(random.nextInt(consumers.size()));assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())).partitions().add(entry.getKey());}return assignments;}@Overridepublic void onAssignment(Assignment assignment, Cluster metadata) {// 可以在这里处理分配后的逻辑,比如保存当前分配的快照}@Overridepublic String name() {return "weighted";}private int getWeight() {// 获取权重,可以从配置文件或环境变量中获取return 10; // 默认权重为10}
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

相关文章:

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

深度学习与图像修复:ADetailer插件在Stable Diffusion中的应用

文章目录 引言ADetailer插件介绍插件安装常用模型控制提示词参数配置参数详解 实践建议 示例插件的对比&#xff1a;1. ADetailer插件2. Photoshop插件&#xff08;如Nik Collection&#xff09;3. GIMP插件&#xff08;如GMIC&#xff09;4. Affinity Photo插件 结语 引言 无…...

【Pytorch】topk函数

topk 是 PyTorch 中的一个函数&#xff0c;用于从张量中选取最大&#xff08;或最小&#xff09;的 k 个元素及其对应的索引。其定义如下&#xff1a; values, indices torch.topk(input, k, dimNone, largestTrue, sortedTrue, *, outNone)参数说明 input (Tensor): 输入张…...

使用mybatis注解和xml映射执行javaWeb中增删改查等操作

Mapper接口 使用注解执行SQL语句操作和相应的Java抽象类&#xff08;对于简单的增删改查使用注解&#xff09; Mapper public interface EmpMapper {// 根据id删除员工信息Delete("delete from mybatis.emp where id#{id}")public int EmpDelete(Integer id);// 查…...

SpringBoot3 响应式编程

Spring Boot 3 中的响应式编程是一个重要的特性&#xff0c;它允许开发者构建非阻塞、异步和基于事件的应用程序&#xff0c;这对于处理高并发和实时数据流的应用场景尤为重要。以下是对Spring Boot 3响应式编程的详细解析&#xff1a; 一、响应式编程概述 响应式编程是一种编…...

【C++ 面试 - 基础题】每日 3 题(二)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…...

Modelica建模,Modelica语言的学习,技术调研工作

Modelica建模&#xff0c;Modelica语言的学习&#xff0c;技术调研工作 参考资料&#xff1a; 苏州同元软控信息技术有限公司 - 同元 Modelica 再探冷却 modelica学习-CSDN博客 1、 Modelica简介 Modelica是由Modelica协会维护、免费开放的物理系统面向对象的统一建模语言规…...

Oracle数据字典之——v$lock 和v$locked_object

v$lock视图 v$lock视图列出当前系统持有的或正在申请的所有锁的情况&#xff0c;其主要字段说明如下&#xff1a; 11g如下&#xff1a; 字段名称类型说明ADDRRAW(8)锁定状态对象的地址KADDRRAW(8)锁的地址SIDNUMBER会话&#xff08;SESSION&#xff09;标识&#xff1b;TYPE…...

solidity 以太坊(Ether) 单位(很基础)

一个字面常数可以带一个后缀 wei&#xff0c; gwei 或 ether 来指定一个以太坊的数量&#xff0c; 其中没有后缀的以太数字被认为单位是wei。 在以太坊和许多其他基于以太坊的区块链系统中&#xff0c;以太币&#xff08;Ether&#xff09;是网络中的主要加密货币。 以太可以被…...

关于elementUI 分页 table 使用 toggleRowSelection

我出现问题的前提 在table表格第一页全选 &#xff0c;第二页全选 回到第一页 点击按钮 取消 第一页&#xff0c;第二页我不要的勾选 初始实现 this.selectedPeraonal是表格 selection-change方法返回的值 handleSelectionChange(val) {this.selectedPeraonal val || []…...

K8s部署RocketMQ

准备工作 我是win电脑&#xff0c;本地安装了Podman&#xff0c;并使用Kind创建了一个K8s本地环境&#xff0c;并在 win 电脑上安装了 Helm。 部署RocketMQ 1. add rocketmq helm repo 2. deploy rocketmq cluster 3. verify the rocketmq cluster 4. Create Topic by api a…...

Linux服务管理-Nginx配置

静态解析主要解析html、css动态解析需要解析php 动态资源通过轮询分配到后端的Apache服务器处理 apache是同步阻塞&#xff0c;nginx是异步非阻塞...

C语言典型例题31

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 习题2.8 请编写程序将China译为密码&#xff0c;密码的规律是&#xff1a;用原来字母后面的第4个字母代替原来的字母。 例如:C后面的4个字母是G&#xff0c;h后面第4个字母为l 代码&#xff1a; //《C程序设计教程…...

FFMPEG 工具方法

av_strerror int av_strerror ( int errnum, char * errbuf, size_t errbuf_size )ffmpeg获取与设置mp4文件旋转方向方法 设置与获取都是对AVStream的dict操作. 设置 for (i 0; i < ifmt_ctx_v->nb_streams; i) { //Create output AVStream according to input A…...

Qt QML 使用QPainterPath绘制弧形曲线和弧形文本

Qt并没有相关api直接绘制弧形文字&#xff0c;但提供了曲线绘制相关类&#xff0c;所以只能另辟蹊径&#xff0c;使用QPainterPath先生成曲线&#xff0c;然后通过曲线上的点来定位每个文字并draw出来。 QML具体做法为从QQuickPaintedItem继承&#xff0c;在派生类中实现paint…...

VMware虚拟机和Docker的备份与恢复

目录 1. VMware虚拟机的快照备份 1.1 VMware本机的快照备份 1.2 VMware快照备份到另一电脑 2. Docker知识点 2.1 Docker镜像和容器的关系 2.2 Docker的存储卷 2.3 Docker命令简介 2.4 删除Anylink镜像 3. Docker备份和恢复 3.1 确定要回滚的容器和版本 3.2 备份当前…...

新加坡服务器延迟大吗?如何进行优化

新加坡服务器延迟大吗&#xff1f;新加坡服务器的延迟通常在全国平均延迟111ms左右&#xff0c;其中移动网络约为90ms&#xff0c;联通网络106ms&#xff0c;电信网络最低约为85ms。为了进行优化&#xff0c;一般可以采取使用CDN、优化路由线路、增加带宽和服务器升级等方法。 …...

uniapp——列表图片加载太多且空间占用太大的处理方法(降低清晰度)

解决方法 列表默认显示的降低清晰度&#xff0c;预览图片的时候加载原图。 如果图片是上传到阿里云的OSS上&#xff0c;可以快速获取图片缩略图的方法 直接在后端返回的URL后面拼接字符串&#xff1a; XXX.png?x-oss-process 缩略图方法介绍&#xff1a; ?x-oss-proces…...

spring+SSM+Mybatis面试题(上)(30道)

目录 1. 何为Spring Bean容器?Spring Bean容器与Spring IOC 容器有什么不同吗?2. Spring IOC 如何理解?3. Spring DI 如何理解?4. Spring 中基于注解如何配置对象作用域?以及如何配置延迟加载机制?1.配置作用域需要注解Scope(“Singleton”)2.开启延迟加载&#xff1a;La…...

odoo17 翻译一个小bug

odoo17 翻译一个小bug 用户界面的没译过来 标红处&#xff0c;但在zh_CN.po中明显已经翻译过来了&#xff0c;采取暴力点的&#xff0c;直接把base下的base.pot删除&#xff0c;再更新一下&#xff0c;可以正常显示了...

UE5 BaseEditorSettings.ini加载原理与配置生效机制

1. 为什么你改了BaseEditorSettings.ini却没生效&#xff1f;——从UE5编辑器启动流程讲起很多人在UE5项目里折腾半天&#xff0c;把BaseEditorSettings.ini文件翻来覆去改了十几遍&#xff0c;重启编辑器后发现&#xff1a;缩放比例还是不对、网格间距没变、甚至“启用实时预览…...

Topit:macOS窗口置顶神器,让多任务处理效率翻倍

Topit&#xff1a;macOS窗口置顶神器&#xff0c;让多任务处理效率翻倍 【免费下载链接】Topit Pin any window to the top of your screen / 在Mac上将你的任何窗口强制置顶 项目地址: https://gitcode.com/gh_mirrors/to/Topit 你是否经常在macOS上同时处理多个任务时…...

ARM指令追踪技术及TRCVICTLR寄存器详解

1. ARM指令追踪技术概述在嵌入式系统开发和调试过程中&#xff0c;指令追踪&#xff08;Instruction Trace&#xff09;是一项至关重要的技术。它通过硬件机制记录处理器的执行流程&#xff0c;为开发者提供程序运行的完整轨迹。ARM架构从v7开始引入嵌入式跟踪宏单元&#xff0…...

CANN-昇腾NPU-RAG推理-检索增强生成怎么部署

RAG&#xff08;Retrieval-Augmented Generation&#xff09;是 LLM 知识库的组合&#xff1a;先检索相关文档&#xff0c;再让 LLM 基于文档回答。昇腾NPU 上部署 RAG 需要两个组件&#xff1a;Embedding 模型&#xff08;做向量检索&#xff09;和 LLM&#xff08;做生成&am…...

Lindy自动化效率翻倍的秘密:从零搭建高可靠多步骤任务流的7步黄金流程

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;Lindy自动化效率翻倍的秘密&#xff1a;从零搭建高可靠多步骤任务流的7步黄金流程 Lindy自动化平台以“越久越可靠”为设计哲学&#xff0c;将经典软件工程原则与现代可观测性实践深度融合。其核心优势…...

Linux服务器被挖矿木马劫持的五步应急处置指南

1. 这不是“中病毒”&#xff0c;是服务器被劫持成了矿机——先别慌&#xff0c;但必须立刻断网“服务器被黑客攻击&#xff0c;用来挖矿&#xff01;”——这句话在运维圈里一出&#xff0c;比收到OOM告警还让人头皮发紧。它不像网页被挂马、数据库被拖库那样有明显业务影响&a…...

告别鼠标手!5分钟上手开源鼠标连点器MouseClick,轻松实现自动化点击

告别鼠标手&#xff01;5分钟上手开源鼠标连点器MouseClick&#xff0c;轻松实现自动化点击 【免费下载链接】MouseClick &#x1f5b1;️ MouseClick &#x1f5b1;️ 是一款功能强大的鼠标连点器和管理工具&#xff0c;采用 QT Widget 开发 &#xff0c;具备跨平台兼容性 。软…...

自制极低频电流探头:负电阻补偿原理与低频方波测量实践

1. 项目概述&#xff1a;为极低频电流测量而生在电子测试领域&#xff0c;电流探头是个再常见不过的工具&#xff0c;无论是排查开关电源的纹波&#xff0c;还是分析电机驱动的波形&#xff0c;都离不开它。但如果你尝试用市面上常见的电流探头去观察一个频率低至几赫兹&#x…...

账务台账数据

银行里说的 “账务台账数据”&#xff0c;本质就是按会计规则把每笔业务逐笔、分户、分科目记下来的完整明细流水 余额 辅助信息&#xff0c;核心是 “可逐笔追溯、可对账、可审计” 的一套明细数据。下面用通俗、具体的方式拆开说&#xff1a;一、银行 “账务台账” 到底是什…...

2026 文章代码高亮方案选型

将基于 Prism.js 或 Highlight.js 的传统高亮方案与基于 Shiki 的现代化高亮方案进行对比&#xff0c;其核心区别在于底层解析原理的不同&#xff08;正则表达式 vs. TextMate 语法树&#xff09;。 以下是两种方案的底层原理、各自优缺点、核心对比矩阵以及适用场景的详细分析…...