Kafka-设计原理
- Controller
- Leader - Partition
- Rebalance
- 消息发布机制
- HW与LEO
- 日志分段
Controller
- Kafka核心总控制器Controller:在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态
- 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本
- 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息
- 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到
- Controller选举机制
- zookeeper临时节点的创建来选举controller:在kafka集群启动的时候,会自动选举一台broker作为controller来管理整个集群,选举的过程是集群中每个broker都会尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证有且仅有一个broker能创建成功,这个broker就会成为集群的总控器controller
- controller重新选举:当这个controller角色的broker宕机了,此时zookeeper临时节点会消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就竞争再次创建临时节点,就是我们上面说的选举机制,zookeeper又会保证有一个broker成为新的controller
- 具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下
- 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化
- 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化;为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作
- 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化
- 更新集群的元数据信息,同步到其他普通的broker节点中
Leader - Partition
- Partition副本选举Leader机制
- controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活)
- controller会从ISR列表(参数unclean.leader.election.enable=false的前提下)里挑第一个broker作为leader(第一个broker最先放进ISR列表,可能是同步数据最多的副本)
- 如果参数unclean.leader.election.enable为true,代表在ISR列表里所有副本都挂了的时候可以在ISR列表以外的副本中选leader,这种设置,可以提高可用性,但是选出的新leader有可能数据少很多
- 副本进入ISR列表有两个条件
- 必须能与zookeeper保持会话以及跟leader副本网络连通
- 副本能复制leader上的所有写操作,并且不能落后太多
- 与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移出ISR列表
- 消费者消费消息的offset记录机制
- 每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets
- 提交过去的时候,key是consumerGroupId+topic+分区号,value就是当前offset的值
- kafka会定期清理topic里的消息,最后就保留最新的那条数据
- 因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发
- 每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets
Rebalance
- Rebalance分区分配策略(partition.assignment.strategy):range(默认)、round-robin、sticky
- range:按照分区序号排序,比如分区
0~3
给一个consumer,分区4~6
给一个consumer,分区7~9
给一个consumer - round-robin:轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、8给一个consumer
- sticky:与round-robin类似,但是在rebalance的时候,需要保证如下两个原则(当两者发生冲突时,第一个目标优先于第二个目标)
- 分区的分配要尽可能均匀
- 分区的分配尽可能与上次分配的保持相同
- range:按照分区序号排序,比如分区
- Rebalance机制:如果消费组里的消费者数量有变化或消费的分区数有变化,kafka会重新分配消费者消费分区的关系。比如consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他
- rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance
- rebalance过程中,消费者无法从kafka消费消息,这对kafka的TPS会有影响,如果kafka集群内节点较多,比如数百个,那重平衡可能会耗时极多,所以应尽量避免在系统高峰期的重平衡发生
- 触发消费者rebalance
- 消费组里的consumer增加或减少了
- 动态给topic增加了分区
- 消费组订阅了更多的topic
- Rebalance过程:当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段
- 选择组协调器(GroupCoordinator):每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance
- consumer group中的每个consumer启动时会向kafka集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器GroupCoordinator,并跟其建立网络连接
- 组协调器选择方式:通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker就是这个consumer group的coordinator。说白了,leader分区所在的节点就是GroupCoordinator
- 加入消费组(JOIN GROUP),选择消费组协调器
- 在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
- 然后GroupCoordinator 从一个consumer group中选择第一个加入group(第一个与GroupCoordinator连接的consumer)的consumer作为leader(消费组协调器)
- 把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案
- SYNC GROUP
- consumer leader通过给GroupCoordinator发送SyncGroupRequest
- 接着GroupCoordinator就把分区方案下发给各个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费
- 选择组协调器(GroupCoordinator):每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance
消息发布机制
- producer发布消息机制
- 写入方式:producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)
- 消息路由:producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition
- 指定了 patition,则直接使用
- 指定 patition 但指定 key,通过对 key 的 value 进行 hash 选出一个 patition
- patition 和 key 都未指定,使用轮询选出一个 patition
- 写入流程
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后向leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
HW与LEO
-
HW:HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW,consumer最多只能消费到HW所在的位置。
- 每个replica都有HW,leader和follower各自负责更新自己的HW的状态。
- 对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,消息才能被consumer消费。
- 这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。
- 对于来自内部broker的读取请求,没有HW的限制
-
当producer生产消息至broker后,ISR以及HW和LEO的流转过程
-
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制,很好的均衡了确保数据不丢失以及吞吐率
-
当 acks=1
日志分段
- 日志分段存储:Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里,这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G,做这个限制目的是为了方便把 log 文件加载到内存去操作
- 00000000000000000000.index:部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件
- 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息
- 00000000000000000000.log:消息存储文件,主要存offset和消息体
- 00000000000000000000.timeindex:息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件
- 如果需要按照时间来定位消息的offset,会先在这个文件里查找
- 文件名00000000000000000000就是表了这个日志段文件里包含的起始 Offset
- 00000000000000000000.index:部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件
- log.segment.bytes:限定了每个日志段文件的大小,最大就是 1GB
- 一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做log rolling,正在被写入的那个日志段文件,叫做 active log segment。
相关文章:

