当前位置: 首页 > news >正文

Kafka分区策略实现

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用随机分区器的生产者示例
    public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();}
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定义分区逻辑,这里简单示例根据消息值的长度分区String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用自定义分区器的生产者示例
    public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

相关文章:

Kafka分区策略实现

引言 Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中&#xff0c;合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。 轮询策略&#xff08;默认&#xff09; 轮询策略是 Kafka 默认的分区策略&#xff08;当消息没有指定键时&…...

【归属地】批量号码归属地查询按城市高速的分流,基于WPF的解决方案

在现代商业活动中&#xff0c;企业为了提高营销效果和资源利用效率&#xff0c;需要针对不同地区的市场特点开展精准营销。通过批量号码归属地查询并按城市分流&#xff0c;可以为企业的营销决策提供有力支持。 短信营销&#xff1a;一家连锁餐饮企业计划开展促销活动&#xf…...

为AI聊天工具添加一个知识系统 之78 详细设计之19 正则表达式 之6

本文要点 要点 本项目设计的正则表达式 是一个 动态正则匹配框架。它是一个谓词系统&#xff1a;谓词 是运动&#xff0c;主语是“维度”&#xff0c;表语是 语言处理。主语的一个 双动结构。 Reg三大功能 语法验证、语义检查和 语用检验&#xff0c;三者 &#xff1a;语义约…...

使用Java操作Redis数据类型的详解指南

SEO Meta Description: 详细介绍如何使用Java操作Redis的各种数据类型&#xff0c;包括字符串、哈希、列表、集合和有序集合&#xff0c;提供代码示例和最佳实践。 介绍 Redis是一种开源的内存数据结构存储&#xff0c;用作数据库、缓存和消息代理。它支持多种数据结构&#…...

一表总结 Java 的3种设计模式与6大设计原则

设计模式通常分为三大类&#xff1a;创建型、结构型和行为型。 创建型模式&#xff1a;主要用于解决对象创建问题结构型模式&#xff1a;主要用于解决对象组合问题行为型模式&#xff1a;主要用于解决对象之间的交互问题 创建型模式 创建型模式关注于对象的创建机制&#xf…...

Hive on Spark优化

文章目录 第1章集群环境概述1.1 集群配置概述1.2 集群规划概述 第2章 Yarn配置2.1 Yarn配置说明2.2 Yarn配置实操 第3章 Spark配置3.1 Executor配置说明3.1.1 Executor CPU核数配置3.1.2 Executor内存配置3.1.3 Executor个数配置 3.2 Driver配置说明3.3 Spark配置实操 第4章 Hi…...

Java集合面试总结(题目来源JavaGuide)

问题1&#xff1a;说说 List,Set,Map 三者的区别&#xff1f; 在 Java 中&#xff0c;List、Set 和 Map 是最常用的集合框架&#xff08;Collection Framework&#xff09;接口&#xff0c;它们的主要区别如下&#xff1a; 1. List&#xff08;列表&#xff09; 特点&#xf…...

计算机网络 应用层 笔记1(C/S模型,P2P模型,FTP协议)

应用层概述&#xff1a; 功能&#xff1a; 常见协议 应用层与其他层的关系 网络应用模型 C/S模型&#xff1a; 优点 缺点 P2P模型&#xff1a; 优点 缺点 DNS系统&#xff1a; 基本功能 系统架构 域名空间&#xff1a; DNS 服务器 根服务器&#xff1a; 顶级域…...

ES6基础内容

ES 全称 EcmaScript ,是脚本语言的规范&#xff0c;而平时经常编写的 JavaScript 是 EcmaScript 的一种实现&#xff0c;所以 ES 新特性其实指的就是 JavaScript 的新特性。 一、 let变量声明和声明特性 1.1 变量声明 <!DOCTYPE html> <html lang"en">…...

DeepSeek本地部署的一些使用体会

春节期间我也尝试了一下Deepseek的本地部署&#xff0c;方案选用了Ollama Chatbox或AnythingLLM。Chatbox里有很多有意思的“助手”&#xff0c;而AnythingLLM支持本地知识库。 网上教程很多&#xff0c;总的来说还是很方便的&#xff0c;不需要费太多脑子。甚至可以这么说&a…...

鲸鱼算法 matlab pso

算法原理 鲸鱼优化算法的核心思想是通过模拟座头鲸的捕食过程来进行搜索和优化。座头鲸在捕猎时会围绕猎物游动并产生气泡网&#xff0c;迫使猎物聚集。这一行为被用来设计搜索策略&#xff0c;使算法能够有效地找到全局最优解。 算法步骤 ‌初始化‌&#xff1a;随机生成一…...

