当前位置: 首页 > news >正文

Kafka~消息发送过程与ISR机制了解

消息发送过程

使用Kafka发送消息时,一般有两种方式分别是:

  1. 同步发送
  2. 异步发送

同步发送时,可以在发送消息后,通过get方法等待消息结果,这种情况能够准确的拿到消息最终的发送结果,要么是成功、要么是失败。

而异步发送,是采用了callback的方式进行回调的,可以大大的的提升消息的吞吐量,也可以根据回调来判断消息是否发送成功。

不管是同步发送还是异步发送,最终都需要在Producer端把消息发送到Broker中,那么这个过程大致如下:
在这里插入图片描述
Kafka的Producer在发送消息时通常涉及两个线程,主线程(Main)和发送线程(Sender)和一个消息累加器(RecordAccumulator)

  • Main线程是Producer的入口,负责初始化Producer的配置、创建KafkaProducer实例并执行发送逻辑。它会按照用户定义的发送方式(同步或异步)发送消息,然后等待消息发送完成。一条消息的发送,在调用send方法后,会经过拦截器、序列化器及分区器。拦截器主要用于在消息发送之前和之后对消息进行定制化的处理,如对消息进行修改、记录日志、统计信息等。序列化器负责将消息的键和值对象转换为字节数组,以便在网络上传输。分区器决定了一条消息被发送到哪个Partition中。它根据消息的键(如果有)或者特定的分区策略,选择出一个目标Partition。

  • RecordAccumulator在Kafka Producer中起到了消息积累和批量发送的作用,当Producer发送消息时,不会立即将每条消息发送到Broker,而是将消息添加到RecordAccumulator维护的内部缓冲区中,RecordAccumulator会根据配置的条件(如batch.size、linger.ms)对待发送的消息进行批量处理。当满足指定条件时,RecordAccumulator将缓冲区中的消息组织成一个批次(batch),然后一次性发送给Broker。如果发送失败或发生错RecordAccumulator可以从将消息重新分配到新的批次中进行重试。这样可以确保消息不会丢失,同时提高消息的可靠性。

  • Send线程是负责实际的消息发送和处理的。发送线程会定期从待发送队列中取出消息,并将其发送到对应的Partition的 Leader Broker上。它主要负责网络通信操作,并处理发送请求的结果,包括确认的接收、错误处理等。

  • NetworkClient和Selector是两个重要的组件,分别负责网络通信和1/0多路复用。

发送线程会把消息发送到Kafka集群中对应的Partition的Parrtition Leader,Partition Leader接收到消息后,会对消息进行一系列的处理。它会将消息写入本地的日志文件(Log),存储为segment文件,因为是顺序写,segment文件也是顺序截断,为了保证数据的可靠性和高可用性,Kafka使用了消息复制机制。Leader Broker接收到消息后,会将消息复制到其他副本(Partition Follower)。副本是通过网络复制数据的,它们门会定期从LeaderBroker同步消息。

每一个Partition Follower在写入本地log之后,会向Leader发送一个TACK,但是我们的Producer其实也是需要依赖ACK才能知道消息有没有投递成功的,而这个ACK是何时发送的,Producer又要不要关心呢? 这就涉及到了kafka的ack机制,生产者会根据设置的request.required.acks参数不同,选择等待或或直接发送下一条消息:

  • request.required.acks = 0
    表示Producer不等待来自Leader的ACK确认,直接发送送下一条消息。在这种情况下,如果Leader分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
  • request.required.acks = 1
    表示Producer等待来自Leader的ACK确认,当收到确认人后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader服务器,但并不保证Follow节点已已经同步完成。所以如果在消息已经被写入Leader分片,但是还未同步到Follower节点,此时Leade分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
  • request.required.acks = -1
    Leader会把消息复制到集群中的所有ISR(In-Sync Replicas,同步副本),要等待所有ISR的ACK确认后,再向Producer发送ACK消息,然后Producer再继续发下下一条消息。

ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。以下是关于 Kafka 的 ISR 机制的详细介绍:

  • 意义:

ISR 机制动态维护了一个与 Leader 副本保持同步的副本集合,只有在 ISR 集合中的副本才有资格参与 Leader 的选举。通过 ISR 机制,可以确保在 Leader 副本出现故障时,能够快速从 ISR 集合中选举出新的 Leader,从而避免数据丢失和服务中断。

  • 用途:
  1. 保证数据可靠性:ISR 机制通过副本冗余机制,提供了 Kafka 消息的高可靠性。
  2. 实现故障转移:ISR 机制可以做到故障转移,保障服务的可用性。当 Leader 副本出现故障时,Kafka 会从 ISR 集合中选举出新的 Leader,从而保证服务的连续性。
  3. 平衡复制方案:ISR 机制平衡了主从架构下,复制方案的选择(同步/异步/少数服从多数),让使用者根据参数自行选择。
  • 实现方式:
  1. 数据同步:Leader Replica 接收到 Producer 发送的消息后,将其写入本地日志,并通过 Pull 模式等待 Follower Replica 主动拉取。Follower Replica 从 Leader Replica 拉取数据并写入本地日志后,将拉取偏移量(fetch offset)返回给 Leader。
  2. 同步状态监测:Leader Replica 持续监控每个 Follower Replica 的拉取偏移量,将其与自身的最新消息偏移量(log end offset)进行比较。若 Follower Replica 的拉取偏移量与 Leader 相差不超过一定阈值(由 replica.lag.time.max.ms 参数控制),则认为该 Follower 处于同步状态,将其纳入 ISR。
  3. ISR 调整:当 Follower Replica 因网络延迟、Broker 故障等原因导致拉取偏移量落后过多,超出阈值时,Leader Replica 会将其从 ISR 中移除。当 Follower Replica 恢复同步后,再次将其加入 ISR。
  • 详细过程

当消息被写入Kafka的分区时,它首先会被写入Leader,然后LLeader将消息复制给ISR中的所有副本。只有当ISR中的所有副本都成功地接收到并确认了消息后,主副本才会认为消息已成功提交。这种机制确保了数据的可靠性和一致性。

在Kafka中,ISR(In-Sync Replicas)列表的维护是通过副本状态和配置参数来进行的。具体的ISR列表维护机制在不同的Kafka版本中有所变化。

  • 在0.9.x之前的版本,Kafka有一个核心的参数:replica.lag.max.messages,表示如果Follower落后
    Leader的消息数量超过了这个参数值,就认为Follower就会从ISR列表里移除。

但是,基于replica.lag.max.messages 这种实现,在瞬间高并发访问的情况下会有问题:比如Leader瞬间接收到几万条消息,然后所有Follower还没来得及同步过去,此时所有follower都会被踢出ISR列表。

  • Kafka从0.9.x版本开始,引入了 replica.lag.max.ms参数,表示如果果某个Follower的LEO (latest end
    offset)一直落后Leader超过了10秒,那么才会被从ISR列表里移除。

这样的话,即使出现瞬间流量,导致Follower落后很多数据,但是只要在限定的时间内尽快追上来就行了。

总之,通过 ISR 机制,Kafka 可以保证在 Leader 副本出现故障时,能够快速从 ISR 集合中选举出新的 Leader,从而避免数据丢失和服务中断。同时,ISR 机制也可以提高 Kafka 系统的可靠性和可用性。

相关文章:

Kafka~消息发送过程与ISR机制了解

消息发送过程 使用Kafka发送消息时,一般有两种方式分别是: 同步发送异步发送 同步发送时,可以在发送消息后,通过get方法等待消息结果,这种情况能够准确的拿到消息最终的发送结果,要么是成功、要么是失败…...

multiprocessing.Queue 多个进程生产和多个进程消费怎么处理

在这个示例中,我们创建了一个队列 q,并通过 multiprocessing.Manager().Queue() 来确保队列可以在多个进程之间共享。我们定义了 consumer 和 producer 函数,分别用于从队列中获取数据和向队列中放入数据。 在主进程中,我们创建了…...

配置 Python 解释器及虚拟环境

配置 Python 解释器及虚拟环境 配置 Python 解释器: 1. 打开 PyCharm,进入“File”(文件)菜单,选择“Settings”(设置)。 2. 在弹出的设置窗口中,选择“Project: [项目名称]”下的…...

JeecgBoot中如何对敏感信息进行脱敏处理?

数据脱敏即将一些敏感信息通过加密、格式化等方式处理,展示给用户一个新的或是格式化后的信息,避免了敏感信息的暴露。 一、接口脱敏注解 针对接口数据实现脱敏加密,只加密,一般此方案用于数据加密展示。 1.1 注解介绍 注解作用域…...

【Docker】存储数据卷

目录 1、挂载数据卷到容器里 2、查询挂载文件 3、容器与主机之间映射共享卷 4、三个容器之间使用共享卷 5、卷数据的备份与恢复 5.1 备份 5.2 恢复 1、挂载数据卷到容器里 docker run -itd --name test02 -v /data nginx docker exec -it test02 bashls / docker inspe…...

《昇思25天学习打卡营第12天 | 昇思MindSpore基于MindSpore的GPT2文本摘要》

12天 本节学习了基于MindSpore的GPT2文本摘要。 1.数据集加载与处理 1.1.数据集加载 1.2.数据预处理 2.模型构建 2.1构建GPT2ForSummarization模型 2.2动态学习率 3.模型训练 4.模型推理...

深入解析npm unpublish命令:使用场景与实践指南

