当前位置: 首页 > news >正文

Kakfa详解(一)

kafka使用场景

  • canal同步mysql
  • elk日志系统
  • 业务系统Topic

kafka基础概念

  • Producer: 消息生产者,向kafka发送消息
  • Consumer: 从kafka中拉取消息消费的客户端
  • Consumer Group: 消费者组,消费者组是多个消费者的集合。消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。某一个分区中的消息只能够一个消费者组中的一个消费者所消费。

    减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic

  • Broker: 一台kafka服务器就是一个Broker,一个集群由多个Broker组成
  • Topic:主题,可以理解为队列,生产者和消费者都是面向Topic
  • Partition:分区,为了实现扩展性。一个非常大的Topic可以分布在多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序的队列(分区有序,不能保证全局有序)
    • 便于在集群中扩展
    • 可以提高并发,以Partition为单位进行读写,类似于多路
    # 默认分区数 server.properties配置
    num.partitions=1
    
  • Replica:副本,为保证集群中某个节点发生故障,节点上的Partition数据不丢失,kafka可以正常的工作,kafka提供了副本机制,一个Topic的每个分区有若干个副本,一个Leader和多个Follower
    # 默认副本数 server.properties配置 
    # 默认分区副本数不得超过kafka节点数(副本数如果一个节点放2份,就没意义了)
    default.replication.factor=3
    
  • Leader:每个分区多个副本的主角色,生产者发送数据对象,以及消费者消费数据都是Leader
  • Follower: 每个分区多个副本的从角色,实时的从Leader同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader
  • ISRin sync replica,基本保存同步的Replica列表,是副本与主副本保持同步的列表,默认是30s数据,如果从副本保持同步,那么重新选举leader的时候,会被选择。如果与主副本同步差距较大,会被移除,选举leader将不会被考虑。
  • OSRout of sync replica, 同步有延迟的follower列表
  • LEOLog End Offset,每个副本最后一个offset
  • HWHigh Watermark,高水位,指消费者能见到的最大的offsetISR队列中最小的LEO
    在这里插入图片描述

文件存储

主要是通过logindex等文件保存具体的消息文件

一个topic 对应多个partition
一个partition 对应多个segment
一个segment对应logindex文件

为了防止log文件过大导致定位效率低下,kafka的log文件以1G为一个分界点,当.log文件大小超过1G的时候,此时会创建一个新的.log文件,同时为了快速定位大文件中消息位置,kafka采取了分片和索引的机制来加速定位。

.index文件存储的消息的offset+真实的起始偏移量。.log中存放的是真实的数据。

数据定位步骤,查找offset=6的数据。

  • 通过二分查找,定位.index文件。offset=6(大于4,小于9),定位到第二个文件segement02
  • 然后offset减去segment02的起始偏移量(6-4=2),定位到之后总的偏移量
  • 获取到总的偏移量之后,直接定位到.log文件即可快速获得当前消息大小
    在这里插入图片描述

生产者

发送消息分区策略
  • 指明partition(指明是指第几个分区)的情况下,直接将指明的值作为partition的值
  • 没有指明partition的情况下,但是存在值key,此时将keyhash值与topicpartition总数进行取余得到partition
  • 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。
生产者消息发送

在这里插入图片描述

为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement,如果producer收到ack就会进行下一轮的发送,否则重新发送数据

发送ack时机
  • 半数follower同步完成即发送ack,容错率低,延时低
  • 全部follower同步完成完成发送ack,容错率高,延时高

kafka采用的是第二种,延迟对kafka影响比较小。

采用了第二种方案进行同步ack之后,如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,那么leader就要一直等待下去,直到它同步完成,才可以发送ack,此时需要如何解决这个问题呢?

leader中维护了一个ISR(in-sync replica set)同步副本集,即与leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该之间阈值由replica.lag.time.max.ms参数设定。当leader发生故障之后,会从ISR中选举出新的leader。

ack参数
  • 0: producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据
  • 1: producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)
  • -1(all): producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack,但是如果在follower同步完成后,broker发送ack之前,如果leader发生故障,会造成数据重复。(这里的数据重复是因为没有收到,所以继续重发导致的数据重复)
