当前位置: 首页 > news >正文

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。

  • 启动服务、创建主题、查看主题
  • 修改分区副本因子(不允许)、修改分区副本因子(成功)
  • 查看结果

在这里插入图片描述

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):

  • RangeAssignor
  • RoundRobinAssignor
  • StickAssignor

在这里插入图片描述

RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
  • 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
  • Kafka默认采用的是RangeAssignor的分配算法。

在这里插入图片描述

  • RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
  • 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

在这里插入图片描述

RoundRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
  • 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

在这里插入图片描述

  • 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

在这里插入图片描述

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:

  • 分区的分配尽量的均衡
  • 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。

假设当前有如下内容:

  • 3个Consumer C0、C1、C2
  • 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
  • 所有Consumer都订阅了4个分区

在这里插入图片描述
如果 C1 宕机,此时 StickyAssignor 的结果:
在这里插入图片描述

自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:

  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
  • PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;import java.nio.ByteBuffer;
import java.util.*;public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {@Overridepublic Subscription subscription(Set<String> topics) {// 在这里添加权重信息到 userDataByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(getWeight());buffer.flip();return new Subscription(new ArrayList<>(topics), buffer);}@Overridepublic Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {Map<String, Assignment> assignments = new HashMap<>();Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();// 遍历所有订阅的topicsfor (String topic : metadata.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {partitionConsumers.putIfAbsent(partition, new ArrayList<>());}}// 根据权重分配分区for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {String consumerId = subscriptionEntry.getKey();Subscription subscription = subscriptionEntry.getValue();int weight = subscription.userData().getInt();for (String topic : subscription.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {List<String> consumers = partitionConsumers.get(partition);for (int i = 0; i < weight; i++) {consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会}}}}// 随机分配分区给消费者Random random = new Random();for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {List<String> consumers = entry.getValue();String assignedConsumer = consumers.get(random.nextInt(consumers.size()));assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())).partitions().add(entry.getKey());}return assignments;}@Overridepublic void onAssignment(Assignment assignment, Cluster metadata) {// 可以在这里处理分配后的逻辑,比如保存当前分配的快照}@Overridepublic String name() {return "weighted";}private int getWeight() {// 获取权重,可以从配置文件或环境变量中获取return 10; // 默认权重为10}
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

相关文章:

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

深度学习与图像修复:ADetailer插件在Stable Diffusion中的应用

文章目录 引言ADetailer插件介绍插件安装常用模型控制提示词参数配置参数详解 实践建议 示例插件的对比&#xff1a;1. ADetailer插件2. Photoshop插件&#xff08;如Nik Collection&#xff09;3. GIMP插件&#xff08;如GMIC&#xff09;4. Affinity Photo插件 结语 引言 无…...

【Pytorch】topk函数

topk 是 PyTorch 中的一个函数&#xff0c;用于从张量中选取最大&#xff08;或最小&#xff09;的 k 个元素及其对应的索引。其定义如下&#xff1a; values, indices torch.topk(input, k, dimNone, largestTrue, sortedTrue, *, outNone)参数说明 input (Tensor): 输入张…...

使用mybatis注解和xml映射执行javaWeb中增删改查等操作

Mapper接口 使用注解执行SQL语句操作和相应的Java抽象类&#xff08;对于简单的增删改查使用注解&#xff09; Mapper public interface EmpMapper {// 根据id删除员工信息Delete("delete from mybatis.emp where id#{id}")public int EmpDelete(Integer id);// 查…...

SpringBoot3 响应式编程

Spring Boot 3 中的响应式编程是一个重要的特性&#xff0c;它允许开发者构建非阻塞、异步和基于事件的应用程序&#xff0c;这对于处理高并发和实时数据流的应用场景尤为重要。以下是对Spring Boot 3响应式编程的详细解析&#xff1a; 一、响应式编程概述 响应式编程是一种编…...

【C++ 面试 - 基础题】每日 3 题(二)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…...

Modelica建模,Modelica语言的学习,技术调研工作

Modelica建模&#xff0c;Modelica语言的学习&#xff0c;技术调研工作 参考资料&#xff1a; 苏州同元软控信息技术有限公司 - 同元 Modelica 再探冷却 modelica学习-CSDN博客 1、 Modelica简介 Modelica是由Modelica协会维护、免费开放的物理系统面向对象的统一建模语言规…...

Oracle数据字典之——v$lock 和v$locked_object

v$lock视图 v$lock视图列出当前系统持有的或正在申请的所有锁的情况&#xff0c;其主要字段说明如下&#xff1a; 11g如下&#xff1a; 字段名称类型说明ADDRRAW(8)锁定状态对象的地址KADDRRAW(8)锁的地址SIDNUMBER会话&#xff08;SESSION&#xff09;标识&#xff1b;TYPE…...

solidity 以太坊(Ether) 单位(很基础)

一个字面常数可以带一个后缀 wei&#xff0c; gwei 或 ether 来指定一个以太坊的数量&#xff0c; 其中没有后缀的以太数字被认为单位是wei。 在以太坊和许多其他基于以太坊的区块链系统中&#xff0c;以太币&#xff08;Ether&#xff09;是网络中的主要加密货币。 以太可以被…...

关于elementUI 分页 table 使用 toggleRowSelection

我出现问题的前提 在table表格第一页全选 &#xff0c;第二页全选 回到第一页 点击按钮 取消 第一页&#xff0c;第二页我不要的勾选 初始实现 this.selectedPeraonal是表格 selection-change方法返回的值 handleSelectionChange(val) {this.selectedPeraonal val || []…...

K8s部署RocketMQ

准备工作 我是win电脑&#xff0c;本地安装了Podman&#xff0c;并使用Kind创建了一个K8s本地环境&#xff0c;并在 win 电脑上安装了 Helm。 部署RocketMQ 1. add rocketmq helm repo 2. deploy rocketmq cluster 3. verify the rocketmq cluster 4. Create Topic by api a…...

Linux服务管理-Nginx配置

静态解析主要解析html、css动态解析需要解析php 动态资源通过轮询分配到后端的Apache服务器处理 apache是同步阻塞&#xff0c;nginx是异步非阻塞...

C语言典型例题31

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 习题2.8 请编写程序将China译为密码&#xff0c;密码的规律是&#xff1a;用原来字母后面的第4个字母代替原来的字母。 例如:C后面的4个字母是G&#xff0c;h后面第4个字母为l 代码&#xff1a; //《C程序设计教程…...

FFMPEG 工具方法

av_strerror int av_strerror ( int errnum, char * errbuf, size_t errbuf_size )ffmpeg获取与设置mp4文件旋转方向方法 设置与获取都是对AVStream的dict操作. 设置 for (i 0; i < ifmt_ctx_v->nb_streams; i) { //Create output AVStream according to input A…...

Qt QML 使用QPainterPath绘制弧形曲线和弧形文本

Qt并没有相关api直接绘制弧形文字&#xff0c;但提供了曲线绘制相关类&#xff0c;所以只能另辟蹊径&#xff0c;使用QPainterPath先生成曲线&#xff0c;然后通过曲线上的点来定位每个文字并draw出来。 QML具体做法为从QQuickPaintedItem继承&#xff0c;在派生类中实现paint…...

VMware虚拟机和Docker的备份与恢复

目录 1. VMware虚拟机的快照备份 1.1 VMware本机的快照备份 1.2 VMware快照备份到另一电脑 2. Docker知识点 2.1 Docker镜像和容器的关系 2.2 Docker的存储卷 2.3 Docker命令简介 2.4 删除Anylink镜像 3. Docker备份和恢复 3.1 确定要回滚的容器和版本 3.2 备份当前…...

新加坡服务器延迟大吗?如何进行优化

新加坡服务器延迟大吗&#xff1f;新加坡服务器的延迟通常在全国平均延迟111ms左右&#xff0c;其中移动网络约为90ms&#xff0c;联通网络106ms&#xff0c;电信网络最低约为85ms。为了进行优化&#xff0c;一般可以采取使用CDN、优化路由线路、增加带宽和服务器升级等方法。 …...

uniapp——列表图片加载太多且空间占用太大的处理方法(降低清晰度)

解决方法 列表默认显示的降低清晰度&#xff0c;预览图片的时候加载原图。 如果图片是上传到阿里云的OSS上&#xff0c;可以快速获取图片缩略图的方法 直接在后端返回的URL后面拼接字符串&#xff1a; XXX.png?x-oss-process 缩略图方法介绍&#xff1a; ?x-oss-proces…...

spring+SSM+Mybatis面试题(上)(30道)

目录 1. 何为Spring Bean容器?Spring Bean容器与Spring IOC 容器有什么不同吗?2. Spring IOC 如何理解?3. Spring DI 如何理解?4. Spring 中基于注解如何配置对象作用域?以及如何配置延迟加载机制?1.配置作用域需要注解Scope(“Singleton”)2.开启延迟加载&#xff1a;La…...

odoo17 翻译一个小bug

odoo17 翻译一个小bug 用户界面的没译过来 标红处&#xff0c;但在zh_CN.po中明显已经翻译过来了&#xff0c;采取暴力点的&#xff0c;直接把base下的base.pot删除&#xff0c;再更新一下&#xff0c;可以正常显示了...

sqli-labs-php7-master第5-10关

第五关&#xff1a; 根据提示输入ID,随便来个一 输入100&#xff0c;因为数据库没有&#xff0c;所以这里没输出内容 还是先找注入点&#xff1a;输入单引号试试 注入点找到了‘ 查询数据库列数&#xff1b;&#xff1f;id1 order by 4 -- 测试发现order by 3时页面正常&…...

全方位教程:接入视频美颜SDK与直播美颜插件的完整步骤

今天&#xff0c;小编将为您详细介绍如何接入视频美颜SDK与直播美颜插件的完整步骤。 一、准备工作 开发环境的搭建 在开始集成之前&#xff0c;确保您的开发环境已配置完毕。无论是使用iOS、Android&#xff0c;还是Web开发&#xff0c;您都需要准备好对应的开发工具链&…...

Apache Curator 分布式锁的介绍,以及案例

可重入锁&#xff08;InterProcessMutex&#xff09;&#xff1a;这种锁允许同一个客户端多次获取同一把锁而不会被阻塞&#xff0c;类似于Java中的ReentrantLock。它通过在Zookeeper的指定路径下创建临时序列节点来实现锁的功能。如果获取锁失败&#xff0c;当前线程会监听前一…...

自动化测试 — selenium + Java

什么是自动化测试 将人为驱动的测试行为转化为机器执行的过程。 自动化测试包括UI 自动化&#xff0c;接口自动化&#xff0c;单元测试自动化。按照这个金字塔模型来进行自动化测试规划&#xff0c;可以产生最佳的自贡话测试产出投入比&#xff08;ROI &#xff09;&#xff0c…...

【SpringBoot系列】接口参数的默认值与必要性

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

茶余饭后(五)

真正出类拔萃的人 往往都是狠角色&#xff0c; 他们具备着一种独特的特质 那就是&#xff1a; 目标清晰 意志如铁 底线分明 同时手段又极为高明 且勤奋不屑 在处于劣势时 他们表现的极为谦逊和低调 像一只温顺无害的小羊羔 然而一旦时机成熟 他们便会毫不犹豫的展现出强…...

【网络编程详解】

&#x1f308;个人主页&#xff1a;努力学编程’ ⛅个人推荐&#xff1a; c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构&#xff0c;刷题刻不容缓&#xff1a;点击一起刷题 &#x1f319;心灵鸡汤&#xff1a;总有人要赢&#xff0c;为什么不能是我呢 &#x1f525…...

C# winform三层架构 实现增删改查( 显示数据,查询数据 显示,查询篇)

一.留言 上一篇讲解了如何去添加数据&#xff0c;那么本章节我们来做&#xff0c;添加数据后显示&#xff0c;以及咋现有的数据里&#xff0c;查询我们所需要的数据。 二.显示 首先我们看上一篇更新&#xff0c;我们在添加成功后跳转页面显示数据&#xff0c;那么跳转代码只…...

Apache Kylin 系列入门教程

Apache Kylin 是一款开源的分布式分析引擎&#xff0c;主要用于提供SQL接口及多维分析&#xff08;OLAP&#xff09;能力以支持超大规模数据集。它能在亚秒级时间内完成PB级别的数据查询。本文将带你一步步了解如何安装、配置和使用Apache Kylin来构建数据仓库&#xff0c;并执…...

如何识别并防御漏洞扫描类攻击

随着网络安全威胁的不断演变&#xff0c;漏洞扫描已成为黑客常用的手段之一&#xff0c;旨在发现目标系统中的弱点以便进行后续攻击。高防服务作为一种专业的安全防护措施&#xff0c;能够在一定程度上识别并阻止这类攻击行为。本文将深入探讨高防服务是如何识别并防御漏洞扫描…...