Kafka-设计原理
ControllerLeader - PartitionRebalance消息发布机制HW与LEO日志分段 Controller Kafka核心总控制器Controller:在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理…...

51单片机的智能台灯设计【proteus仿真+程序+报告+原理图+演示视频】
1、主要功能 该系统由AT89C51/STC89C52单片机LCD1602显示模块DS1302时间模块光敏传感器模块人体红外模块按键等模块构成。适用于智能台灯、自动调节灯光亮度等相似项目。 可实现基本功能: 1、LCD1602实时显示北京时间、环境光照强度、手动/自动模式、台灯亮度等信息࿱…...

【论文阅读】一种针对多核神经网络处理器的窃取攻击(2020)
摘要 攻击者可以通过侧信道信息(Side-channel)完成模型窃取攻击[17]. [17] Hua W Z, Zhang Z R, Suh G E. Reverse Engineering Convolutional Neural Networks through Side-channel Information Leaks[C]. 2018 55th ACM/ESDA/IEEE Design Automation Conference (DAC), 2018…...

基于VUE的校园二手物品交易管理系统的设计与实现 (含源码+sql+视频导入教程)
👉文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1 、功能描述 基于VUE的校园二手物品交易管理系统8拥有两种角色 管理员:闲置物品管理、订单管理、用户管理 用户:登录注册、购物车、发布闲置物品、评论、发货、收货地址管理等…...

pytest 常用的辅助函数和工具函数
pytest 常用的辅助函数和工具函数示例 # File: my_module.pydef fetch_data():return process datadef process_data():data fetch_data()return data.upper() import logging import sys import pytest#01-------------------------------pytest.fixture,sample_data 在测试…...
记录Java秋招面经(网上找的)
1.Mysql的存储机制,怎么落到库里面的? 当数据插入 MySQL 时,首先数据修改会在内存中的 Buffer Pool 中完成,同时记录写入 Redo Log 以保证事务的持久性。事务提交时,日志会被刷入磁盘,确保数据可以恢复。修…...
记录k8s重启之后kubelet无法启动的问题
重启机器后,kubelet没有自启动,手动启动失败,检查日志反馈找不到bootstrap-kubelet.conf这个文件。 systemctl start kubelet journalctl -u kubelet 57481 run.go:74] "command failed" err"failed to run Kubelet: unable …...

IA——网络操作设备VRP简介
一,VRP简介 二,网络设备的管理 (1)console口: (2)talnet: (3)SSH: 安全的远程登陆 (4)通过WEB页面登录: 三,命令行常见…...

Java项目: 基于SpringBoot+mysql企业客户管理系统(含源码+数据库+答辩PPT+毕业论文)
一、项目简介 本项目是一套基于SpringBootmysql企业客户管理系统 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格调试,eclipse或者idea 确保可以运行! 该系统功能完善、界面美观、操作简单、功…...
基于STM32设计的智能安防系统(微信小程序)(218)
文章目录 一、前言1.1 项目介绍【1】开发背景【2】项目实现的功能【3】项目硬件模块组成1.2 设计思路【1】整体设计思路【2】整体构架【3】微信小程序开发思路1.3 项目开发背景【1】选题的意义【2】可行性分析【3】参考文献【4】摘要【5】项目背景1.4 开发工具的选择【1】设备端…...

