当前位置: 首页 > news >正文

Kafka分区策略实现

引言

Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中,合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。

轮询策略(默认)

  • 轮询策略是 Kafka 默认的分区策略(当消息没有指定键时)。生产者会按照顺序依次将消息发送到各个分区中,确保每个分区都能均匀地接收到消息,从而实现负载均衡。简单高效,能使各个分区的消息量相对均衡,充分利用每个分区的存储和处理能力。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class RoundRobinProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    随机策略

  • 随机策略会随机地将消息分配到一个分区中。这种策略在某些情况下可以实现一定程度的负载均衡,但由于是随机分配,可能会导致分区之间的消息分布不够均匀。可以通过自定义分区器来实现随机策略。
  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;public class RandomPartitioner implements Partitioner {private final Random random = new Random();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);return random.nextInt(partitions.size());}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用随机分区器的生产者示例
    public class RandomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "RandomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

    按键哈希策略

  • 当消息指定了键时,Kafka 会根据键的哈希值将消息分配到特定的分区中。相同键的消息会被分配到同一个分区,这有助于保证具有相同业务逻辑的消息顺序性。可以保证消息的局部有序性,例如在处理用户相关的消息时,将同一个用户的消息发送到同一个分区,方便后续的处理和分析。
  • import org.apache.kafka.clients.producer.*;
    import java.util.Properties;public class KeyBasedProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "user-" + (i % 2), "message-" + i);producer.send(record);}producer.close();}
    }

    自定义分区策略(实现接口)

  • 当上述默认策略无法满足业务需求时,可以自定义分区策略。通过实现org.apache.kafka.clients.producer.Partitioner接口,重写partition方法来实现自定义的分区逻辑。例如,根据消息的某些特定字段(如时间、地理位置等)来进行分区,以满足特定的业务需求。

  • import org.apache.kafka.clients.producer.*;
    import java.util.List;
    import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 自定义分区逻辑,这里简单示例根据消息值的长度分区String message = (String) value;return message.length() % partitions.size();}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
    }// 使用自定义分区器的生产者示例
    public class CustomProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "CustomPartitioner");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "message-" + i);producer.send(record);}producer.close();}
    }

相关文章:

Kafka分区策略实现

引言 Kafka 的分区策略决定了生产者发送的消息会被分配到哪个分区中&#xff0c;合理的分区策略有助于实现负载均衡、提高消息处理效率以及满足特定的业务需求。 轮询策略&#xff08;默认&#xff09; 轮询策略是 Kafka 默认的分区策略&#xff08;当消息没有指定键时&…...

【归属地】批量号码归属地查询按城市高速的分流,基于WPF的解决方案

在现代商业活动中&#xff0c;企业为了提高营销效果和资源利用效率&#xff0c;需要针对不同地区的市场特点开展精准营销。通过批量号码归属地查询并按城市分流&#xff0c;可以为企业的营销决策提供有力支持。 短信营销&#xff1a;一家连锁餐饮企业计划开展促销活动&#xf…...

为AI聊天工具添加一个知识系统 之78 详细设计之19 正则表达式 之6

本文要点 要点 本项目设计的正则表达式 是一个 动态正则匹配框架。它是一个谓词系统&#xff1a;谓词 是运动&#xff0c;主语是“维度”&#xff0c;表语是 语言处理。主语的一个 双动结构。 Reg三大功能 语法验证、语义检查和 语用检验&#xff0c;三者 &#xff1a;语义约…...

使用Java操作Redis数据类型的详解指南

SEO Meta Description: 详细介绍如何使用Java操作Redis的各种数据类型&#xff0c;包括字符串、哈希、列表、集合和有序集合&#xff0c;提供代码示例和最佳实践。 介绍 Redis是一种开源的内存数据结构存储&#xff0c;用作数据库、缓存和消息代理。它支持多种数据结构&#…...

一表总结 Java 的3种设计模式与6大设计原则

设计模式通常分为三大类&#xff1a;创建型、结构型和行为型。 创建型模式&#xff1a;主要用于解决对象创建问题结构型模式&#xff1a;主要用于解决对象组合问题行为型模式&#xff1a;主要用于解决对象之间的交互问题 创建型模式 创建型模式关注于对象的创建机制&#xf…...

Hive on Spark优化

文章目录 第1章集群环境概述1.1 集群配置概述1.2 集群规划概述 第2章 Yarn配置2.1 Yarn配置说明2.2 Yarn配置实操 第3章 Spark配置3.1 Executor配置说明3.1.1 Executor CPU核数配置3.1.2 Executor内存配置3.1.3 Executor个数配置 3.2 Driver配置说明3.3 Spark配置实操 第4章 Hi…...

Java集合面试总结(题目来源JavaGuide)

问题1&#xff1a;说说 List,Set,Map 三者的区别&#xff1f; 在 Java 中&#xff0c;List、Set 和 Map 是最常用的集合框架&#xff08;Collection Framework&#xff09;接口&#xff0c;它们的主要区别如下&#xff1a; 1. List&#xff08;列表&#xff09; 特点&#xf…...

