当前位置: 首页 > news >正文

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:
现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有1,想要修改为2、3来保证当部分Kafka的Broker宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过JSON+脚本的方式,来达到Kafka副本分区的调整。

  • 启动服务、创建主题、查看主题
  • 修改分区副本因子(不允许)、修改分区副本因子(成功)
  • 查看结果

在这里插入图片描述

分区分配策略

在Kafka中,每个Topic会包含多个分区,默认情况下一个分区只会被一个消费组下面的一个消费者消费,这里就产生了分区分配的问题。
Kafka中提供了多重分区分配算法(PartitionAssignor):

  • RangeAssignor
  • RoundRobinAssignor
  • StickAssignor

在这里插入图片描述

RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现Consumer之间的分区分配。
  • 消费组的成员定义他们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。
  • Kafka默认采用的是RangeAssignor的分配算法。

在这里插入图片描述

  • RangeAssignor对每个Topic进行独立的分区分配,对于每一个Topic,首先对分区按照分区ID进行数值排序,然后订阅这个Topic的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。
  • RangeAssignor策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。
  • 对于每一个Topic,RangerAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。

在这里插入图片描述

RoundRobinAssignor

  • RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。
  • 如果消费组内,消费订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果尽量均衡。如果订阅的Topic列表是不同的,那么分配结果不保证尽量均衡。

在这里插入图片描述

  • 对于 RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过1,而RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越大)
  • 对于消费组内消费者订阅Topic不一致的情况:假设有两个消费者分别为C0和C1,有2个TopicT1、T2,分别有3个分区、2个分区,并且 C0 订阅了T1和T2,那么RoundRobinAssignor的分配结果如下:

在这里插入图片描述

StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的Topic列表不相同的情况下。
更核心的问题是无论是 RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。
Sticky是“粘性的”,可以理解为分配是带粘性的:

  • 分区的分配尽量的均衡
  • 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。

假设当前有如下内容:

  • 3个Consumer C0、C1、C2
  • 4个Topic:T0、T1、T2、T3 每个Topic有2个分区
  • 所有Consumer都订阅了4个分区

在这里插入图片描述
如果 C1 宕机,此时 StickyAssignor 的结果:
在这里插入图片描述

自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口
其中定义了两个内部类:

  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅Topic列表和用户自定义信息。
  • PartitionAssignor接口通过subscription()方法来设置消费者自身相关的Subscription信息,注意此方法中只有一个参数Topics,与Subscription类中的topics相互呼应,但是并没有有关userData的参数体现。为了增强用户对分配结果的控制,可以在Subscription()方法内部添加一些影响分配的用户自定义信息赋予userData,比如:权重、IP地址、HOST或者机架
  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过PartitonAssignor接口中的onAssignment()方法是在每个消费者收到消费组Leader分配结果时的回调函数,例如在:StickyAssignor策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。

Kafka还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将Subscription的 userData信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;import java.nio.ByteBuffer;
import java.util.*;public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {@Overridepublic Subscription subscription(Set<String> topics) {// 在这里添加权重信息到 userDataByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(getWeight());buffer.flip();return new Subscription(new ArrayList<>(topics), buffer);}@Overridepublic Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {Map<String, Assignment> assignments = new HashMap<>();Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();// 遍历所有订阅的topicsfor (String topic : metadata.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {partitionConsumers.putIfAbsent(partition, new ArrayList<>());}}// 根据权重分配分区for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {String consumerId = subscriptionEntry.getKey();Subscription subscription = subscriptionEntry.getValue();int weight = subscription.userData().getInt();for (String topic : subscription.topics()) {List<TopicPartition> partitions = metadata.partitionsForTopic(topic);for (TopicPartition partition : partitions) {List<String> consumers = partitionConsumers.get(partition);for (int i = 0; i < weight; i++) {consumers.add(consumerId);  // 权重高的消费者多次添加,增加选中的机会}}}}// 随机分配分区给消费者Random random = new Random();for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) {List<String> consumers = entry.getValue();String assignedConsumer = consumers.get(random.nextInt(consumers.size()));assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())).partitions().add(entry.getKey());}return assignments;}@Overridepublic void onAssignment(Assignment assignment, Cluster metadata) {// 可以在这里处理分配后的逻辑,比如保存当前分配的快照}@Overridepublic String name() {return "weighted";}private int getWeight() {// 获取权重,可以从配置文件或环境变量中获取return 10; // 默认权重为10}
}

注册使用

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());
// 配置其他消费者属性KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1", "topic2"));

