当前位置: 首页 > news >正文

Kafka第四篇——生产数据总体概括,源码解析分区策略,数据收集器,Sender发送线程,key值

目录

流程图以及总体概述

拦截器

分区器以及分区计算策略

为啥进行分区计算?

producer生产者怎么知道有哪些分区?

分区计算

如何自定义实现分区器?

想说的在图里啦!宝宝!💡 ​编辑

如果key值忘记传递了呢!?

数据校验

数据收集器

注意

Sender发送线程


流程图以及总体概述

producer进行发送record,record对象包含topic,key,value,partition,时间戳,通过拦截器,将数据信息发送给broker,但是咱们也不知道把数据信息发送给哪个broker,而我们的Metadata就可以获取出来这个,如下面代码就是获取到9092.获取到缓存,放在底层。然后经过key对象的序列化,value对象的序列化,对应在代码中就是,configMap.put()那两行,并且这个是必须写的。然后经过分区器,partition,每个数据需要发送到broker中,每个消息发送到特定的主题,主题分为多个分区。kafka在发送数据时候,可以将数据发送到指定主题的指定分区,kafka会自动决定将消息发送到那个分区。分区器有那种判断发送给那个broker。然后进行数据校验。在数据收集器当中,相当于一个缓冲池,将同一个主题的数据可以存放在一个队列中,按“批”为单位进行发送,提高效率,并且指定了每批的大小是16K,

数据已经缓存到数据收集器后,就可以进行发送数据喽!此时就不会按topic为单位进行发送了,就可以重新整合,以节点为主!(why??因为不同的topic可以发送给同一个节点呀傻瓜!也就是说,在缓冲区以topic为单位,在发送线程中以节点为单位)封装请求,然后放在缓冲区中。再由网络通信从缓冲区中取出,发送给socket。在缓冲区,需要注意概念,在途请求缓冲区为5,表示同一个节点同一时间处理的请求数量。

拦截器

数据的规范化处理。可以有多个,可以按顺序执行数据的被拦截。和框架那块的一样。

onsend方法就是主要进行执行拦截规则的,for(ProducerInterceptor<K,V> interceptor:this.intercept)就可以循环执行多个拦截器,并且,看try,catch内容,无论当前拦截器发生什么异常,都不会影响到下一个拦截器的执行,更不会影响整个数据的发送。

自定义实现拦截器,帮助自己更好地了解拦截器。