tomcat redis minio nginx windows开机自启
tomcat 开机自启 命令 service.bat install 控制台输入 service.bat install 再到服务中去查看有没有注册成功,minio,redis,nginx 也是一样在服务里查看注册成功没 redis 开机自启 命令 redis-server.exe --service-install redis.windows.conf --loglevel ve…...
Docker构建镜像教程
目录 1. Docker 环境准备2. 编写 Dockerfile3. 构建 Docker 镜像4. 运行容器5. 管理镜像和容器6. 镜像的导出和导入6.1 导出 Docker 镜像6.2 导入 Docker 镜像6.3 导出与导入的实际场景6.4 压缩导出的镜像 7. 推送镜像到 Docker Hub8. Dockerfile 指令详解1. FROM2. RUN3. WORK…...

扑捉一只耿鬼(HTML文件)
图例: 代码: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><title>耿鬼</title><style>body {background: #fff;font-family: Comfortaa, sans-serif;}* {box-sizing:…...

Address localhost:1099 is already in use:tomcat频繁重启端口占用问题
错误提示 Unable to open debugger port (127.0.0.1:58198): java.net.SocketException "Socket closed" Address localhost:1099 is already in use 端口被占用 报错原因 由于短时间内频繁运行tomcat服务器。 为了避免出现这一错误。可以点击刷新uodate resourc…...
HTTPS SEO优势
搜索引擎优化(SEO)是提高网站在搜索引擎结果页(SERP)中的排名以吸引更多访问者的过程。HTTPS作为网站安全的标准,对SEO有着直接和间接的优势: 1. HTTPS作为排名信号 2014年,Google宣布HTTPS成…...

UE5 C++ 读取图片插件(一)
原来UE可以使用 static,之前不知道,一用就报错。 static TSharedPtr<IImageWrapper> GetImageWrapperByExtention(const FString InImagePath); //智能指针,方便追寻引用C,加载ImageWrapperstatic UTexture2D* LoadTexture2D(const FString& …...

C语言行地址列地址区别,内存的分配
开辟了10个字节的空间在栈中 "abc"常量在代码段 char a[10]"abc"; p:8个字节存"abc"的地址 "abc"常量在代码段中 char *p"abc" char *q"abc" p,q的值是一样的 到…...

Unity 一键修改图片缩放保存为当前的一半大小
用来压缩贴图大小还是比较方便的 支持 png,jpg,tga 话不多说 直接上代码 [MenuItem("Assets/扩展功能/缩放贴图一半尺寸(png | jpg | tga)", false)]static void ScaleHalfTextureSizeMenu(){foreach(var obj in Selection.objects) {Texture…...
Identifying User Goals from UI Trajectories论文学习
通过UI轨迹识别用户的需求。 这篇论文同样聚焦于UI agent,只是思路比较特别。他们想要通过训练agent通过用户的行为轨迹反推出他们想要干什么的能力来锻炼agent识别,理解,使用UI的能力。同时这个训练项目本身也有一定的实际意义,…...

[STM32]从零开始的STM32标准库环境搭建(小白向)
一、我们为什么要搭建STM32标准库开发环境 如果你对STM32有一定的了解,相信你已经认识了STM32的几种开发方式。基于STM32寄存器开发,基于ST官方的标准库开发,基于ST官方的HAL库开发。我们现在来了解一下这些库的优缺点。首先就是基于寄存器开…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...

苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
提升移动端网页调试效率:WebDebugX 与常见工具组合实践
在日常移动端开发中,网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时,开发者迫切需要一套高效、可靠且跨平台的调试方案。过去,我们或多或少使用过 Chrome DevTools、Remote Debug…...

stm32wle5 lpuart DMA数据不接收
配置波特率9600时,需要使用外部低速晶振...
Monorepo架构: Nx Cloud 扩展能力与缓存加速
借助 Nx Cloud 实现项目协同与加速构建 1 ) 缓存工作原理分析 在了解了本地缓存和远程缓存之后,我们来探究缓存是如何工作的。以计算文件的哈希串为例,若后续运行任务时文件哈希串未变,系统会直接使用对应的输出和制品文件。 2 …...

6.9-QT模拟计算器
源码: 头文件: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMouseEvent>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);…...