Kafka Producer发送消息流程之分区器和数据收集器
文章目录
- 1. Partitioner分区器
- 2. 自定义分区器
- 3. RecordAccumulator数据收集器
1. Partitioner分区器
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
,中doSend方法,记录了生产者将消息发送的流程,其中有一步就是计算当前消息应该发送往对应Topic哪一个分区,
int partition = partition(record, serializedKey, serializedValue, cluster);
private final Partitioner partitioner;private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//当record的分区已存在,则直接返回,这对应了创建Record时可以手动传入partition参数if (record.partition() != null)return record.partition();// 如果存在partitioner分区器,则使用Partitioner.partition方法计算分区数据if (partitioner != null) {int customPartition = partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);if (customPartition < 0) {throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));}return customPartition;}// 如果没有分区器的情况if (serializedKey != null && !partitionerIgnoreKeys) {// hash the keyBytes to choose a partitionreturn BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());} else {return RecordMetadata.UNKNOWN_PARTITION;}}// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}
2. 自定义分区器
新建类实现Partitioner
接口,key是字符串数字,奇数送到分区0,偶数送到分区1 。
public class MyKafkaPartitioner implements Partitioner {@Overridepublic int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {// Ensure the key is a non-null stringif (key == null || !(key instanceof String)) {throw new IllegalArgumentException("Key must be a non-null String");}// Parse the key as an integerint keyInt;try {keyInt = Integer.parseInt((String) key);} catch (NumberFormatException e) {throw new IllegalArgumentException("Key must be a numeric string", e);}// Determine the partition based on the key's odd/even natureif (keyInt % 2 == 0) {return 1; // Even keys go to partition 2} else {return 0; // Odd keys go to partition 0}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
新建一个存在多分区的Topic。
public class KafkaProducerPartitionorTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定拦截器config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());//指定分区器config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test1","key"+i,"我是你爹"+i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}
配置好PARTITIONER_CLASS_CONFIG
后发送消息。
可以分区器成功起作用了。
3. RecordAccumulator数据收集器
通过数据校验后,数据从分区器
来到数据收集器
。
数据收集器的工作机制
-
队列缓存:
RecordAccumulator
为每个分区维护一个队列。默认情况下,每个队列的批次大小(buffer size)是16KB,这个大小可以通过配置参数batch.size
来调整。 -
缓冲区管理:
- 每个分区都有一个或多个批次,每个批次包含多条消息。
- 当一个批次填满(即达到
batch.size
),或者达到发送条件(如linger.ms
时间窗口,即发送消息前等待的时间)时,批次会被标记为可发送状态,并被传递给Sender
线程。
-
满批次处理:
- 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足
linger.ms
的时间条件,RecordAccumulator
会将该批次加入到一个待发送的队列中。 Sender
线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。
- 当某个分区的队列中的某个批次大小超过了16KB(默认值)或满足
相关文章:

Kafka Producer发送消息流程之分区器和数据收集器
文章目录 1. Partitioner分区器2. 自定义分区器3. RecordAccumulator数据收集器 1. Partitioner分区器 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java,中doSend方法,记录了生产者将消息发送的流程,其中有一步…...

Codeforces Round 958 (Div. 2)
C o d e f o r c e s R o u n d 958 ( D i v . 2 ) \Huge{Codeforces Round 958 (Div. 2)} CodeforcesRound958(Div.2) 文章目录 Problems A. Split the Multiset题意思路标程 Problems B. Make Majority题意思路标程 Problems C. Increasing Sequence with Fixed OR题意思路标…...

<数据集>猫狗识别数据集<目标检测>
数据集格式:VOCYOLO格式 图片数量:3686张 标注数量(xml文件个数):3686 标注数量(txt文件个数):3686 标注类别数:2 标注类别名称:[cat, dog] 序号类别名称图片数框数1cat118811892dog24982498 使用标…...

Figma 中文版指南:获取和安装汉化插件
Figma是一种主流的在线团队合作设计工具,也是一种基于 Web 端的设计工具。在当今的设计时代,Figma 的使用满足了每个人的设计需求,不仅可以实现在线编辑,还可以方便日常管理,有效提高工作效率。然而,相信很…...

用c语言写一个贪吃蛇游戏
贪吃蛇游戏通常涉及到终端图形编程和简单的游戏逻辑。以下是一个基本的实现示例,包括贪吃蛇的移动、食物生成、碰撞检测等功能。 1. 贪吃蛇游戏的基本结构 贪吃蛇游戏可以分为以下几个部分: 游戏地图和终端绘制:使用二维数组表示游戏地图&am…...

计算机网络入门 --网络模型
计算机网络入门 --网络模型 1.OSI七层模型 1.1 模型概念 OSI七层模型是将计算机网络通信协议划分为七个不同层次的标准化框架,每一层都负责不同功能,并从物理连接层开始处理。OSI七层网络模型如下分别为:物理层、数据链路层、网络层、传输…...

陪玩系统小程序模式APP小程序H5系统搭建开发
随着移动互联网的营及和游戏行业的蓬轨发展,陪玩服务应远而生并迅速唱起,陪玩系统小程序作为连接游戏玩家与陪玩师的桥梁,其模式系统的搭建与开发是得尤为重要,本文将洋细凰述陪玩系统小程宗模式系统的搭建开发流程,包…...

算法训练营day72
题目:117. 软件构建 (kamacoder.com) #include<iostream> #include<unordered_map> #include<vector> #include<queue>using namespace std;int main() {int n, m;cin >> n >> m;vector<int> indegree(n, 0);unordered_…...

C语言------指针讲解(2)
目录 一、数组名的理解 二、使用指针访问数组 三、一维数组传参的本质 四、冒泡排序 五、二级指针 六、指针数组 七、指针数组模拟二维数组 一、数组名的理解 通过学习,我们知道:数组名和数组首元素的地址打印出来的结果一模一样,数组…...

大数据技术基础
一、大数据平台 1.大数据平台方案步骤: ①市场上有哪些大数据平台 ②硬件、系统、业务增长等方面 ③方案是否通过 通过后:按照一期目标投入 先虚拟环境部署联系,再实际部署 《大数据架构介绍》《Hadoop架构解析》《Hadoop集群规划》 《H…...

【文心智能体】前几天百度热搜有一条非常有趣的话题《00后疯感工牌》,看看如何通过低代码工作流方式实现图片显示
00后疯感工牌体验:https://mbd.baidu.com/ma/s/6yA90qtM 目录 前言比赛推荐工作流创建工作流入口创建工作流界面工作流界面HTTP工具卡点地方 总结推荐文章 前言 前几天百度热搜有一条非常有有趣《00后疯感工牌》。 想着通过文心智能体去一键生成00后疯感工牌是不是…...

C++20中的constinit说明符
constinit说明符断言(assert)变量具有静态初始化,即零初始化和常量初始化(zero initialization and constant initialization),否则程序格式不正确(program is ill-formed)。 constinit说明符声明具有静态或线程存储持续时间(thread storage duration)的…...

Java 中的正则表达式
转义字符由反斜杠\x组成,用于实现特殊功能当想取消这些特殊功能时可以在前面加上反斜杠\ 例如在Java中当\出现时是转义字符的一部分,具有特殊意义,前面加一个反斜可以取消其特殊意义,表示1个普通的反斜杠\,\\\\表示2个…...

华为配置蓝牙终端定位实验
个人主页:知孤云出岫 目录 配置蓝牙终端定位示例 业务需求 组网需求 数据规划 配置思路 配置注意事项 操作步骤 配置文件 配置蓝牙终端定位示例 组网图形 图1 配置蓝牙终端定位示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业…...

搭建hadoop+spark完全分布式集群环境
目录 一、集群规划 二、更改主机名 三、建立主机名和ip的映射 四、关闭防火墙(master,slave1,slave2) 五、配置ssh免密码登录 六、安装JDK 七、hadoop之hdfs安装与配置 1)解压Hadoop 2)修改hadoop-env.sh 3)修改 core-site.xml 4)修改hdfs-site.xml 5) 修改s…...