相关文章:

大数据-67 Kafka 高级特性 分区 分配策略 Ranger、RoundRobin、Sticky、自定义分区器

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

深度学习与图像修复:ADetailer插件在Stable Diffusion中的应用

文章目录 引言ADetailer插件介绍插件安装常用模型控制提示词参数配置参数详解 实践建议 示例插件的对比&#xff1a;1. ADetailer插件2. Photoshop插件&#xff08;如Nik Collection&#xff09;3. GIMP插件&#xff08;如GMIC&#xff09;4. Affinity Photo插件 结语 引言 无…...

【Pytorch】topk函数

topk 是 PyTorch 中的一个函数&#xff0c;用于从张量中选取最大&#xff08;或最小&#xff09;的 k 个元素及其对应的索引。其定义如下&#xff1a; values, indices torch.topk(input, k, dimNone, largestTrue, sortedTrue, *, outNone)参数说明 input (Tensor): 输入张…...

使用mybatis注解和xml映射执行javaWeb中增删改查等操作

Mapper接口 使用注解执行SQL语句操作和相应的Java抽象类&#xff08;对于简单的增删改查使用注解&#xff09; Mapper public interface EmpMapper {// 根据id删除员工信息Delete("delete from mybatis.emp where id#{id}")public int EmpDelete(Integer id);// 查…...

SpringBoot3 响应式编程

Spring Boot 3 中的响应式编程是一个重要的特性&#xff0c;它允许开发者构建非阻塞、异步和基于事件的应用程序&#xff0c;这对于处理高并发和实时数据流的应用场景尤为重要。以下是对Spring Boot 3响应式编程的详细解析&#xff1a; 一、响应式编程概述 响应式编程是一种编…...

【C++ 面试 - 基础题】每日 3 题(二)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/fYaBd &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 C 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏&…...

Modelica建模,Modelica语言的学习,技术调研工作

Modelica建模&#xff0c;Modelica语言的学习&#xff0c;技术调研工作 参考资料&#xff1a; 苏州同元软控信息技术有限公司 - 同元 Modelica 再探冷却 modelica学习-CSDN博客 1、 Modelica简介 Modelica是由Modelica协会维护、免费开放的物理系统面向对象的统一建模语言规…...

Oracle数据字典之——v$lock 和v$locked_object

v$lock视图 v$lock视图列出当前系统持有的或正在申请的所有锁的情况&#xff0c;其主要字段说明如下&#xff1a; 11g如下&#xff1a; 字段名称类型说明ADDRRAW(8)锁定状态对象的地址KADDRRAW(8)锁的地址SIDNUMBER会话&#xff08;SESSION&#xff09;标识&#xff1b;TYPE…...

solidity 以太坊(Ether) 单位(很基础)

一个字面常数可以带一个后缀 wei&#xff0c; gwei 或 ether 来指定一个以太坊的数量&#xff0c; 其中没有后缀的以太数字被认为单位是wei。 在以太坊和许多其他基于以太坊的区块链系统中&#xff0c;以太币&#xff08;Ether&#xff09;是网络中的主要加密货币。 以太可以被…...

关于elementUI 分页 table 使用 toggleRowSelection

我出现问题的前提 在table表格第一页全选 &#xff0c;第二页全选 回到第一页 点击按钮 取消 第一页&#xff0c;第二页我不要的勾选 初始实现 this.selectedPeraonal是表格 selection-change方法返回的值 handleSelectionChange(val) {this.selectedPeraonal val || []…...

K8s部署RocketMQ

准备工作 我是win电脑&#xff0c;本地安装了Podman&#xff0c;并使用Kind创建了一个K8s本地环境&#xff0c;并在 win 电脑上安装了 Helm。 部署RocketMQ 1. add rocketmq helm repo 2. deploy rocketmq cluster 3. verify the rocketmq cluster 4. Create Topic by api a…...

