大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。
- 启动服务、创建主题、查看主题
- 修改分区副本因子(不允许)、修改分区副本因子(成功)
- 查看结果
分区分配策略
在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):
- RangeAssignor
- RoundRobinAssignor
- StickAssignor
RangeAssignor
- PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
- 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
- Kafka默认采用的是RangeAssignor的分配算法。
- RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
- RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
- 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。
RoundRobinAssignor
- RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
- 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。
- 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
- 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:
StickyAssignor
尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:
- 分区的分配尽量的均衡
- 每一次重分配的结果尽量与上一次分配结果保持一致
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。
假设当前有如下内容:
- 3个Consumer C0、C1、C2
- 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
- 所有Consumer都订阅了4个分区
如果 C1 宕机,此时 StickyAssignor 的结果:
自定义分区策略
基本概念
需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:
- Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
- PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
- Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。
Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。
代码实现
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;import java.nio.ByteBuffer;
import java.util.*;public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {@Overridepublic Subscription subscription(Set<String> topics) {// 在这里添加权重信息到 userDataByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(getWeight());buffer.flip();return new Subscription(new ArrayList<>(topics), buffer);}@Overridepublic Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {Map<String, Assignment> assignments = new HashMap<>();Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();// 遍历所有订阅的topicsfor (String topic : metadata.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {partitionConsumers.putIfAbsent(partition, new ArrayList<>());}}// 根据权重分配分区for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {String consumerId = subscriptionEntry.getKey();Subscription subscription = subscriptionEntry.getValue();int weight = subscription.userData().getInt();for (String topic : subscription.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {List<String> consumers = partitionConsumers.get(partition);for (int i = 0; i < weight; i++) {consumers.add(consumerId); // 权重高的消费者多次添加,增加选中的机会}}}}// 随机分配分区给消费者Random random = new Random();for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {List<String> consumers = entry.getValue();String assignedConsumer = consumers.get(random.nextInt(consumers.size()));assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())).partitions().add(entry.getKey());}return assignments;}@Overridepublic void onAssignment(Assignment assignment, Cluster metadata) {// 可以在这里处理分配后的逻辑,比如保存当前分配的快照}@Overridepublic String name() {return "weighted";}private int getWeight() {// 获取权重,可以从配置文件或环境变量中获取return 10; // 默认权重为10}
}
注册使用
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));
相关文章:

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...

深度学习与图像修复:ADetailer插件在Stable Diffusion中的应用
文章目录 引言ADetailer插件介绍插件安装常用模型控制提示词参数配置参数详解 实践建议 示例插件的对比:1. ADetailer插件2. Photoshop插件(如Nik Collection)3. GIMP插件(如GMIC)4. Affinity Photo插件 结语 引言 无…...
【Pytorch】topk函数
topk 是 PyTorch 中的一个函数,用于从张量中选取最大(或最小)的 k 个元素及其对应的索引。其定义如下: values, indices torch.topk(input, k, dimNone, largestTrue, sortedTrue, *, outNone)参数说明 input (Tensor): 输入张…...