高吞吐量,低延迟
  1. kafka会先写入操作系统页缓存中,操作系统再决定将数据写回到磁盘上
  2. 磁盘顺序写,采用追加的方式写入消息
  3. 零拷贝

kafka消息发送,消息暂时暂存的,批量发送RecordAccumulator.class是专门缓存kafka消息的。

spring:kafka:bootstrap-servers: 127.0.0.1:9200,127.0.0.1:9201,127.0.0.1:9202producer: # producer 生产者retries: 0 # 重试次数acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)batch-size: 16384 # 默认 批量大小 16KBbuffer-memory: 33554432 # 默认 生产端缓冲区大小  32MBkey-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

消费者

消费方式

消费者采用pull的方式来从broker中读取数据

push推的模式很难适应消费速率不同的消费者,因为消息发送率是由broker决定的,它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull方式则可以让consumer根据自己的消费处理能力以适当的速度消费消息。

消费流程
  1. 从zookeeper中获取leader的位置和offset的位置,kafka0.9版本之前,consumer默认将offset保存在zookeeper中,从0.9版本之后,consumer默认将offset保存在kafka一个内置的topic中,该topic为__consumer_offsets
  2. 拉取数据,直接从broker的page cache 拉取
  3. 如果page cache数据不全,就会从磁盘中拉取,并发送
  4. 消费完成后,可以手动提交offset,也可以自动提交offset
    在这里插入图片描述
零拷贝

分区分配策略

线上的服务都是多个消费者服务一起消费的,一个topic包含多个partition,分区和消费者存在一个分配的策略,默认采用的是Range范围分配策略

计算公式
n = 分区数/消费者数
m = 分区数%消费者数
前m个消费者,消费n+1个,剩下的消费n个
8个分区(p1 - p8),3个消费者(c1 - c3)
c1 分配 p1 p2 p3
c2 分配 p4 p5 p6
c3 分配 p7 p8

配置参数
spring:kafka:consumer: # consumer消费者group-id: test-group # 默认的消费组IDenable-auto-commit: true # 是否自动提交offsetauto-commit-interval: 1000  # 提交offset延时(接收到消息后多久提交offset)# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 #一次拉取最大数据 500条
offset提交

默认是自动提交,enable.auto.commit=true
手动提交offset的方法有两种:

  • commitSync:同步提交,失败后会自动重试
  • commitAsync: 异步提交,失败后不会自动重试
重复消费

产生的原因

  1. 生产者重复提交
  2. rebalance引起的重复消费
    超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

解决方案

  • 提高消费速度
    • 增加消费者
    • 多线程处理
    • 异步消费
    • 调整消费处理时间
  • 幂等处理
    • 消费者设置幂等校验
    • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    
批量消费配置
@Configuration
@Slf4j
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String consumerGroupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;public Map<String, Object> consumerFactory() {Map<String, Object> props = new HashMap<>(16);props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {ConcurrentKafkaListenerContainerFactory<String, String>factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerFactory()));// 消费者组中线程数量factory.setConcurrency(3);//  当使用批量监听器时需要设置为truefactory.setBatchListener(true);// 拉取超时时间factory.getContainerProperties().setPollTimeout(3000);// 重试次数RetryingBatchErrorHandler errorHandler = new RetryingBatchErrorHandler(new FixedBackOff(500L, 3L), null);factory.setBatchErrorHandler(errorHandler);return factory;}
}
@KafkaListener(topics = "aloneness-topic02",properties = {"max.poll.records=20"},containerFactory = "containerFactory")
public void listen02(List<String> list) {log.info("处理批量消息:{}", JSON.toJSONString(list));List<Message> messages = JSON.parseArray(JSON.toJSONString(list), Message.class);System.out.println(messages);
}

如果未配置重试次数,也消费代码中出现异常,会一直重试,一直消费异常

手动创建Topic

kafka版本大于2.2

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic als-test-topic02
  • --zookeeper localhost:2181 指定zookeeper集群
  • --partitions 指定分区数
  • --replication-factor 指定分区副本数
重新分配分区副本

声明需要分配的Topic
topic-generate.json

{"topics": [{"topic": "aloneness-topic"}],"version": 1
}

通过 --topics-to-move-json-file 参数,生成分区分配策略 --generate