java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;public class ValueInterceptorTest implements ProducerInterceptor<String, String> {/*** 实现拦截器规则**/@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {}/*** 当记录被Broker确认接收时调用** */@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {// 这个方法在记录被Broker确认接收时被调用// 根据确认情况实现自定义的处理逻辑}/*** 关闭拦截器时调用*/@Overridepublic void close() {}/*** 配置拦截器时调用**configs 配置信息*/@Overridepublic void configure(Map<String, ?> configs) {}
}

分区器以及分区计算策略

为啥进行分区计算?

 数据发送给某个主题,主题会有很多分区,会在不同的broker当中,所以要算分区编号,不然连数据要发送给主题哪个节点都不知道。但是分区标号也得有范围呀!

producer生产者怎么知道有哪些分区?

从元数据缓存中获取到producer需要的主题相关信息

意味着只要元数据信息缓存了,主题的相关信息我们就可以拿到。

 分区器通过Matadata获取到分区,副本id,leadid之类的,

分区计算

¹²³⁴ 如果参数中指定了分区编号就直接返回

如何自定义实现分区器?

1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;public class CustomPartitioner implements Partitioner {/*** 配置分区器** @param configs 配置信息*/@Overridepublic void configure(Map<String, ?> configs) {}/*** 计算分区** @param topic       主题名称* @param key         消息键,可以为null* @param keyBytes    消息键的字节数组表示,可以为null* @param value       消息值* @param valueBytes  消息值的字节数组表示* @param cluster     Kafka集群信息* @return 分配的分区ID*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 如果键为null,则使用轮询分区策略if (keyBytes == null) {return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;}// 使用键的hashCode来计算分区return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}/*** 关闭分区器*/@Overridepublic void close() {// 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作}
}

想说的在图里啦!宝宝!💡 

嘿嘿,这里解决了之前的问题,key并不像之前学到的hashmap中消费者用来消费的key,它的核心作用就是用来进行分区计算

这个点就可以从:没有指定特定的分区标号,并且分区标号没有超过范围!序列化key以及分区器不忽略key的情况下看出来。partitionForKey()方法中就用不加密的hash算法并且对分区数量进行取余处理计算。

如果key值忘记传递了呢!?

return RecordMetadata.UNKNOWN_PARTITION;(这是一个表示未知分区的常量)。表明当前生产者无法确定消息发送到哪个分区,可能需要进一步处理或记录错误信息。那感觉也不太对啊,不知道把key发送到哪一个分区!

其实他是在数据收集器那一步追加了,看这个accumulator.append方法!

点进去哦!分区标号计算:粘性分区策略

如果没有进行传递key参数,也就是当前分区是未知分区,就会根据当前主题的分区负载情况来动态获取分区标号。这就是一种优化后的粘性分区策略!如图1.1

🤔图1.1  当前分区是未知分区,就会根据当前主题的分区负载因子来动态获取分区标号。

会根据当前分区负载情况判断去那个分区!如图1.2

✅当分区负载情况为空,就动态去随机选择分区,然后就尽可能的给这个分区追加数据(粘性分区策略),并且也不能超过数值batch.size=16K。如果超过这个阈值就会切换到下一个分区。并且更新分区负载情况。

✅当前主题分区负载情况不为空,那就不用随机生成了。会根据分区负载使用频率随机生成一个随机权重,然后利用二分查找算法找与权重相近的值,根据这个值获取到相应的分区,就可以得到我们的分区标号啦!

图1.2

数据校验

当数据校验成功,数据就到达了数据收集器当中。数据收集器,生产的数据作为一个临时的存储。

数据收集器

 

如果直接生产一条数据就通过网络通信来发送,这样做效率很低哦!像javaio流读取文件一样,读一个字节写一个字节,性能很低呀!

所以就有了ProducerBatch双端队列,从很减少频繁的网络交互,提高传输效率!

在神魔时候真正进行网络交互呢??

嘿嘿,看最大范围,batch.size=16K。在前面分区计算中,有一个粘性分区策略(一旦确定了一个分区,就尽可能往这个分区中追加数据,追加数据就是往producebatch中追加数据,当到达16K,就会被sender检测到),里面就有“没有传递key,如果没有分区负载情况,就会随机生成分区,不能超过最大

注意

🤔而且这里的16k意思是超过16k就不再接收数据了,不意味着数据不能超过16k!比如数据是20k,kafka要保证数据的完整性,发现这个数据值大于16k,就立马关闭,不再接收!

Sender发送线程

 kafka底层就采用了很多生产者消费者模型,一个放一个取。数据收集器是按照主题分区来放数据,而Sender发送线程会按照broker重新整合。(主题的不同分区会放在不同的节点当中,所以有可能存在不同主题的分区在同一个节点当中)。

当整合好之后,就会封装成produceRequest,进而发送给网络客户端。

默认发送时间0,也就是消息取过来就可以直接发送了!

注意这个在途请求缓冲区数量:5

  • Broker 和 Topic:每个 broker 可以存储一个或多个 topic 的数据分片,Kafka 集群的每个 broker 都可以服务于多个 topic
  • Topic 和 分区:每个 topic 可以被分为多个分区,分区内的消息顺序是有序的,而不同分区之间的消息顺序则不保证,分区允许 Kafka 横向扩展和提高并行处理能力。
  • Broker 和 分区:每个 broker 可能会存储多个 topic 的多个分区数据,这样在整个 Kafka 集群中就形成了数据的分布式存储和处理能力。 

相关文章:

Kafka第四篇——生产数据总体概括,源码解析分区策略,数据收集器,Sender发送线程,key值

目录 流程图以及总体概述 拦截器 分区器以及分区计算策略 为啥进行分区计算&#xff1f; producer生产者怎么知道有哪些分区&#xff1f; 分区计算 如何自定义实现分区器&#xff1f; 想说的在图里啦&#xff01;宝宝&#xff01;&#x1f4a1; ​编辑 如果key值忘记传递了呢&a…...

二叉树的链式结构

前言 Hello,友友们&#xff0c;小编将继续重新开始数据结构的学习&#xff0c;前面讲解了堆的部分知识&#xff0c;今天将讲解二叉树的链式结构的部分内容。 1.概念回顾与新增 二叉树是一种数据结构&#xff0c;其中每个节点最多有两个子节点&#xff0c;分别是左子节点和右子…...

【STM32】在标准库中使用DMA

1.MDA简介 DMA全称Direct Memory Access,直接存储区访问。 DMA传输将数据从一个地址空间复制到另一个地址空间。当CPU初始化这个传输动作&#xff0c;传输动作本身是由DMA控制器来实现和完成的。DMA传输方式无需CPU直接控制传输&#xff0c;也没有中断处理方式那样保留现场和…...

多线程详解

文章目录 多线程创建方式p3一些教程 狂神说 多线程创建方式p3 代码: package com.demo1;//创建线程方式一:继承Thread类&#xff0c;重写run()方法&#xff0c;调用start开启线程/*** 总结:注意,线程开启不一定立即执行,dCPU调度执行*/public class TestThread1 extends Thre…...

软件工程需求之:业务需求与用户需求

在软件开发项目中&#xff0c;"业务需求"和"用户需求"是两个核心概念&#xff0c;它们分别从不同的角度描述了软件应该具备的功能和特性。理解这两个概念的区别对于成功地规划和开发软件至关重要。 业务需求 业务需求主要关注于软件项目如何帮助实现企业…...

Nettyの源码分析

本篇为Netty系列的最后一篇&#xff0c;按照惯例会简单介绍一些Netty相关核心源码。 1、Netty启动源码分析 代码就使用最初的Netty服务器案例&#xff0c;在bind这一行打上断点&#xff0c;观察启动的全过程&#xff1a; 由于某些方法的调用链过深&#xff0c;节约篇幅&#xf…...

MySQL远程登录

root是超级管理员&#xff0c;默认情况下&#xff0c;root不能作为远程登录的用户名&#xff0c;远程登录前&#xff0c;需要将登录的数据库在本地登录&#xff0c;修改权限&#xff0c;输入&#xff1a; update user set host % where user root ; 回车键&#xff0c;再输…...

html的作业

目录 作业题目 1.用户注册 A图 B代码 2.工商银行电子汇款单 A图 B代码 3.李白诗词 A图 B代码 4.豆瓣电影 A图 B代码 学习产出&#xff1a; 作业题目 1.用户注册 A图 B代码 <!DOCTYPE html> <html lang"zh"> <head><meta charset&qu…...

【TORCH】查看dataloader里的数据,通过dataloader.dataset或enumerate

文章目录 dataloader.dataset示例代码使用自定义数据集使用 MNIST 数据集 说明 enumerate示例代码说明使用 MNIST 数据集的例子 dataloader.dataset 是的&#xff0c;您可以直接访问 train_loader 的数据集来查看数据&#xff0c;而不必通过 enumerate 遍历数据加载器。可以通…...

KDTree 简单原理与实现

介绍 K-D树是一种二叉树的数据结构&#xff0c;其中每个节点代表一个k维点&#xff0c;可用于组织K维空间中的点&#xff0c;其中K通常是一个非常大的数字。二叉树结构允许对多维空间中的点进行非常有效的搜索&#xff0c;包括最近邻搜索和范围搜索&#xff0c;树中的每个非叶…...

[c++] 可变参数模版

前言 可变参数模板是C11及之后才开始使用,学校的老古董编译器不一定能用 相信大家在刚入门c/c时都接触过printf函数 int printf ( const char * format, ... ); printf用于将数据格式化输出到屏幕上,它的参数非常有意思,可以支持任意数量,任意类型的多参数.而如果我们想实现类…...

QWidget窗口抗锯齿圆角的一个实现方案(支持子控件)2

QWidget窗口抗锯齿圆角的一个实现方案&#xff08;支持子控件&#xff09;2 本方案使用了QGraphicsEffect&#xff0c;由于QGraphicsEffect对一些控件会有渲染问题&#xff0c;比如列表、表格等&#xff0c;所以暂时仅作为研究&#xff0c;优先其他方案 在之前的文章中&#…...

数据结构之“队列”(全方位认识)

&#x1f339;个人主页&#x1f339;&#xff1a;喜欢草莓熊的bear &#x1f339;专栏&#x1f339;&#xff1a;数据结构 前言 上期博客介绍了” 栈 “这个数据结构&#xff0c;他具有先进后出的特点。本期介绍“ 队列 ”这个数据结构&#xff0c;他具有先进先出的特点。 目录…...

密码学复习

目录 基础 欧拉函数 欧拉函数φ(n)定义 计算方法的技巧 当a=a_1*a_2*……*a_n时 欧拉定理 剩余系 一些超简单密码 维吉尼亚 密钥fox 凯撒(直接偏移) 凯特巴氏(颠倒字母表) 摩斯密码(字母对应电荷线) 希尔(hill)密码 一些攻击 RSA 求uf+vg=1 快速幂模m^…...

【文献解析】一种像素级的激光雷达相机配准方法

大家好呀&#xff0c;我是一个SLAM方向的在读博士&#xff0c;深知SLAM学习过程一路走来的坎坷&#xff0c;也十分感谢各位大佬的优质文章和源码。随着知识的越来越多&#xff0c;越来越细&#xff0c;我准备整理一个自己的激光SLAM学习笔记专栏&#xff0c;从0带大家快速上手激…...

Http 实现请求body体和响应body体的双向压缩方案

目录 一、前言 二、方案一(和http header不进行关联) 二、方案二(和http header进行关联) 三、 客户端支持Accept-Encoding压缩方式,服务器就一定会进行压缩吗? 四、参考 一、前言 有时请求和响应的body体比较大,需要进行压缩,以减少传输的带宽。 二、方案一(和…...

C++(Qt)-GIS开发-简易瓦片地图下载器

Qt-GIS开发-简易瓦片地图下载器 文章目录 Qt-GIS开发-简易瓦片地图下载器1、概述2、安装openssl3、实现效果4、主要代码4.1 算法函数4.2 瓦片地图下载url拼接4.3 多线程下载 5、源码地址6、参考 更多精彩内容&#x1f449;个人内容分类汇总 &#x1f448;&#x1f449;GIS开发 …...

誉天教育7月开班计划:为梦想插上腾飞的翅膀!

随着夏日的脚步渐近&#xff0c;誉天教育也迎来了新一轮的学习热潮。在这个充满活力和希望的季节里&#xff0c;我们精心策划了7月的开班计划&#xff0c;旨在为广大学子提供一个优质、高效的学习平台&#xff0c;助力他们追逐梦想&#xff0c;实现自我价值。 本月 Linux云计算…...

STM32基础篇:GPIO

GPIO简介 GPIO&#xff1a;即General Purpose Input/Output&#xff0c;通用目的输入/输出。就是一种片上外设&#xff08;内部模块&#xff09;。 对于STM32的芯片来说&#xff0c;周围有一圈引脚&#xff0c;有时需要对引脚进行读写&#xff08;读&#xff1a;从外部输入一…...

HTTPS 发送请求出现TLS握手失败

最近在工作中&#xff0c;调外部接口&#xff0c;发现在clientHello步骤报错&#xff0c;服务端没有返回serverHello。 从网上找了写方法&#xff0c;都没有解决&#xff1b; 在idea的vm options加上参数&#xff1a; -Djavax.net.debugSSL,handshake 把SSL和handshake的日…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...