当前位置: 首页 > news >正文

Kafka Producer发送消息流程之分区器和数据收集器

文章目录

  • 1. Partitioner分区器
  • 2. 自定义分区器
  • 3. RecordAccumulator数据收集器

1. Partitioner分区器

在这里插入图片描述

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,

int partition = partition(record, serializedKey, serializedValue, cluster);

private final Partitioner partitioner;private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数if (record.partition() != null)return record.partition();// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据if (partitioner != null) {int customPartition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);if (customPartition < 0) {throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));}return customPartition;}// 如果没有分区器的情况if (serializedKey != null && !partitionerIgnoreKeys) {// hash the keyBytes to choose a partitionreturn BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());} else {return RecordMetadata.UNKNOWN_PARTITION;}}// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}

2. 自定义分区器

新建类实现Partitioner接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。

public class MyKafkaPartitioner implements Partitioner {@Overridepublic int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {// Ensure the key is a non-null stringif (key == null || !(key instanceof String)) {throw new IllegalArgumentException("Key must be a non-null String");}// Parse the key as an integerint keyInt;try {keyInt = Integer.parseInt((String) key);} catch (NumberFormatException e) {throw new IllegalArgumentException("Key must be a numeric string", e);}// Determine the partition based on the key's odd/even natureif (keyInt % 2 == 0) {return 1; // Even keys go to partition 2} else {return 0; // Odd keys go to partition 0}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

新建一个存在多分区的Topic。

在这里插入图片描述

public class KafkaProducerPartitionorTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定拦截器config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());//指定分区器config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test1","key"+i,"我是你爹"+i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}

配置好PARTITIONER_CLASS_CONFIG后发送消息。
在这里插入图片描述
在这里插入图片描述

可以分区器成功起作用了。

3. RecordAccumulator数据收集器

通过数据校验后,数据从分区器来到数据收集器

数据收集器的工作机制

  1. 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size来调整。

  2. 缓冲区管理