kafka-reassign-partitions.bat --zookeeper localhost:2181 --topics-to-move-json-file topic-generate.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"aloneness-topic","partition":0,"replicas":[0],"log_dirs":["any"]}]}Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"aloneness-topic","partition":0,"replicas":[1],"log_dirs":["any"]}]}

通过 --reassignment-json-file 参数,执行分区分配策略 --execute

kafka-reassign-partitions.bat --zookeeper localhost:2181 --reassignment-json-file partition-replica-reassignment.json --execute

相关文章:

Kakfa详解(一)

kafka使用场景 canal同步mysqlelk日志系统业务系统Topic kafka基础概念 Producer: 消息生产者&#xff0c;向kafka发送消息Consumer: 从kafka中拉取消息消费的客户端Consumer Group: 消费者组&#xff0c;消费者组是多个消费者的集合。消费者组之间互不影响&#xff0c;所有…...

图解LeetCode——剑指 Offer 12. 矩阵中的路径

一、题目 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻”单元格是那些水平相…...

particles在vue3中的基本使用

第三方库地址 particles.vue3 - npm 1.安装插件 npm i particles.vue3 npm i tsparticles2.在main.js中引入 import Particles from particles.vue3 app.use(Particles) // 配置相关的文件常用 api particles.number.value>粒子的数量particles.number.density粒子的稀密…...

04 Android基础--RelativeLayout

04 Android基础--RelativeLayout什么是RelativeLayout&#xff1f;RelativeLayout的常见用法&#xff1a;什么是RelativeLayout&#xff1f; 相对布局&#xff08;RelativeLayout&#xff09;是一种根据父容器和兄弟控件作为参照来确定控件位置的布局方式。 根据父容器定位 在相…...

python基础命令

1.现在包的安装路径 #pip show 包名 2.pip讲解 相信对于大多数熟悉Python的人来说&#xff0c;一定都听说并且使用过pip这个工具&#xff0c;但是对它的了解可能还不一定是非常的透彻&#xff0c;今天小编就来为大家介绍10个使用pip的小技巧&#xff0c;相信对大家以后管理和…...

用 Real-ESRGAN 拯救座机画质,自制高清版动漫资源

内容一览&#xff1a;Real-ESRGAN 是 ESRGAN 升级之作&#xff0c;主要有三点创新&#xff1a;提出高阶退化过程模拟实际图像退化&#xff0c;使用光谱归一化 U-Net 鉴别器增加鉴别器的能力&#xff0c;以及使用纯合成数据进行训练。 关键词&#xff1a;Real-ESRGAN 超分辨率 视…...

数据结构预备知识(模板)

模板 功能上类比C的重载函数&#xff0c;可以使用一种通用的形式&#xff0c;去代替诸多数据类型&#xff0c;使得使用同一种函数的时候&#xff0c;可以实现对于不同数据类型的相同操作。增强类和函数的可重用性。 使用模板函数为函数或类声明一个一般的模式&#xff0c;使得…...

SWM181按键控制双通道PWM固定占空比输出

SWM181按键控制双通道PWM固定占空比输出&#x1f4cc;SDK固件包&#xff1a;https://www.synwit.cn/kuhanshu_amp_licheng/ &#x1f33c;开发板如下图&#xff1a; ✨注意新手谨慎选择作为入门单片机学习。目前只有一个简易的数据手册和SDK包&#xff0c;又没有参考手册&am…...

pygame函数命令

pygame.mixer.music.load() —— 载入一个音乐文件用于播放 pygame.mixer.music.play() —— 开始播放音乐流 pygame.mixer.music.rewind() —— 重新开始播放音乐 pygame.mixer.music.stop() —— 结束音乐播放 pygame.mixer.music.pause() —— 暂停音乐播放 pygame.mixer.mu…...

异步循环

业务 &#xff1a; 批量处理照片 &#xff0c; 批量拆建 &#xff0c; 裁剪一张照片需要异步执行等待 &#xff0c; 并且是批量 所以需要用到异步循环 裁剪图片异步代码 &#xff1a; 异步循环 循环可以是 普通 for 、 for of 、 for in 不能使用forEach ,这里推荐 for…...