Linux服务管理-Nginx配置

静态解析主要解析html、css动态解析需要解析php 动态资源通过轮询分配到后端的Apache服务器处理 apache是同步阻塞&#xff0c;nginx是异步非阻塞...

C语言典型例题31

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 习题2.8 请编写程序将China译为密码&#xff0c;密码的规律是&#xff1a;用原来字母后面的第4个字母代替原来的字母。 例如:C后面的4个字母是G&#xff0c;h后面第4个字母为l 代码&#xff1a; //《C程序设计教程…...

FFMPEG 工具方法

av_strerror int av_strerror ( int errnum, char * errbuf, size_t errbuf_size )ffmpeg获取与设置mp4文件旋转方向方法 设置与获取都是对AVStream的dict操作. 设置 for (i 0; i < ifmt_ctx_v->nb_streams; i) { //Create output AVStream according to input A…...

Qt QML 使用QPainterPath绘制弧形曲线和弧形文本

Qt并没有相关api直接绘制弧形文字&#xff0c;但提供了曲线绘制相关类&#xff0c;所以只能另辟蹊径&#xff0c;使用QPainterPath先生成曲线&#xff0c;然后通过曲线上的点来定位每个文字并draw出来。 QML具体做法为从QQuickPaintedItem继承&#xff0c;在派生类中实现paint…...

VMware虚拟机和Docker的备份与恢复

目录 1. VMware虚拟机的快照备份 1.1 VMware本机的快照备份 1.2 VMware快照备份到另一电脑 2. Docker知识点 2.1 Docker镜像和容器的关系 2.2 Docker的存储卷 2.3 Docker命令简介 2.4 删除Anylink镜像 3. Docker备份和恢复 3.1 确定要回滚的容器和版本 3.2 备份当前…...

新加坡服务器延迟大吗?如何进行优化

新加坡服务器延迟大吗&#xff1f;新加坡服务器的延迟通常在全国平均延迟111ms左右&#xff0c;其中移动网络约为90ms&#xff0c;联通网络106ms&#xff0c;电信网络最低约为85ms。为了进行优化&#xff0c;一般可以采取使用CDN、优化路由线路、增加带宽和服务器升级等方法。 …...

uniapp——列表图片加载太多且空间占用太大的处理方法(降低清晰度)

解决方法 列表默认显示的降低清晰度&#xff0c;预览图片的时候加载原图。 如果图片是上传到阿里云的OSS上&#xff0c;可以快速获取图片缩略图的方法 直接在后端返回的URL后面拼接字符串&#xff1a; XXX.png?x-oss-process 缩略图方法介绍&#xff1a; ?x-oss-proces…...

spring+SSM+Mybatis面试题(上)(30道)

目录 1. 何为Spring Bean容器?Spring Bean容器与Spring IOC 容器有什么不同吗?2. Spring IOC 如何理解?3. Spring DI 如何理解?4. Spring 中基于注解如何配置对象作用域?以及如何配置延迟加载机制?1.配置作用域需要注解Scope(“Singleton”)2.开启延迟加载&#xff1a;La…...

odoo17 翻译一个小bug

odoo17 翻译一个小bug 用户界面的没译过来 标红处&#xff0c;但在zh_CN.po中明显已经翻译过来了&#xff0c;采取暴力点的&#xff0c;直接把base下的base.pot删除&#xff0c;再更新一下&#xff0c;可以正常显示了...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中&#xff0c;我们可能会遇到一些流式数据处理的场景&#xff0c;比如接收来自上游接口的 Server-Sent Events&#xff08;SSE&#xff09; 或 流式 JSON 内容&#xff0c;并将其原样中转给前端页面或客户端。这种情况下&#xff0c;传统的 RestTemplate 缓存机制会…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架&#xff0c;允许使用简单的变成模型跨计算机对大型集群进行分布式处理&#xff08;1.海量的数据存储 2.海量数据的计算&#xff09;Hadoop核心组件 hdfs&#xff08;分布式文件存储系统&#xff09;&a…...

Fabric V2.5 通用溯源系统——增加图片上传与下载功能

fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行

项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战&#xff0c;克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...