使用mybatis注解和xml映射执行javaWeb中增删改查等操作
Mapper接口 使用注解执行SQL语句操作和相应的Java抽象类(对于简单的增删改查使用注解) Mapper public interface EmpMapper {// 根据id删除员工信息Delete("delete from mybatis.emp where id#{id}")public int EmpDelete(Integer id);// 查…...
SpringBoot3 响应式编程
Spring Boot 3 中的响应式编程是一个重要的特性,它允许开发者构建非阻塞、异步和基于事件的应用程序,这对于处理高并发和实时数据流的应用场景尤为重要。以下是对Spring Boot 3响应式编程的详细解析: 一、响应式编程概述 响应式编程是一种编…...
【C++ 面试 - 基础题】每日 3 题(二)
✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/fYaBd 📚专栏简介:在这个专栏中,我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏&…...

Modelica建模,Modelica语言的学习,技术调研工作
Modelica建模,Modelica语言的学习,技术调研工作 参考资料: 苏州同元软控信息技术有限公司 - 同元 Modelica 再探冷却 modelica学习-CSDN博客 1、 Modelica简介 Modelica是由Modelica协会维护、免费开放的物理系统面向对象的统一建模语言规…...
Oracle数据字典之——v$lock 和v$locked_object
v$lock视图 v$lock视图列出当前系统持有的或正在申请的所有锁的情况,其主要字段说明如下: 11g如下: 字段名称类型说明ADDRRAW(8)锁定状态对象的地址KADDRRAW(8)锁的地址SIDNUMBER会话(SESSION)标识;TYPE…...
solidity 以太坊(Ether) 单位(很基础)
一个字面常数可以带一个后缀 wei, gwei 或 ether 来指定一个以太坊的数量, 其中没有后缀的以太数字被认为单位是wei。 在以太坊和许多其他基于以太坊的区块链系统中,以太币(Ether)是网络中的主要加密货币。 以太可以被…...
关于elementUI 分页 table 使用 toggleRowSelection
我出现问题的前提 在table表格第一页全选 ,第二页全选 回到第一页 点击按钮 取消 第一页,第二页我不要的勾选 初始实现 this.selectedPeraonal是表格 selection-change方法返回的值 handleSelectionChange(val) {this.selectedPeraonal val || []…...

K8s部署RocketMQ
准备工作 我是win电脑,本地安装了Podman,并使用Kind创建了一个K8s本地环境,并在 win 电脑上安装了 Helm。 部署RocketMQ 1. add rocketmq helm repo 2. deploy rocketmq cluster 3. verify the rocketmq cluster 4. Create Topic by api a…...

Linux服务管理-Nginx配置
静态解析主要解析html、css动态解析需要解析php 动态资源通过轮询分配到后端的Apache服务器处理 apache是同步阻塞,nginx是异步非阻塞...

C语言典型例题31
《C程序设计教程(第四版)——谭浩强》 习题2.8 请编写程序将China译为密码,密码的规律是:用原来字母后面的第4个字母代替原来的字母。 例如:C后面的4个字母是G,h后面第4个字母为l 代码: //《C程序设计教程…...
FFMPEG 工具方法
av_strerror int av_strerror ( int errnum, char * errbuf, size_t errbuf_size )ffmpeg获取与设置mp4文件旋转方向方法 设置与获取都是对AVStream的dict操作. 设置 for (i 0; i < ifmt_ctx_v->nb_streams; i) { //Create output AVStream according to input A…...

Qt QML 使用QPainterPath绘制弧形曲线和弧形文本
Qt并没有相关api直接绘制弧形文字,但提供了曲线绘制相关类,所以只能另辟蹊径,使用QPainterPath先生成曲线,然后通过曲线上的点来定位每个文字并draw出来。 QML具体做法为从QQuickPaintedItem继承,在派生类中实现paint…...

VMware虚拟机和Docker的备份与恢复
目录 1. VMware虚拟机的快照备份 1.1 VMware本机的快照备份 1.2 VMware快照备份到另一电脑 2. Docker知识点 2.1 Docker镜像和容器的关系 2.2 Docker的存储卷 2.3 Docker命令简介 2.4 删除Anylink镜像 3. Docker备份和恢复 3.1 确定要回滚的容器和版本 3.2 备份当前…...

新加坡服务器延迟大吗?如何进行优化
新加坡服务器延迟大吗?新加坡服务器的延迟通常在全国平均延迟111ms左右,其中移动网络约为90ms,联通网络106ms,电信网络最低约为85ms。为了进行优化,一般可以采取使用CDN、优化路由线路、增加带宽和服务器升级等方法。 …...
uniapp——列表图片加载太多且空间占用太大的处理方法(降低清晰度)
解决方法 列表默认显示的降低清晰度,预览图片的时候加载原图。 如果图片是上传到阿里云的OSS上,可以快速获取图片缩略图的方法 直接在后端返回的URL后面拼接字符串: XXX.png?x-oss-process 缩略图方法介绍: ?x-oss-proces…...

spring+SSM+Mybatis面试题(上)(30道)
目录 1. 何为Spring Bean容器?Spring Bean容器与Spring IOC 容器有什么不同吗?2. Spring IOC 如何理解?3. Spring DI 如何理解?4. Spring 中基于注解如何配置对象作用域?以及如何配置延迟加载机制?1.配置作用域需要注解Scope(“Singleton”)2.开启延迟加载:La…...

odoo17 翻译一个小bug
odoo17 翻译一个小bug 用户界面的没译过来 标红处,但在zh_CN.po中明显已经翻译过来了,采取暴力点的,直接把base下的base.pot删除,再更新一下,可以正常显示了...

Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...

dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...