pytorch-pytorch之LSTM
目录 1. nn.LSTM2. nn.LSTMCell 1. nn.LSTM 初始化函数输入参数与RNN相同,分别是input_size,hidden_size和num_layer foward函数也与RNN类似,只不过返回值除了out外,ht变为(ht,ct) 代码见下图: 2. nn.LSTMCell 初…...

jvm优化
1.jvm组成 什么是jvm,java是跨平台语言,对不同的平台(windos,linux),有不同的jvm版本。jvm屏蔽了平台的不同,提供了统一的运行环境,让java代码无需考虑平台的差异。 jdk包含jre包含…...

网络安全——防御课实验二
在实验一的基础上,完成7-11题 拓扑图 7、办公区设备可以通过电信链路和移动链路上网(多对多的NAT,并且需要保留一个公网IP不能用来转换) 首先,按照之前的操作,创建新的安全区(电信和移动)分别表示两个外网…...

朴素模式匹配算法与KMP算法(非重点)
目录 一. 朴素模式匹配算法1.1 什么是字符串的匹配模式1.2 朴素模式匹配算法1.3 通过数组下标实现朴素模式匹配算法 二. KMP算法2.1 算法分析2.2 用代码实现(只会出现在选择题,考察代码的概率不大) 三. 手算next数组四. KMP算法的进一步优化4…...