计算机网络 应用层 笔记1(C/S模型,P2P模型,FTP协议)

应用层概述&#xff1a; 功能&#xff1a; 常见协议 应用层与其他层的关系 网络应用模型 C/S模型&#xff1a; 优点 缺点 P2P模型&#xff1a; 优点 缺点 DNS系统&#xff1a; 基本功能 系统架构 域名空间&#xff1a; DNS 服务器 根服务器&#xff1a; 顶级域…...

ES6基础内容

ES 全称 EcmaScript ,是脚本语言的规范&#xff0c;而平时经常编写的 JavaScript 是 EcmaScript 的一种实现&#xff0c;所以 ES 新特性其实指的就是 JavaScript 的新特性。 一、 let变量声明和声明特性 1.1 变量声明 <!DOCTYPE html> <html lang"en">…...

DeepSeek本地部署的一些使用体会

春节期间我也尝试了一下Deepseek的本地部署&#xff0c;方案选用了Ollama Chatbox或AnythingLLM。Chatbox里有很多有意思的“助手”&#xff0c;而AnythingLLM支持本地知识库。 网上教程很多&#xff0c;总的来说还是很方便的&#xff0c;不需要费太多脑子。甚至可以这么说&a…...

鲸鱼算法 matlab pso

算法原理 鲸鱼优化算法的核心思想是通过模拟座头鲸的捕食过程来进行搜索和优化。座头鲸在捕猎时会围绕猎物游动并产生气泡网&#xff0c;迫使猎物聚集。这一行为被用来设计搜索策略&#xff0c;使算法能够有效地找到全局最优解。 算法步骤 ‌初始化‌&#xff1a;随机生成一…...

013-51单片机红外遥控器模拟控制空调,自动制冷制热定时开关

主要功能是通过红外遥控器模拟控制空调&#xff0c;可以实现根据环境温度制冷和制热&#xff0c;能够通过遥控器设定温度&#xff0c;可以定时开关空调。 1.硬件介绍 硬件是我自己设计的一个通用的51单片机开发平台&#xff0c;可以根据需要自行焊接模块&#xff0c;这是用立创…...

在Vue3 + Vite 项目中使用 Tailwind CSS 4.0