    • 每个分区都有一个或多个批次,每个批次包含多条消息。
    • 当一个批次填满(即达到batch.size),或者达到发送条件(如linger.ms时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender线程。
  3. 满批次处理

    • 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足linger.ms的时间条件,RecordAccumulator会将该批次加入到一个待发送的队列中。
    • Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。

相关文章:

Kafka Producer发送消息流程之分区器和数据收集器

文章目录 1. Partitioner分区器2. 自定义分区器3. RecordAccumulator数据收集器 1. Partitioner分区器 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java&#xff0c;中doSend方法&#xff0c;记录了生产者将消息发送的流程&#xff0c;其中有一步…...

Codeforces Round 958 (Div. 2)

C o d e f o r c e s R o u n d 958 ( D i v . 2 ) \Huge{Codeforces Round 958 (Div. 2)} CodeforcesRound958(Div.2) 文章目录 Problems A. Split the Multiset题意思路标程 Problems B. Make Majority题意思路标程 Problems C. Increasing Sequence with Fixed OR题意思路标…...

<数据集>猫狗识别数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;3686张 标注数量(xml文件个数)&#xff1a;3686 标注数量(txt文件个数)&#xff1a;3686 标注类别数&#xff1a;2 标注类别名称&#xff1a;[cat, dog] 序号类别名称图片数框数1cat118811892dog24982498 使用标…...

Figma 中文版指南:获取和安装汉化插件

Figma是一种主流的在线团队合作设计工具&#xff0c;也是一种基于 Web 端的设计工具。在当今的设计时代&#xff0c;Figma 的使用满足了每个人的设计需求&#xff0c;不仅可以实现在线编辑&#xff0c;还可以方便日常管理&#xff0c;有效提高工作效率。然而&#xff0c;相信很…...

用c语言写一个贪吃蛇游戏

贪吃蛇游戏通常涉及到终端图形编程和简单的游戏逻辑。以下是一个基本的实现示例&#xff0c;包括贪吃蛇的移动、食物生成、碰撞检测等功能。 1. 贪吃蛇游戏的基本结构 贪吃蛇游戏可以分为以下几个部分&#xff1a; 游戏地图和终端绘制&#xff1a;使用二维数组表示游戏地图&am…...

计算机网络入门 --网络模型

计算机网络入门 --网络模型 1.OSI七层模型 1.1 模型概念 OSI七层模型是将计算机网络通信协议划分为七个不同层次的标准化框架&#xff0c;每一层都负责不同功能&#xff0c;并从物理连接层开始处理。OSI七层网络模型如下分别为&#xff1a;物理层、数据链路层、网络层、传输…...

陪玩系统小程序模式APP小程序H5系统搭建开发

随着移动互联网的营及和游戏行业的蓬轨发展&#xff0c;陪玩服务应远而生并迅速唱起&#xff0c;陪玩系统小程序作为连接游戏玩家与陪玩师的桥梁&#xff0c;其模式系统的搭建与开发是得尤为重要&#xff0c;本文将洋细凰述陪玩系统小程宗模式系统的搭建开发流程&#xff0c;包…...

算法训练营day72

题目&#xff1a;117. 软件构建 (kamacoder.com) #include<iostream> #include<unordered_map> #include<vector> #include<queue>using namespace std;int main() {int n, m;cin >> n >> m;vector<int> indegree(n, 0);unordered_…...

C语言------指针讲解(2)

目录 一、数组名的理解 二、使用指针访问数组 三、一维数组传参的本质 四、冒泡排序 五、二级指针 六、指针数组 七、指针数组模拟二维数组 一、数组名的理解 通过学习&#xff0c;我们知道&#xff1a;数组名和数组首元素的地址打印出来的结果一模一样&#xff0c;数组…...

大数据技术基础

一、大数据平台 1.大数据平台方案步骤&#xff1a; ①市场上有哪些大数据平台 ②硬件、系统、业务增长等方面 ③方案是否通过 通过后&#xff1a;按照一期目标投入 先虚拟环境部署联系&#xff0c;再实际部署 《大数据架构介绍》《Hadoop架构解析》《Hadoop集群规划》 《H…...

【文心智能体】前几天百度热搜有一条非常有趣的话题《00后疯感工牌》,看看如何通过低代码工作流方式实现图片显示

00后疯感工牌体验&#xff1a;https://mbd.baidu.com/ma/s/6yA90qtM 目录 前言比赛推荐工作流创建工作流入口创建工作流界面工作流界面HTTP工具卡点地方 总结推荐文章 前言 前几天百度热搜有一条非常有有趣《00后疯感工牌》。 想着通过文心智能体去一键生成00后疯感工牌是不是…...

C++20中的constinit说明符

constinit说明符断言(assert)变量具有静态初始化&#xff0c;即零初始化和常量初始化(zero initialization and constant initialization)&#xff0c;否则程序格式不正确(program is ill-formed)。 constinit说明符声明具有静态或线程存储持续时间(thread storage duration)的…...

Java 中的正则表达式

转义字符由反斜杠\x组成&#xff0c;用于实现特殊功能当想取消这些特殊功能时可以在前面加上反斜杠\ 例如在Java中当\出现时是转义字符的一部分&#xff0c;具有特殊意义&#xff0c;前面加一个反斜可以取消其特殊意义&#xff0c;表示1个普通的反斜杠\&#xff0c;\\\\表示2个…...

华为配置蓝牙终端定位实验

个人主页&#xff1a;知孤云出岫 目录 配置蓝牙终端定位示例 业务需求 组网需求 数据规划 配置思路 配置注意事项 操作步骤 配置文件 配置蓝牙终端定位示例 组网图形 图1 配置蓝牙终端定位示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业…...

搭建hadoop+spark完全分布式集群环境

目录 一、集群规划 二、更改主机名 三、建立主机名和ip的映射 四、关闭防火墙(master,slave1,slave2) 五、配置ssh免密码登录 六、安装JDK 七、hadoop之hdfs安装与配置 1)解压Hadoop 2)修改hadoop-env.sh 3)修改 core-site.xml 4)修改hdfs-site.xml 5) 修改s…...

pytorch-pytorch之LSTM

目录 1. nn.LSTM2. nn.LSTMCell 1. nn.LSTM 初始化函数输入参数与RNN相同&#xff0c;分别是input_size&#xff0c;hidden_size和num_layer foward函数也与RNN类似&#xff0c;只不过返回值除了out外&#xff0c;ht变为(ht,ct) 代码见下图&#xff1a; 2. nn.LSTMCell 初…...

jvm优化

1.jvm组成 什么是jvm&#xff0c;java是跨平台语言&#xff0c;对不同的平台&#xff08;windos&#xff0c;linux&#xff09;&#xff0c;有不同的jvm版本。jvm屏蔽了平台的不同&#xff0c;提供了统一的运行环境&#xff0c;让java代码无需考虑平台的差异。 jdk包含jre包含…...

网络安全——防御课实验二

在实验一的基础上&#xff0c;完成7-11题 拓扑图 7、办公区设备可以通过电信链路和移动链路上网(多对多的NAT&#xff0c;并且需要保留一个公网IP不能用来转换) 首先&#xff0c;按照之前的操作&#xff0c;创建新的安全区&#xff08;电信和移动&#xff09;分别表示两个外网…...

朴素模式匹配算法与KMP算法(非重点)

目录 一. 朴素模式匹配算法1.1 什么是字符串的匹配模式1.2 朴素模式匹配算法1.3 通过数组下标实现朴素模式匹配算法 二. KMP算法2.1 算法分析2.2 用代码实现&#xff08;只会出现在选择题&#xff0c;考察代码的概率不大&#xff09; 三. 手算next数组四. KMP算法的进一步优化4…...

[k8s源码]2.CURD deployment