npm(Node Package Manager)是JavaScript编程语言的包管理器,广泛用于Node.js应用程序。npm unpublish命令允许用户从npm仓库中撤回(unpublish)一个包的特定版本。本文将详细介绍npm unpublish命令的使用场景、操作步骤…...

有趣的仿神经猫html5圈小猫游戏源码

有趣的仿神经猫html5圈小猫游戏源码,点击小圆点,围住小猫游戏。猫已经跑到地图边缘,你输了。内含json数据,部署到服务器方可运行 微信扫码免费获取源码...

Redis 7.x 系列【10】数据类型之有序集合(ZSet)

有道无术,术尚可求,有术无道,止于术。 本系列Redis 版本 7.2.5 源码地址:https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 常用命令2.1 ZADD2.2 ZCARD2.3 ZSCORE2.4 ZRANGE2.5 ZREVRANGE2.6 ZRANK2.7…...

操作系统-文件的物理结构(文件分配方式)

文章目录 总览文件块和磁盘块连续分配顺序访问直接访问(随机访问)为什么连续分配同时支持这两种访问模式? 链接分配隐式链接显示链接小结索引分配链接方案多层索引混合索引小结 总结 总览 文件数据存放在外存中 文件块和磁盘块 文件内通过逻…...

Spring Boot集成jsoup实现html解析

1.什么是jsoup jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据,可操作 HTML 元素、属性、文本。 JSo…...

[240629] 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练 | Jina AI 发布最新的神经网络重排序模型

目录 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练Jina AI 发布最新的神经网络重排序模型 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练 阿里云近日公布了其专为大型语言模型 (LLM) 训练流量而设计的基于以太网的网络设计&#x…...

【Docker0】网络更改

目录 1. 停止docker服务 2. 关闭docker默认桥接网络接口 3. 从系统删除docker0接口 4. 创建一个名为bridge0的新接口 5. 添加ip地址和子网掩码 6. 启用bridge0接口 7. (如果没起来就执行该句) 8. 查看ip 1. 停止docker服务 sudo service docker…...

IDEA中导入Maven项目

IDEA中导入Maven项目 方式1:使用Maven面板,快速导入项目 打开IDEA,选择右侧Maven面板,点击 号,选中对应项目的pom.xml文件,双击即可 说明:如果没有Maven面板,选择 View > Appe…...

px、em、rem、rpx 作用和用法详解

px px像素(Pixel)。相对长度单位。像素px是相对于显示器屏幕分辨率而言的。 PX特点 IE无法调整那些使用px作为单位的字体大小; 国外的大部分网站能够调整的原因在于其使用了em或rem作为字体单位; Firefox能够调整px和em&#xff…...

Linux 常用命令 - dd 【复制及转换文件内容】

简介 dd 命令源自于磁盘复制(disk dump)的缩写,是 Linux 和 Unix 系统中用于转换和复制文件的一个强大工具。它可以在复制过程中进行格式转换,支持不同的块大小,能够直接对硬盘设备进行操作,非常适合进行备…...

全网唯一免费无水印AI视频工具!

最近Morph Studio开始免费公测!支持高清画质,可以上传语音,同步口型,最重要的是生成的视频没有水印! Morph Studio国内就可以访问,可以使用国内邮箱注册(我用的163邮箱),…...

kafka(四)消息类型

一、同步消息 1、生产者 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调…...

Emacs之显示blame插件:blamer、git-messenger(一百四十四)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...

【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】

需求 后端有个nodejs 基础库,用typescript编写,需要发包到代码仓库上,被其它业务引入。这其中就涉及了: 编译, 打包,发包。 工作流速览 前提依赖 webpack主体 npm install --save-dev webpack webpack…...

【网络安全产品大调研系列】2. 体验漏洞扫描

前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

Java入门学习详细版(一)

大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一:使用Read()读取文件2.3、方式二:bufio读取文件2.4、方式三:os.ReadFile读取2.5、写…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发,旨在打造一个互动性强的购物平台,让用户在购物的同时,能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机,实现旋转、抽拉等动作,增…...

MySQL:分区的基本使用

目录 一、什么是分区二、有什么作用三、分类四、创建分区五、删除分区 一、什么是分区 MySQL 分区(Partitioning)是一种将单张表的数据逻辑上拆分成多个物理部分的技术。这些物理部分(分区)可以独立存储、管理和优化,…...

go 里面的指针

指针 在 Go 中,指针(pointer)是一个变量的内存地址,就像 C 语言那样: a : 10 p : &a // p 是一个指向 a 的指针 fmt.Println(*p) // 输出 10,通过指针解引用• &a 表示获取变量 a 的地址 p 表示…...

Linux 下 DMA 内存映射浅析

序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存,但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程,可以参考这篇文章,我觉得写的非常…...