Vue表单提交与数据存储

学习内容来源&#xff1a;视频p5 书接目录对页面重新命名选择组件后端对接测试接口设置接口前端调用对页面重新命名 将之前的 Page1 Page2 进行重新命名&#xff0c;使其具有实际意义 Page1 → BookManage &#xff1b; Page2 → AddBook 并且 /router/index.js 中配置页面信息…...

API网关(接入层之上业务层之上)以及业务网关(后端服务网关)设计思路(二)

文章目录 流量网关业务网关常见网关对比1. OpenResty2. KongKong解决了什么问题Kong的优点以及性能Kong架构3. Zuul1.0过滤器IncomingEndpointOutgoing过滤器类型Zuul 1.0 请求生命周期4. Zuul2.0Zuul 与 Zuul 2 性能对比5. Spring Cloud GatewaySpring Cloud Gateway 底层使用…...

有些笑话,外行人根本看不懂,只有程序员看了会狂笑不止

我一直都觉得我们写代码的程序员与众不同&#xff0c;就连笑话都跟别人不一样。 如果让外行人来看我们一些我们觉得好笑的东西&#xff0c;他们根本不知道笑点在哪里。 不信你来瞧瞧&#xff0c;但凡有看不懂的地方&#xff0c;说明你的道行还不够深。 1.大多数人开始学编程时…...

企业电子招投标采购系统——功能模块功能描述

​ 功能模块&#xff1a; 待办消息&#xff0c;招标公告&#xff0c;中标公告&#xff0c;信息发布 描述&#xff1a; 全过程数字化采购管理&#xff0c;打造从供应商管理到采购招投标、采购合同、采购执行的全过程数字化管理。通供应商门户具备内外协同的能力&#xff0c;为外…...

Presto 在美图的实践

导读&#xff1a;本文的主题是Presto高性能引擎在美图的实践&#xff0c;首先将介绍美图在处理ad-hoc场景下为何选择Presto&#xff0c;其次我们如何通过外部组件对Presto高可用与稳定性的增强。然后介绍在美图业务中如何做到合理与高效的利用集群资源&#xff0c;最后如何利用…...

Molecule:使用Jetpack Compose构建StateFlow流

Molecule:使用Jetpack Compose构建StateFlow流 看下面的jetpack compose片段&#xff1a; Composable fun MessageCard(message: Message) {Column {Text(text message.author)Text(text message.body)} }这段代码最有趣的部分是它实际上是reactive。其反应性为 通过Composa…...

计算机组成原理(2.2)--系统总线

目录 一、总线结构 1.单总线结构 1.1单总线结构框图 ​编辑1.2单总线性能下降的原因 2.多总线结构 2.1双总线结构 2.2三总线结构 2.3四总线结构 ​编辑 二、总线结构举例 1. 传统微型机总线结构 2. VL-BUS局部总线结构 3. PCI 总线结构 4. 多层 PCI 总线结构 …...

如何使用dlinject将一个代码库实时注入到Linux进程中

关于dlinject dlinject是一款针对Linux进程安全的注入测试工具&#xff0c;在该工具的帮助下&#xff0c;广大研究人员可以在不使用ptrace的情况下&#xff0c;轻松向正在运行的Linux进程中注入一个共享代码库&#xff08;比如说任意代码&#xff09;。之所以开发该工具&#…...

Docker安装Cassandra数据库,在SpringBoot中连接Cassandra

简介 Apache Cassandra是一个高度可扩展的高性能分布式数据库&#xff0c;旨在处理许多商用服务器上的大量数据&#xff0c;提供高可用性而没有单点故障。它是NoSQL数据库的一种。首先让我们了解一下NoSQL数据库的作用。 NoSQL 数据库 NoSQL数据库&#xff08;有时称为“Not …...

Linux常用命令总结(建议收藏)

Linux常用命令总结(建议收藏) 这里收集了一些常用命令以便需要时查看&#xff0c;欢迎作补充。&#xff08;这里的提到操作都默认以CentOS系统为基础&#xff09; 文件管理 目录操作 切换目录 cd 查看目录 ls -l 列出文件详细信息 或者直接ll-a 列出当前目录下所有文件及…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

PL0语法,分析器实现!

简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...