[消息队列 Kafka] Kafka 架构组件及其特性(二)Producer原理
这边整理下Kafka三大主要组件Producer原理。
目录
一、Producer发送消息源码流程
二、ACK应答机制和ISR机制
1)ACK应答机制
2)ISR机制
三、消息的幂等性
四、Kafka生产者事务
一、Producer发送消息源码流程

Producer发送消息流程如上图。主要是用了两个线程,主线程中生成消息经过拦截器之后,在序列化器中将消息的K,V序列化,在到分区器中分配对应的分区之后放入累加器。
当消息在累加器中批次满了,或者创建了新的累加batch就会唤起Sender线程将消息发送到Broker中。
这其中在分区器中配置分区的规则有以下四个逻辑:
1.消息定义了分区,就用消息指定的分区
2.消息没有定义分区,但是使用了自定义分区器,那么就走自定义分区器里面的选择分区的逻辑
3.消息没有定义分区,也没有使用自定义分区器,key不为空,那么就会走hash取模算法,会用key的hash值和分区器数量进行取模计算得到对应的分区器
4.以上都不符合的话,就走粘连策略得到最终的分区。
二、ACK应答机制和ISR机制
1)ACK应答机制
生产者Producer向Broker发送消息,明显是需要有个应答ACK来知道Broker是否收到消息的。所以Kafka提供了三种等级的Ack应答机制。可以根据可靠性和延迟的要求进行选择
1.acks=0 :broker一收到消息,就返回Ack应答

但这种模式明显会有一个问题就是leader落盘失败的话,发送的消息就都没有用了,如下图

要保证可靠性就有了第二种模式。
2.acks=1 : broker收到消息,并且leader分区落盘之后,返回Ack应答。(Kafka默认应答机制级别)

这种模式也会有问题,就是leader数据落盘之后,两个副本都没有备份,这个时候leader又挂了,这个时候就会丢失数据了,如下图所示:

所以要保证进一步的可靠性就有了第三种模式。
3.ack=-1(或者all):broker收到消息,并且leader分区落盘之后,所有fllowers也备份成功之后返回Ack应答。

明显可以看出,以上三种从上到下其可靠性依次增强,延迟也依次增大。
但是这个模式可以想到还是会有两种问题:
问题1:就是在leader,和fllower都落盘成功之后,准备返回ack时候,leader挂了,这个时候Producer就收不到ACK了。那么生产者正常来讲就会重发消息,这个时候对于broker来说明显消息重复了(当然实际上Kafka可以通过消息的幂等性来判断),就有问题了。
这个时候,除了Kafka自带的消息幂等性处理,还有一种方案,就是配置里有个重试次数,我们可以设置为0也可以实现。

但明显这只能解决这一种问题,可能引起其他情况的异常问题(例如需要重发来保证可靠性的情况)
问题2:如果fllower重有挂了的节点,那么这种情况Producer明显永远拿不到Ack了,明显会阻塞消息过程。如下图:

所以为了解决问题2这种情况,Kafka就采用了ISR的机制。
2)ISR机制
ISR(in-sync replica set):是一组动态维护副本的集合。
ISR的界定标准(可以自己设置):replica.lag.time.ms(默认是30秒),也就是P0_R0 30秒钟还没有从P0_R1中同步数据
简单的理解,ISR就是一个Set集合,里面存储的就是同步积极的分区集合,当分区同步出现问题时候,就把这个分区移除ISR集合。
还是在下图那种情况
所以初始的时候,ISR集合里面是有P0_R0,P0_R1,P0_R2三个分区的,但是当P0_R0同步出现问题的时候,就把P0_R0移除ISR集合。这个时候ISR集合中只有P0_R1,P0_R2。此时这两个分区落盘都成功了,这个时候leader也就返回了ack了。
相对应ISR,也有一个OSR(out-sync replica),也就是没有正常同步数据的副本
那明显ISR+OSR 就能拿到全部的副本(AR:Assigned replica)了:AR = ISR+OSR,如下图所示:

三、消息的幂等性
生产者的幂等性,可以理解为不管生产者发送多少次效益,对于broker来说,如果是同一条消息,broker端只存一条消息。
上面问题1的那种情况,一般为了保证重试机制的正常,不会将重试参数retires设置为0,Kafka是通过broker的幂等性判断来解决这个问题的。下面详细介绍下实现的思路。
问题的情况如下图:

再返回ack时候,由于网络抖动等问题,导致服务端返回失败,此时生产者进行重试,导致消息被重复写入了broker服务端。
解决的方案如下:

在发送消息的时候,每条消息增加两个参数,PID(Producer ID):生产者id; Sequence Number:消息序列数(一般从0开始)
如上图所示,在第4步重发同一消息时候,Broker服务端在网队列里面写消息时候,会判断PID和Seq Num是否重复,如果重复,就写入队列失败。那么就不会往队列里面写入重复的消息了。

(这里面的Seq Num还会有多种情况,就是假设前一条消息的Seq Num=1,现在这条消息的PID相同,但是Seq Num=3,那么就会判断丢失了消息,Kafka就会抛出丢失消息的异常信息)
总结一下:
1)Producer端发送消息(消息,PID,Seq Num)
2) Broker端接收消息(将消息,PID,Seq Num一起保存)
3)若ack失败,生产者重试,再次发生消息,Broker判断是否重复
四、Kafka生产者事务
上面的幂等性,只能保证在单分区,单会话(客户端重启之后,在建立连接,会认为是新的producer id)场景下有效。
对于多分区,多会话,Kafka通过生产者事务提供了多个分区写入的原子性操作(理解参照数据库的原子性)。
Kafka事务的API相关方法:

Kafka事务操作的基本流程:

如上图所示:
Kafka通过事务协调者(Transaction Coordinator)和事务日志(Transcation Log)来实现的。
流程就是:
1)生产者通过initTransactions向Coordinator注册事务ID
2)Coordinator记录事务日志。
3)生产者把消息写入目标分区 (此时这三部的数据对于消费者都是不可见的)
4)分区和Coordinator的交互,标记消息状态。消息装状态标记为Commited,才对消费者可见,否则不可见。
详细流程如下图:

以上就是Kafka,producer端相关原理了。
相关文章:
[消息队列 Kafka] Kafka 架构组件及其特性(二)Producer原理
这边整理下Kafka三大主要组件Producer原理。 目录 一、Producer发送消息源码流程 二、ACK应答机制和ISR机制 1)ACK应答机制 2)ISR机制 三、消息的幂等性 四、Kafka生产者事务 一、Producer发送消息源码流程 Producer发送消息流程如上图。主要是用…...
faiss ivfpq索引构建
假设已有训练好的向量值,构建索引(nlist和随机样本按需选取) import numpy as np import faiss import pickle from tqdm import tqdm import time import os import random# 读取嵌入向量并保留对应关系 def read_embeddings(directory, ba…...
ffmpeg视频编码原理和实战-(2)视频帧的创建和编码packet压缩
源文件: #include <iostream> using namespace std; extern "C" { //指定函数是c语言函数,函数名不包含重载标注 //引用ffmpeg头文件 #include <libavcodec/avcodec.h> } //预处理指令导入库 #pragma comment(lib,"avcodec.…...
数据结构:线索二叉树
目录 1.线索二叉树是什么? 2.包含头文件 3.结点设计 4.接口函数定义 5.接口函数实现 线索二叉树是什么? 线索二叉树(Threaded Binary Tree)是一种对普通二叉树的扩展,它通过在树的某些空指针上添加线索来实现更高效的遍…...
宝塔Linux面板-Docker管理(2024详解)
上一篇文章《宝塔Linux可视化运维面板-详细教程2024》,详细介绍了宝塔Linux面板的详细安装和配置方法。本文详细介绍使用Linux面板管理服务器Docker环境。 目录 1、安装Docker 1.1 在线安装 编辑 1.2 手动安装 1.3 运行状态 1.4 镜像加速 2 应用商店 3 总览 4 容器 …...
【Linux】进程(8):Linux真正是如何调度的
大家好,我是苏貝,本篇博客带大家了解Linux进程(8):Linux真正是如何调度的,如果你觉得我写的还不错的话,可以给我一个赞👍吗,感谢❤️ 目录 之前我们讲过,在大…...
R语言探索与分析14-美国房价及其影响因素分析
一、选题背景 以多元线性回归统计模型为基础,用R语言对美国部分地区房价数据进行建模预测,进而探究提高多元回 归线性模型精度的方法。先对数据进行探索性预处理,随后设置虚拟变量并建模得出预测结果,再使用方差膨胀因子对 多重共…...
golang websocket 数据处理和返回JSON数据示例
golang中websocket数据处理和返回json数据示例, 直接上代码: // author tekintiangmail.com // golang websocket 数据处理和返回JSON数据示例, // 这个函数返回 http.HandlerFunc // 将http请求升级为websocket请求 这个需要依赖第三方包 …...
【Mac】Downie 4 for Mac(视频download工具)兼容14系统软件介绍及安装教程
前言 Downie 每周都会更新一个版本适配视频网站,如果遇到视频download不了的情况,请搜索最新版本https://mac.shuiche.cc/search/downie。 注意:Downie Mac特别版不能升级,在设置中找到更新一列,把自动更新和自动downl…...
【操作系统】进程与线程的区别及总结(非常非常重要,面试必考题,其它文章可以不看,但这篇文章最后的总结你必须要看,满满的全是干货......)
目录 一、 进程1.1 PID(进程标识符)1.2 内存指针1.3 文件描述符表1.4 状态1.5 优先级1.6 记账信息1.7 上下文 二、线程三、总结:进程和线程之间的区别(非常非常非常重要,面试必考题) 一、 进程 简单来介绍一下什么是进程…...
自动驾驶仿真(高速道路)LaneKeeping
前言 A high-level decision agent trained by deep reinforcement learning (DRL) performs quantitative interpretation of behavioral planning performed in an autonomous driving (AD) highway simulation. The framework relies on the calculation of SHAP values an…...
数据挖掘实战-基于Catboost算法的艾滋病数据可视化与建模分析
🤵♂️ 个人主页:艾派森的个人主页 ✍🏻作者简介:Python学习者 🐋 希望大家多多支持,我们一起进步!😄 如果文章对你有帮助的话, 欢迎评论 💬点赞Ǵ…...
分水岭算法分割和霍夫变换识别图像中的硬币
首先解释一下第一种分水岭算法: 一、分水岭算法 分水岭算法是一种基于拓扑学的图像分割技术,广泛应用于图像处理和计算机视觉领域。它将图像视为一个拓扑表面,其中亮度值代表高度。算法的目标是通过模拟雨水从山顶流到山谷的过程࿰…...
什么是AVIEXP提前发货通知?
EDI(电子数据交换)报文是一种用于电子商务和供应链管理的标准化信息传输格式。AVIEXP 是一种特定类型的 EDI 报文,用于传输提前发货通知信息。 AVIEXP 报文简介 AVIEXP 是指 Advanced Shipping Notification提前发货通知报文,用…...
Python 之SQLAlchemy使用详细说明
目录 1、SQLAlchemy 1.1、ORM概述 1.2、SQLAlchemy概述 1.3、SQLAlchemy的组成部分 1.4、SQLAlchemy的使用 1.4.1、安装 1.4.2、创建数据库连接 1.4.3、执行原生SQL语句 1.4.4、映射已存在的表 1.4.5、创建表 1.4.5.1、创建表的两种方式 1、使用 Table 类直接创建表…...
就业班 第四阶段(docker) 2401--5.29 day3 Dockerfile+前后段项目若依ruoyi
通过Dockerfile创建镜像 Docker 提供了一种更便捷的方式,叫作 Dockerfile docker build命令用于根据给定的Dockerfile构建Docker镜像。docker build语法: # docker build [OPTIONS] <PATH | URL | ->1. 常用选项说明 --build-arg,设…...
【运维项目经历|026】Redis智能集群构建与性能优化工程
🍁博主简介: 🏅云计算领域优质创作者 🏅2022年CSDN新星计划python赛道第一名 🏅2022年CSDN原力计划优质作者 🏅阿里云ACE认证高级工程师 🏅阿里云开发者社区专家博主 💊交流社区:CSDN云计算交流社区欢迎您的加入! 目…...
Linux编程for、while循环if判断以及case语句用法
简介 语法描述if条件语句if else条件判断语句if else-if else多条件判断语句for循环执行命令while循环执行命令until直到条件为真时停止循环case ... esac多选择语句break跳出循环continue跳出当前循环 1. for 循环 for语句,定量循环,可以遍历一个列表…...
docker命令 docker ps -l (latest)命令在 Docker 中用于列出最近一次创建的容器
文章目录 12345 1 docker ps -l 命令在 Docker 中用于列出最近一次创建的容器。具体来说: docker ps:这个命令用于列出当前正在运行的容器。-l 或 --latest:这个选项告诉 docker ps 命令只显示最近一次创建的容器,不论该容器当前…...
inflight 守恒和带宽资源守恒的有效性
接着昨天的问题,inflight 守恒的模型一定存在稳定点吗?并不是。如果相互抑制强度大于自我抑制强度,系统也会跑飞: 模拟结果如下: 所以一定要记得 a < b。 比对前两个图和后两个图的 a,b 参数关系&am…...
老系统兼容Python解决方案:PythonVista版本支持与安装指南
老系统兼容Python解决方案:PythonVista版本支持与安装指南 【免费下载链接】PythonVista Python 3.9 installers that support Windows 7 SP1 and Windows Server 2008 R2 项目地址: https://gitcode.com/gh_mirrors/py/PythonVista 在企业环境和个人用户中&…...
Python算法优化:从理论到实践
Python算法优化:从理论到实践 1. 背景与意义 在数据科学和AI应用中,算法的效率直接影响系统性能。作为一名Python开发者,掌握算法优化技巧不仅能提升代码质量,还能显著提高应用性能。本文将深入探讨Python中常见算法的优化策略&…...
Ubuntu系统下识别错误文件格式的解决方案:从JPEG报错到实际文件类型检测
1. 当Ubuntu告诉你"这不是JPEG文件"时发生了什么 那天我正在处理用户上传的图片,突然发现一个诡异现象:同一张"111.jpg"在Windows系统显示正常,但在Ubuntu服务器上却报错"Error interpreting JPEG image file (Not …...
Seafile社区版12.0部署实战:从Docker Compose到企业级定制
1. 为什么选择Seafile社区版12.0? 如果你正在寻找一个开源的、可私有化部署的企业级文件同步与共享解决方案,Seafile社区版12.0绝对值得考虑。作为一个长期使用各种云存储解决方案的运维工程师,我发现Seafile在性能、稳定性和功能完整性方面都…...
Magpie插件管理终极指南:如何让窗口缩放效果始终保持最佳状态
Magpie插件管理终极指南:如何让窗口缩放效果始终保持最佳状态 【免费下载链接】Magpie An all-purpose window upscaler for Windows 10/11. 项目地址: https://gitcode.com/gh_mirrors/mag/Magpie 在Windows窗口缩放领域,Magpie凭借其强大的插件…...
BGE Reranker-v2-m3效果展示:原始分数与归一化分数双维度结果对比分析真实案例
BGE Reranker-v2-m3效果展示:原始分数与归一化分数双维度结果对比分析真实案例 1. 系统核心能力概览 BGE Reranker-v2-m3是一个基于先进AI技术的本地文本相关性重排序工具,专门用于评估查询语句与候选文本之间的匹配程度。这个工具的核心价值在于能够智…...
C++ 模板类型推导的底层实现
C模板类型推导的底层实现 C的模板类型推导是现代C编程中不可或缺的核心机制,它使得泛型编程变得灵活而高效。从简单的函数模板到复杂的元编程,类型推导在编译期间自动推断模板参数,减少了冗余代码。其底层实现机制却鲜为人知。本文将揭开模板…...
Agentic Workflow与Workflow的协同之道——RAGFlow 0.20.0企业级实践解析
1. Agentic Workflow与Workflow的协同价值 企业级AI应用开发正面临一个关键矛盾:业务逻辑的确定性需求与LLM带来的灵活性优势如何平衡?RAGFlow 0.20.0给出的答案是让Workflow和Agentic Workflow在统一编排引擎中协同工作。这就像建筑行业中的预制构件与现…...
从游戏设计到算法实现:拆解睿抗CAIP编程赛‘游戏设计师’一题的BFS+离线查询思路
从游戏设计到算法实现:拆解睿抗CAIP编程赛‘游戏设计师’一题的BFS离线查询思路 在游戏开发中,角色移动和状态转换是最基础也最核心的机制之一。睿抗机器人开发者大赛CAIP编程技能赛的"游戏设计师"一题,巧妙地将这些游戏开发中的实…...
告别Keil5新建工程手忙脚乱:GD32F303保姆级环境搭建与文件管理心法
告别Keil5新建工程手忙脚乱:GD32F303保姆级环境搭建与文件管理心法 第一次打开Keil5新建GD32工程时,面对官网下载的几十个库文件,你是否感到无从下手?明明跟着教程一步步操作,最后却发现工程文件散落各处,移…...