加载kubernetes配置 使用 clientcmd方法&#xff0c;是通过"k8s.io/client-go/tools/clientcmd"包加载的。这个函数返回的是config和error两个值。可以看到返回的config是一个指针变量。 func clientcmd.BuildConfigFromFlags(masterUrl string, kubeconfigPath str…...

YOLO12快速部署指南:Gradio界面已配好,启动就能用

YOLO12快速部署指南&#xff1a;Gradio界面已配好&#xff0c;启动就能用 1. 为什么选择YOLO12镜像 YOLO12作为2025年最新发布的目标检测模型&#xff0c;带来了革命性的注意力为中心架构。这个预配置好的镜像让您无需任何复杂操作&#xff0c;就能立即体验最先进的目标检测技…...

PID控制在自动循迹小车中的实战应用与参数整定指南

PID控制在自动循迹小车中的实战应用与参数整定指南 当你在实验室里第一次看到自己设计的自动循迹小车歪歪扭扭地沿着黑线前进时&#xff0c;那种既兴奋又挫败的感觉一定记忆犹新。为什么理论上完美的PID算法&#xff0c;在实际应用中却总是出现超调、振荡或者响应迟缓&#xff…...

菊水PBZ40电源协议详解:从‘*IDN?’到波形设置,一份给硬件测试新人的避坑指南

菊水PBZ40电源协议实战手册&#xff1a;从基础指令到复杂波形配置的工程指南 第一次接触菊水PBZ40可编程电源时&#xff0c;面对满屏的协议指令和参数配置&#xff0c;不少硬件测试工程师都会感到无从下手。这台看似简单的设备&#xff0c;实际上隐藏着许多需要特别注意的细节…...

西门子V90参数移植实战指南:从备份到验证的完整流程

1. 西门子V90参数移植的核心价值 当你面对生产线上的V90驱动器需要更换时&#xff0c;最头疼的问题莫过于如何让新设备"继承"旧设备的全部参数特性。我经历过三次完整的设备迭代&#xff0c;深刻理解参数移植的重要性——它直接关系到设备重启后的运行稳定性。不同于…...

新手程序员必备:收藏这份Prompt指南,轻松驾驭大模型创造业务价值!

本文系统介绍了大模型Prompt的概念、撰写框架及核心原则&#xff0c;深入剖析了构建高质量Prompt的实操方法。从RTF、思考链到RISEN等五大框架&#xff0c;再到提升Prompt效果的策略&#xff0c;如明确指令、结构化输出、赋予模型思考时间等&#xff0c;帮助读者高效驾驭大模型…...

TouchGal:3个关键功能让你成为真正的Galgame收藏家

TouchGal&#xff1a;3个关键功能让你成为真正的Galgame收藏家 【免费下载链接】kun-touchgal-next TouchGAL是立足于分享快乐的一站式Galgame文化社区, 为Gal爱好者提供一片净土! 项目地址: https://gitcode.com/gh_mirrors/ku/kun-touchgal-next 你是否曾为寻找心仪的…...

Wireshark抓包实战:用一道CTF题彻底搞懂IP分片与UDP重组

Wireshark抓包实战&#xff1a;用一道CTF题彻底搞懂IP分片与UDP重组 在网络安全竞赛中&#xff0c;一个看似简单的UDP传输任务可能隐藏着协议层面的精妙设计。去年CyBRICS赛事中的lx100题目就完美诠释了这一点——参赛者需要从相机传输的UDP流量中提取图片&#xff0c;而真正的…...

2KW移相全桥整机Matlab Simulink仿真模型电源 2KW移相全桥整机Matlab Simulink仿真模型电源学习资料,报告mathcad参数设计,

2KW移相全桥整机Matlab Simulink仿真模型电源 2KW移相全桥整机Matlab Simulink仿真模型电源学习资料&#xff0c;报告mathcad参数设计&#xff0c;模型搭建过程参考资料&#xff0c;仿真模型等&#xff0c;很全面的移相全桥学习资料&#xff0c;电子资料针对你提到的 2kW 移相全…...

科研绘图不止Origin:聊聊OriginPro 2021与Python/Matlab的共存与选择

科研绘图工具三选一&#xff1a;OriginPro 2021与Python/Matlab的深度对比指南 当科研工作者面临数据可视化需求时&#xff0c;往往会在OriginPro、Python&#xff08;Matplotlib/Seaborn&#xff09;和Matlab这三款主流工具之间犹豫不决。每种工具都有其独特的优势和应用场景…...

如何高效保存B站视频?BiliTools全能下载解决方案让你无忧离线观看

如何高效保存B站视频&#xff1f;BiliTools全能下载解决方案让你无忧离线观看 【免费下载链接】BiliTools A cross-platform bilibili toolbox. 跨平台哔哩哔哩工具箱&#xff0c;支持下载视频、番剧等等各类资源 项目地址: https://gitcode.com/GitHub_Trending/bilit/BiliT…...