文章目录 首先是我的package.json根据官网步骤VS Code安装插件验证是否引入成功参考资料 首先是我的package.json {"name": "aplumweb","private": true,"version": "0.0.0","type": "module","s…...

Leetcode—922. 按奇偶排序数组 II【简单】

2025每日刷题&#xff08;207&#xff09; Leetcode—922. 按奇偶排序数组 II 实现代码 class Solution { public:vector<int> sortArrayByParityII(vector<int>& nums) {for(int i 0, j 1; i < nums.size() - 1; i 2) {// 前奇后偶if(nums[i] % 2) {w…...

一个开源 GenBI AI 本地代理(确保本地数据安全),使数据驱动型团队能够与其数据进行互动,生成文本到 SQL、图表、电子表格、报告和 BI

一、GenBI AI 代理介绍&#xff08;文末提供下载&#xff09; github地址&#xff1a;https://github.com/Canner/WrenAI 本文信息图片均来源于github作者主页 在 Wren AI&#xff0c;我们的使命是通过生成式商业智能 &#xff08;GenBI&#xff09; 使组织能够无缝访问数据&…...

使用Posix共享内存区实现进程间通信

使用Posix共享内存区实现进程间通信 使用Posix共享内存区通常涉以下步骤: 进程A 调用shm_open 创建共享内存区进程A调用ftruncate修改共享内存区大小进程A 调用mmap将共享内存区映射到进程地址空间ptrA进程A 使用ptrA对共享内存区进程更改进程B 使用shm_open打开已有共享内存…...

家政预约小程序12服务详情

目录 1 修改数据源2 创建页面3 搭建轮播图4 搭建基本信息5 显示服务规格6 搭建服务描述7 设置过滤条件总结 我们已经在首页、分类页面显示了服务的列表信息&#xff0c;当点击服务的内容时候需要显示服务的详情信息&#xff0c;本篇介绍一下详情页功能的搭建。 1 修改数据源 在…...

【C语言】指针详细解读2

1.const 修饰指针 1.1 const修饰变量 变量是可以修改的&#xff0c;如果把变量的地址交给⼀个指针变量&#xff0c;通过指针变量的也可以修改这个变量。 但是如果我们希望⼀个变量加上⼀些限制&#xff0c;不能被修改&#xff0c;怎么做呢&#xff1f;这就是const的作⽤。 #in…...

MongoDB 聚合

MongoDB 中聚合(aggregate)主要用于处理数据(诸如统计平均值&#xff0c;求和等)&#xff0c;并返回计算后的数据结果。 有点类似 SQL 语句中的 count(*)。 aggregate() 方法 MongoDB中聚合的方法使用aggregate()。 语法 aggregate() 方法的基本语法格式如下所示&#xff1…...

LabVIEW涡轮诊断系统

一、项目背景与行业痛点 涡轮机械是发电厂、航空发动机、石油化工等领域的核心动力设备&#xff0c;其运行状态直接关系到生产安全与经济效益。据统计&#xff0c;涡轮故障导致的非计划停机可造成每小时数十万元的经济损失&#xff0c;且突发故障可能引发严重安全事故。传统人…...

Qwen3-ASR-1.7B部署案例:AI初创公司低成本构建ASR SaaS服务

Qwen3-ASR-1.7B部署案例&#xff1a;AI初创公司低成本构建ASR SaaS服务 想象一下&#xff0c;你是一家AI初创公司的技术负责人&#xff0c;老板给你下了个任务&#xff1a;两周内&#xff0c;为公司的新产品上线一个语音转文字&#xff08;ASR&#xff09;功能。要求是识别要准…...

3步精通n8n浏览器自动化:从安装到流程编排

3步精通n8n浏览器自动化&#xff1a;从安装到流程编排 【免费下载链接】n8n-nodes-puppeteer n8n node for requesting webpages using Puppeteer 项目地址: https://gitcode.com/gh_mirrors/n8/n8n-nodes-puppeteer n8n-nodes-puppeteer是一款专为n8n平台开发的浏览器控…...

HunyuanVideo-Foley效果展示:为体育直播生成实时观众欢呼/球鞋摩擦/哨声

HunyuanVideo-Foley效果展示&#xff1a;为体育直播生成实时观众欢呼/球鞋摩擦/哨声 1. 惊艳的体育音效生成能力 想象一下&#xff0c;当篮球运动员急停变向时&#xff0c;球鞋与地板摩擦发出的"吱吱"声&#xff1b;当足球射门得分时&#xff0c;全场观众爆发的欢呼…...

MinerU本地部署安全吗?数据隐私保护实战配置

MinerU本地部署安全吗&#xff1f;数据隐私保护实战配置 1. 引言&#xff1a;当AI遇见你的敏感文档 想象一下这个场景&#xff1a;你有一份包含商业机密的合同PDF&#xff0c;或者一份涉及个人隐私的医疗报告扫描件。你想用AI快速提取里面的关键信息&#xff0c;但又担心把文…...

从音乐均衡器到语音降噪:深入浅出玩转数字谐振器设计与MATLAB仿真

从音乐均衡器到语音降噪&#xff1a;深入浅出玩转数字谐振器设计与MATLAB仿真 你是否曾在调整音乐播放器的均衡器时好奇——那些滑动条如何精确控制特定频段的声音强弱&#xff1f;这背后隐藏的数字信号处理魔法&#xff0c;正是我们今天要探索的数字谐振器技术。无论是提取语音…...

提示工程架构师经验总结:Agentic AI环保项目从失败到成功的关键转折点

提示工程架构师经验总结:Agentic AI环保项目从失败到成功的关键转折点 一、引言:那些“死在落地路上”的环保AI 你知道吗? 全球每年有800万吨塑料流入海洋,相当于每秒钟往海里倒一辆卡车的垃圾;中国城市生活垃圾年清运量超过3亿吨,但仅有**23%**的垃圾得到规范分拣——…...

Element React:构建企业级UI的React组件解决方案

Element React&#xff1a;构建企业级UI的React组件解决方案 【免费下载链接】element-react Element UI 项目地址: https://gitcode.com/gh_mirrors/el/element-react 作为React开发者&#xff0c;你是否曾为UI组件的一致性和开发效率而困扰&#xff1f;Element React作…...

告别串口线!手把手教你用WCH-LinkE的SDI功能实现CH32V303RCT6的无线调试打印

无线调试革命&#xff1a;基于WCH-LinkE的SDI功能实现CH32V303RCT6高效打印 调试嵌入式系统时&#xff0c;串口打印是最常用的调试手段之一。然而传统串口调试需要占用宝贵的硬件UART资源&#xff0c;在IO口紧张或串口已被占用的场景下尤为不便。沁恒微电子推出的SDI(Serial Da…...

SDMatte效果对比图谱:SDMatte/RemBG/BackgroundMattingV2在玻璃场景PK

SDMatte效果对比图谱&#xff1a;SDMatte/RemBG/BackgroundMattingV2在玻璃场景PK 1. 引言&#xff1a;玻璃抠图的特殊挑战 玻璃材质因其透明和反光特性&#xff0c;一直是图像抠图领域最具挑战性的对象之一。传统抠图工具在处理玻璃制品时&#xff0c;往往会出现边缘断裂、透…...

OpenClaw 实战:3 分钟打造一个真正能「干活」的 AI 员工

OpenClaw 实战&#xff1a;3 分钟打造一个真正能「干活」的 AI 员工 市面上关于 OpenClaw 入门的文章一抓一大把&#xff0c;但真正能落地应用的实践却少之又少。经过半个多月的深度测试&#xff0c;我从搜索精度到人格配置进行了全量跑测&#xff0c;整理出这份让 Agent 真正…...