当前位置: 首页 > news >正文

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。

  • 启动服务、创建主题、查看主题
  • 修改分区副本因子(不允许)、修改分区副本因子(成功)
  • 查看结果

在这里插入图片描述

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):

  • RangeAssignor
  • RoundRobinAssignor
  • StickAssignor

在这里插入图片描述

RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
  • 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
  • Kafka默认采用的是RangeAssignor的分配算法。

在这里插入图片描述

  • RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
  • 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

在这里插入图片描述

RoundRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
  • 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

在这里插入图片描述

  • 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

在这里插入图片描述

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:

  • 分区的分配尽量的均衡
  • 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。

假设当前有如下内容:

  • 3个Consumer C0、C1、C2
  • 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
  • 所有Consumer都订阅了4个分区

在这里插入图片描述
如果 C1 宕机,此时 StickyAssignor 的结果:
在这里插入图片描述

自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:

  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
  • PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;import java.nio.ByteBuffer;
import java.util.*;public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {@Overridepublic Subscription subscription(Set<String> topics) {// 在这里添加权重信息到 userDataByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(getWeight());buffer.flip();return new Subscription(new ArrayList<>(topics), buffer);}@Overridepublic Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {Map<String, Assignment> assignments = new HashMap<>();Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();// 遍历所有订阅的topicsfor (String topic : metadata.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {partitionConsumers.putIfAbsent(partition, new ArrayList<>());}}// 根据权重分配分区for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {String consumerId = subscriptionEntry.getKey();Subscription subscription = subscriptionEntry.getValue();int weight = subscription.userData().getInt();for (String topic : subscription.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {List<String> consumers = partitionConsumers.get(partition);for (int i = 0; i < weight; i++) {consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会}}}}// 随机分配分区给消费者Random random = new Random();for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {List<String> consumers = entry.getValue();String assignedConsumer = consumers.get(random.nextInt(consumers.size()));assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())).partitions().add(entry.getKey());}return assignments;}@Overridepublic void onAssignment(Assignment assignment, Cluster metadata) {// 可以在这里处理分配后的逻辑,比如保存当前分配的快照}@Overridepublic String name() {return "weighted";}private int getWeight() {// 获取权重,可以从配置文件或环境变量中获取return 10; // 默认权重为10}
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

相关文章:

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

深度学习与图像修复:ADetailer插件在Stable Diffusion中的应用

文章目录 引言ADetailer插件介绍插件安装常用模型控制提示词参数配置参数详解 实践建议 示例插件的对比&#xff1a;1. ADetailer插件2. Photoshop插件&#xff08;如Nik Collection&#xff09;3. GIMP插件&#xff08;如GMIC&#xff09;4. Affinity Photo插件 结语 引言 无…...

【Pytorch】topk函数

topk 是 PyTorch 中的一个函数&#xff0c;用于从张量中选取最大&#xff08;或最小&#xff09;的 k 个元素及其对应的索引。其定义如下&#xff1a; values, indices torch.topk(input, k, dimNone, largestTrue, sortedTrue, *, outNone)参数说明 input (Tensor): 输入张…...

使用mybatis注解和xml映射执行javaWeb中增删改查等操作

Mapper接口 使用注解执行SQL语句操作和相应的Java抽象类&#xff08;对于简单的增删改查使用注解&#xff09; Mapper public interface EmpMapper {// 根据id删除员工信息Delete("delete from mybatis.emp where id#{id}")public int EmpDelete(Integer id);// 查…...

SpringBoot3 响应式编程

Spring Boot 3 中的响应式编程是一个重要的特性&#xff0c;它允许开发者构建非阻塞、异步和基于事件的应用程序&#xff0c;这对于处理高并发和实时数据流的应用场景尤为重要。以下是对Spring Boot 3响应式编程的详细解析&#xff1a; 一、响应式编程概述 响应式编程是一种编…...

【C++ 面试 - 基础题】每日 3 题(二)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…...

Modelica建模,Modelica语言的学习,技术调研工作

Modelica建模&#xff0c;Modelica语言的学习&#xff0c;技术调研工作 参考资料&#xff1a; 苏州同元软控信息技术有限公司 - 同元 Modelica 再探冷却 modelica学习-CSDN博客 1、 Modelica简介 Modelica是由Modelica协会维护、免费开放的物理系统面向对象的统一建模语言规…...

Oracle数据字典之——v$lock 和v$locked_object

v$lock视图 v$lock视图列出当前系统持有的或正在申请的所有锁的情况&#xff0c;其主要字段说明如下&#xff1a; 11g如下&#xff1a; 字段名称类型说明ADDRRAW(8)锁定状态对象的地址KADDRRAW(8)锁的地址SIDNUMBER会话&#xff08;SESSION&#xff09;标识&#xff1b;TYPE…...

solidity 以太坊(Ether) 单位(很基础)

一个字面常数可以带一个后缀 wei&#xff0c; gwei 或 ether 来指定一个以太坊的数量&#xff0c; 其中没有后缀的以太数字被认为单位是wei。 在以太坊和许多其他基于以太坊的区块链系统中&#xff0c;以太币&#xff08;Ether&#xff09;是网络中的主要加密货币。 以太可以被…...

关于elementUI 分页 table 使用 toggleRowSelection

我出现问题的前提 在table表格第一页全选 &#xff0c;第二页全选 回到第一页 点击按钮 取消 第一页&#xff0c;第二页我不要的勾选 初始实现 this.selectedPeraonal是表格 selection-change方法返回的值 handleSelectionChange(val) {this.selectedPeraonal val || []…...

K8s部署RocketMQ

准备工作 我是win电脑&#xff0c;本地安装了Podman&#xff0c;并使用Kind创建了一个K8s本地环境&#xff0c;并在 win 电脑上安装了 Helm。 部署RocketMQ 1. add rocketmq helm repo 2. deploy rocketmq cluster 3. verify the rocketmq cluster 4. Create Topic by api a…...

Linux服务管理-Nginx配置

静态解析主要解析html、css动态解析需要解析php 动态资源通过轮询分配到后端的Apache服务器处理 apache是同步阻塞&#xff0c;nginx是异步非阻塞...

C语言典型例题31

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 习题2.8 请编写程序将China译为密码&#xff0c;密码的规律是&#xff1a;用原来字母后面的第4个字母代替原来的字母。 例如:C后面的4个字母是G&#xff0c;h后面第4个字母为l 代码&#xff1a; //《C程序设计教程…...

FFMPEG 工具方法

av_strerror int av_strerror ( int errnum, char * errbuf, size_t errbuf_size )ffmpeg获取与设置mp4文件旋转方向方法 设置与获取都是对AVStream的dict操作. 设置 for (i 0; i < ifmt_ctx_v->nb_streams; i) { //Create output AVStream according to input A…...

Qt QML 使用QPainterPath绘制弧形曲线和弧形文本

Qt并没有相关api直接绘制弧形文字&#xff0c;但提供了曲线绘制相关类&#xff0c;所以只能另辟蹊径&#xff0c;使用QPainterPath先生成曲线&#xff0c;然后通过曲线上的点来定位每个文字并draw出来。 QML具体做法为从QQuickPaintedItem继承&#xff0c;在派生类中实现paint…...

VMware虚拟机和Docker的备份与恢复

目录 1. VMware虚拟机的快照备份 1.1 VMware本机的快照备份 1.2 VMware快照备份到另一电脑 2. Docker知识点 2.1 Docker镜像和容器的关系 2.2 Docker的存储卷 2.3 Docker命令简介 2.4 删除Anylink镜像 3. Docker备份和恢复 3.1 确定要回滚的容器和版本 3.2 备份当前…...

新加坡服务器延迟大吗?如何进行优化

新加坡服务器延迟大吗&#xff1f;新加坡服务器的延迟通常在全国平均延迟111ms左右&#xff0c;其中移动网络约为90ms&#xff0c;联通网络106ms&#xff0c;电信网络最低约为85ms。为了进行优化&#xff0c;一般可以采取使用CDN、优化路由线路、增加带宽和服务器升级等方法。 …...

uniapp——列表图片加载太多且空间占用太大的处理方法(降低清晰度)

解决方法 列表默认显示的降低清晰度&#xff0c;预览图片的时候加载原图。 如果图片是上传到阿里云的OSS上&#xff0c;可以快速获取图片缩略图的方法 直接在后端返回的URL后面拼接字符串&#xff1a; XXX.png?x-oss-process 缩略图方法介绍&#xff1a; ?x-oss-proces…...

spring+SSM+Mybatis面试题(上)(30道)

目录 1. 何为Spring Bean容器?Spring Bean容器与Spring IOC 容器有什么不同吗?2. Spring IOC 如何理解?3. Spring DI 如何理解?4. Spring 中基于注解如何配置对象作用域?以及如何配置延迟加载机制?1.配置作用域需要注解Scope(“Singleton”)2.开启延迟加载&#xff1a;La…...

odoo17 翻译一个小bug

odoo17 翻译一个小bug 用户界面的没译过来 标红处&#xff0c;但在zh_CN.po中明显已经翻译过来了&#xff0c;采取暴力点的&#xff0c;直接把base下的base.pot删除&#xff0c;再更新一下&#xff0c;可以正常显示了...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明&#xff1a;server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Java-41 深入浅出 Spring - 声明式事务的支持 事务配置 XML模式 XML+注解模式

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

自然语言处理——Transformer

自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效&#xff0c;它能挖掘数据中的时序信息以及语义信息&#xff0c;但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN&#xff0c;但是…...

Android15默认授权浮窗权限

我们经常有那种需求&#xff0c;客户需要定制的apk集成在ROM中&#xff0c;并且默认授予其【显示在其他应用的上层】权限&#xff0c;也就是我们常说的浮窗权限&#xff0c;那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...