[k8s源码]2.CURD deployment
加载kubernetes配置 使用 clientcmd方法,是通过"k8s.io/client-go/tools/clientcmd"包加载的。这个函数返回的是config和error两个值。可以看到返回的config是一个指针变量。 func clientcmd.BuildConfigFromFlags(masterUrl string, kubeconfigPath str…...

使用base64通用文件上传
编写一个上传文件的组件 tuku,点击图片上传后使用FileReader异步读取文件的内容,读取完成后获得文件名和base64码,调用后端uploadApi,传入姓名和base64文件信息,后端存入nginx中,用于访问 tuku.ts组件代码: <templa…...

Python深度学习
python深度学习,python代码定制, 可做创新点 创新思路 代码改进跑通 深度学习 Python代跑时间序列预测 分析 代码编写 python编程 深度学习算法 自然语言处理 神经网络跑通指导 爬虫调试代做 项目指导 定制帮做 改进 提升 创新 优化 Python Matlab C…...

django报错(三):No crontab program或got an unexpected keyword argument ‘user’
Crontab是linux系统上的定时管理模块,简单配置,灵活使用。但是要在windows使用必须借助Cygwin等虚拟工具,否则会报错“No crontab program”。如下图: python-crontab是其提供了python模块对crontab的访问,即可以通过p…...

数据库(创建数据库和表)
目录 一:创建数据库 二:创建表 2.1:创建employees表 2.2:创建orders表 2.3:创建invoices表 一:创建数据库 mysql> create database mydb6_product; Query OK, 1 row affected (0.01 sec) mysql&g…...

Log4j的原理及应用详解(一)
本系列文章简介: 在软件开发的广阔领域中,日志记录是一项至关重要的活动。它不仅帮助开发者追踪程序的执行流程,还在问题排查、性能监控以及用户行为分析等方面发挥着不可替代的作用。随着软件系统的日益复杂,对日志管理的需求也日…...

ubuntu系统Docker常用命令
1.查看docker是否开机启动 sudo systemctl list-unit-files | grep enable|grep docker 2.设置开机启动 sudo systemctl enable docker 3.关闭docker开机启动 sudo systemctl disable docker 4.开启docker服务 sudo service docker start 5.关闭docker服务 sudo servi…...

韦东山嵌入式linux系列-驱动设计的思想(面向对象/分层/分离)
1 面向对象 字符设备驱动程序抽象出一个 file_operations 结构体; 我们写的程序针对硬件部分抽象出 led_operations 结构体。 2 分层 上下分层,比如我们前面写的 LED 驱动程序就分为 2 层: ① 上层实现硬件无关的操作,比如注册…...

0/1背包
0/1背包 背包问题是DP最经典的类型之一,而0/1背包是最经典最基础的背包问题。 背包体积为 V V V, n n n种物品,每种物品只有1个,第 i i i种物品对应体积为 c i c_i ci,价值为 w i w_i wi,怎样装填能使…...

Linux的进程和权限的基本命令
目录 基本命令 man find date cal du ln exit grep 基本命令-帮助查询: wc cat more less head tail echo alias unalias 基本命令-进程管理: ps kill top 操作系统负载查看 用户分类: 程序用户 普通用户&#x…...

鼠标录制工具怎么挑选?9款电脑鼠标录制工具分享(2024)
你知道鼠标录制工具吗?鼠标录制工具通过记录和回放用户的操作,帮助自动化重复性任务,提高工作效率和精确性。它可以帮助用户简化很多繁琐的操作步骤,非常适合运用在电脑自动化任务、游戏自动化中,给大家整理了2024年9款…...