013-51单片机红外遥控器模拟控制空调,自动制冷制热定时开关

主要功能是通过红外遥控器模拟控制空调&#xff0c;可以实现根据环境温度制冷和制热&#xff0c;能够通过遥控器设定温度&#xff0c;可以定时开关空调。 1.硬件介绍 硬件是我自己设计的一个通用的51单片机开发平台&#xff0c;可以根据需要自行焊接模块&#xff0c;这是用立创…...

在Vue3 + Vite 项目中使用 Tailwind CSS 4.0

文章目录 首先是我的package.json根据官网步骤VS Code安装插件验证是否引入成功参考资料 首先是我的package.json {"name": "aplumweb","private": true,"version": "0.0.0","type": "module","s…...

Leetcode—922. 按奇偶排序数组 II【简单】

2025每日刷题&#xff08;207&#xff09; Leetcode—922. 按奇偶排序数组 II 实现代码 class Solution { public:vector<int> sortArrayByParityII(vector<int>& nums) {for(int i 0, j 1; i < nums.size() - 1; i 2) {// 前奇后偶if(nums[i] % 2) {w…...

一个开源 GenBI AI 本地代理(确保本地数据安全),使数据驱动型团队能够与其数据进行互动,生成文本到 SQL、图表、电子表格、报告和 BI

一、GenBI AI 代理介绍&#xff08;文末提供下载&#xff09; github地址&#xff1a;https://github.com/Canner/WrenAI 本文信息图片均来源于github作者主页 在 Wren AI&#xff0c;我们的使命是通过生成式商业智能 &#xff08;GenBI&#xff09; 使组织能够无缝访问数据&…...

使用Posix共享内存区实现进程间通信

使用Posix共享内存区实现进程间通信 使用Posix共享内存区通常涉以下步骤: 进程A 调用shm_open 创建共享内存区进程A调用ftruncate修改共享内存区大小进程A 调用mmap将共享内存区映射到进程地址空间ptrA进程A 使用ptrA对共享内存区进程更改进程B 使用shm_open打开已有共享内存…...

家政预约小程序12服务详情

目录 1 修改数据源2 创建页面3 搭建轮播图4 搭建基本信息5 显示服务规格6 搭建服务描述7 设置过滤条件总结 我们已经在首页、分类页面显示了服务的列表信息&#xff0c;当点击服务的内容时候需要显示服务的详情信息&#xff0c;本篇介绍一下详情页功能的搭建。 1 修改数据源 在…...

【C语言】指针详细解读2

1.const 修饰指针 1.1 const修饰变量 变量是可以修改的&#xff0c;如果把变量的地址交给⼀个指针变量&#xff0c;通过指针变量的也可以修改这个变量。 但是如果我们希望⼀个变量加上⼀些限制&#xff0c;不能被修改&#xff0c;怎么做呢&#xff1f;这就是const的作⽤。 #in…...

MongoDB 聚合

MongoDB 中聚合(aggregate)主要用于处理数据(诸如统计平均值&#xff0c;求和等)&#xff0c;并返回计算后的数据结果。 有点类似 SQL 语句中的 count(*)。 aggregate() 方法 MongoDB中聚合的方法使用aggregate()。 语法 aggregate() 方法的基本语法格式如下所示&#xff1…...

LabVIEW涡轮诊断系统

一、项目背景与行业痛点 涡轮机械是发电厂、航空发动机、石油化工等领域的核心动力设备&#xff0c;其运行状态直接关系到生产安全与经济效益。据统计&#xff0c;涡轮故障导致的非计划停机可造成每小时数十万元的经济损失&#xff0c;且突发故障可能引发严重安全事故。传统人…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

C语言中提供的第三方库之哈希表实现

一. 简介 前面一篇文章简单学习了C语言中第三方库&#xff08;uthash库&#xff09;提供对哈希表的操作&#xff0c;文章如下&#xff1a; C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案

在大数据时代&#xff0c;海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构&#xff0c;在处理大规模数据抓取任务时展现出强大的能力。然而&#xff0c;随着业务规模的不断扩大和数据抓取需求的日益复杂&#xff0c;传统…...

pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)

目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 &#xff08;1&#xff09;输入单引号 &#xff08;2&#xff09;万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...

离线语音识别方案分析

随着人工智能技术的不断发展&#xff0c;语音识别技术也得到了广泛的应用&#xff0c;从智能家居到车载系统&#xff0c;语音识别正在改变我们与设备的交互方式。尤其是离线语音识别&#xff0c;由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力&#xff0c;广…...