Kafa分区策略实现
引言
Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。
轮询策略(默认)
- 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
-
import org.apache.kafka.clients.producer.*; import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();} }随机策略
- 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
-
import org.apache.kafka.clients.producer.*; import java.util.List; import java.util.Map; import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {} }// 使用随机分区器的生产者示例 public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();} }按键哈希策略
- 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
-
import org.apache.kafka.clients.producer.*; import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();} }自定义分区策略(实现接口)
-
当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现
org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。 -
import org.apache.kafka.clients.producer.*; import java.util.List; import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定义分区逻辑,这里简单示例根据消息值的长度分区String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {} }// 使用自定义分区器的生产者示例 public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();} }
相关文章:
Kafa分区策略实现
引言 Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。 轮询策略(默认) 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时&…...
Pyside/Pyqt中QWebEngineView和QWebEnginePage的区别
在 PySide/Qt 的 WebEngine 模块中,QWebEngineView 和 QWebEnginePage 是两个紧密相关但职责不同的类。以下是它们的核心区别和关系: 1. 职责区分 类名核心职责模块归属QWebEngineView作为可视化的窗口部件(Widget),负…...
Kafka的内部通信协议
引言 kafka内部用到的常见协议和优缺点可以看看原文 Kafka用到的协议 本文奖详细探究kafka核心通信协议和高性能的关键 网络层通信的实现 基于 Java NIO:Kafka 的网络通信层主要基于 Java NIO 来实现,这使得它能够高效地处理大量的连接和数据传输。…...
强大到工业层面的软件
电脑数据删不干净,简直是一种让人抓狂的折磨!明明已经把文件扔进了回收站,清空了,可那些残留的数据就像牛皮癣一样,怎么也除不掉。这种烦恼简直无处不在,让人从头到脚都感到无比烦躁。 首先,心…...
数据分析和AI丨应对AI实施挑战,工程领域AI应用的五大方法
工程领域的人工智能 (AI) 已经开始发挥价值,低代码和无代码工具正在使曾经仅属于专业数据科学家的 AI 能力变得大众化。 然而,并非工程领域的每个人都能从中受益,使用新的便捷的 AI 工具提高工作效率并不难,…...
54. UDP协议
UDP协议 UDP(User Datagram Protocol,用户数据报协议)是一个无连接的传输层协议,它提供简单的、不可靠的信息传送服务。与TCP(传输控制协议)不同,UDP不提供数据包的排序、错误检查(仅…...
AJAX笔记入门篇
黑马程序员视频地址: 黑马程序员前端AJAX入门到实战全套教程https://www.bilibili.com/video/BV1MN411y7pw?vd_source0a2d366696f87e241adc64419bf12cab&spm_id_from333.788.videopod.episodes&p2https://www.bilibili.com/video/BV1MN411y7pw?vd_source…...
深入解析Java集合框架:春招面试要点
在上一篇文章中,我们深入探讨了Java核心基础,这是学习Java的基石。而在实际的Java开发中,集合框架的使用频率极高,它为我们提供了丰富的数据结构和算法实现,极大地提高了开发效率。对于春招面试来说,集合框…...
【Elasticsearch】Elasticsearch的查询
Elasticsearch的查询 DSL查询基础语句叶子查询全文检索查询matchmulti_match 精确查询termrange 复合查询算分函数查询bool查询 排序分页基础分页深度分页 高亮高亮原理实现高亮 RestClient查询基础查询叶子查询复合查询排序和分页高亮 数据聚合DSL实现聚合Bucket聚合带条件聚合…...
STM32 PWM驱动直流电机
接线图: 代码配置: 根据驱动舵机的代码来写,与舵机不同的是,这次的引脚接到了PA2上,所以需要改一下引脚以及改为OC3通道。 另外还需在配置两个GPIO引脚,来控制电机的旋转方向,这里连接到了PA4与…...
系统思考—心智模式
“我们的大脑对连贯性的渴望远胜于对准确性的追求。”—诺贝尔经济学得主丹尼尔卡尼曼 在面对复杂的决策时,我们往往更倾向于寻找那些能够迅速串联起来的信息,而非深入挖掘每一个细节的真实性。这种倾向在日常生活中或许能帮助我们迅速作出决策…...
JavaScript_02 表单
表单常用演示: 1.图片 结果失真了... 2.切换图片 切换结果 3.表单:...
【Qt】06-对话框
对话框 前言一、模态和非模态对话框1.1 概念1.2 模态对话框1.2.1 代码QAction类 1.2.2 模态对话框运行分析 1.3 非模态对话框1.3.1 代码局部变量和成员变量setAttribute 类 1.3.2 现象解释 二、标准对话框2.1 提示对话框 QMessageBox2.1.1 现象及解释 2.2 问题对话框2.2.1 现象…...
AI学习指南Ollama篇-使用Ollama构建自己的私有化知识库
一、引言 (一)背景介绍 随着企业对数据隐私和效率的重视,私有化知识库的需求日益增长。私有化知识库不仅可以保护企业数据的安全性,还能提供高效的知识管理和问答系统,提升企业内部的工作效率和创新能力。 (二)Ollama和AnythingLLM的结合 Ollama和AnythingLLM的结合…...
2.策略模式(Strategy)
定义 定义一系列算法,把它们一个个封装起来,并且使他们可互相替换(变化)。该模式使算法可独立于使用它的客户程序(稳定)而变化(拓展,子类化)。 动机(Motiva…...
Python里的小整数问题挺有意思的
简单来说,Python为了优化性能,会把一些常用的整数(通常是-5到256)提前创建好,放到一个“缓存池”里。这样,当你用到这些小整数时,Python就不用每次都重新创建对象了,直接从缓存池里拿…...
开源智慧园区管理系统对比五款主流产品探索智能运营新模式
内容概要 在这个数字化迅速发展的时代,园区管理也迎来了全新的机遇和挑战。众所周知,开源智慧园区管理系统作为一种创新解决方案,正逐步打破传统管理的局限性。它的开放性不仅使得系统可以根据具体需求进行灵活调整,也为用户提供…...
正则表达式入门
入门 1、提取文章中所有的英文单词 //1.先创建一个Pattern对象,模式对象,可以理解成就是一个正则表达式对象 Pattern pattern Pattern.compile("[a-zA-Z]"); //2.创建一个匹配器对象 //理解:就是 matcher匹配器按照p…...
hive:数据导入,数据导出,加载数据到Hive,复制表结构
hive不建议用insert,因为Hive是建立在Hadoop之上的数据仓库工具,主要用于批处理和大数据分析,而不是为OLTP(在线事务处理)操作设计的。INSERT操作会非常慢 数据导入 命令行界面:建一个文件 查询数据>>复制>>粘贴到新…...
【某大厂一面】HashSet底层怎么实现的
HashSet 是 Java 集合框架中的一个非常常用的集合类,它实现了 Set 接口,并且底层通常是通过 哈希表(HashMap)来实现的。要理解 HashSet 的底层实现,我们需要从哈希表的工作原理开始讲起。下面是对 HashSet 底层实现的详…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
