当前位置: 首页 > news >正文

防止消息丢失与消息重复——Kafka可靠性分析及优化实践

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
Kafka是什么,以及如何使用SpringBoot对接Kafka
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关


在这里插入图片描述
在上一章内容中,我们解析了Kafka在读写层面上的原理,介绍了很多Kafka在读出与写入时的各种设计,初步理解了Kafka大吞吐量的原因,本期我们将带领大家从另一个角度,即从可靠性方面来分析Kafka的机制与原理

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待

一、可靠性的考量角度

其实我们在《RabbitMQ 能保证消息可靠性吗》 一文中,我们已经阐述了对于MQ类主键,可靠性应该从哪些角度去判断。总结下来其实就是:

  • 消息不会意外丢失
  • 消息不会重复传递

那么本次我们也将从这两方面来看看 Kafka 都做了哪些工作来提升可靠性。

二、分区副本

1. 分区副本的含义

在这里插入图片描述

我们之前了解到:一个Kafka的主题被分为了若干个分区,每个分区都是一个有序的消息队列。如上图,我们就把topicA分成了1\2\3\4 四个分区(实线圆柱),但是我们还看到了更多的“虚线圆柱”,这些其实是 1\2\3\4 分区的副本,在Kafka中,每个分区都有多个副本。副本是指一个分区在其他Broker上的备份副本的作用是提高消息的可靠性和容错性,一旦某个Broker宕机,其他Broker上的副本就可以接替宕机的Broker继续提供服务

我们可以看到不同的Broker之间。互相存储着对方的副本分区,比如Partition1 存在Broker1 上,但Partition1 的副本可以放在Broker2 ,同理 Partition2 的副本也可以放在Broker1 ,如果我们专注于一个分区,那么其情况如下:

在这里插入图片描述

为了区分,我们一般把“实线圆柱”的分区称为 Leader,“虚线圆柱”的分区称为FollowerLeader副本负责读写请求,而Follower副本则只负责复制Leader副本的数据

2. AR 与 ISR机制

上面我们讲了副本分区的Follower,所有副本的合集统称为AR(Assigned Replicas),但是不同副本和Leader到底一致性有多高呢?会不会出现有的副本同步得及时,有的副本因为网络原因同步得很慢呢? 这里又引出了一个重要的概念叫做ISR(In-Sync Replica)机制。

ISR,是一个机制,也代表着一个同步合集,顾名思义,它包含着所有处于同步状态的副本。当一个副本和Leader副本的差距超过一定程度时,这个副本就会被认为是不同步的,不再被加入到ISR中。也因此,Kafka中的 ISR 并不是一直不变的

在这里插入图片描述

那么,既然ISR是动态的,那哪些副本会被包含在ISR中呢?

其实,其主要依据就是 副本需要保证能够及时地接收并复制Leader副本的消息,也就是需要保证与leader副本的消息同步延迟在一定的时间范围内(默认情况下是10秒钟,由参数 replica.lag.time.max.ms 控制)。

在这里插入图片描述

换而言之,因为分区与ISR机制,我们的消息一旦被Kafka 接收后,就会复制多份并很快落盘。这意味着,即使某一台Broker节点宕机乃至硬盘损毁,也不会导致数据丢失。

三、ACKS设置

如果说备份机制是保障消息不会在Kafka服务器丢失,那么消息丢失的另一个重要原因就是消息在发送中丢失。

这种场景下,我们就需要利用消息确认机制了,此时我们也会利用到ISR,比如我们在发送消息时,可以通过设置ACK的值,来决定同步的情况:

  • acks=0,如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,不能保证服务器已经收到记录,重试配置也不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量将始终设置为-1。

  • acks=1,这意味着领导者会将记录写入其本地日志,但不会等待所有追随者的完全确认。在这种情况下,如果领导者在确认记录后立即失败,但在追随者复制之前,记录将丢失。

  • acks=all,这意味着leader将等待所有ISR来确认记录。这保证了只要至少有一个副本处于ISR内,记录就不会丢失。这是最有力的保证。这相当于acks=-1的设置。

当我们设置acks=all 或者 -1 的时候, broker 的 min.insync.replicas 参数起作用(一个典型的场景是,topic 有 3个 副本,客户端设置 acks = -1,服务端设置 topic level 的 min.insync.replicas = 2,这样至少有 2 个副本写入后,broker 才会返回;但是如果 topic 只有 1 个副本,而 acks = all,min.insync.replicas = 2,就会报 NOT_ENOUGH_REPLICAS 错误);

在这里插入图片描述

四、重试机制

发消息不可能万无一失,当Kafka在发送或接收消息时发生错误时,可以通过重试来解决这些问题,提高系统的可靠性。这点不用多说,那么在生产端我们可以配置哪些重试的参数呢?

  • retries:重试次数,默认为0,表示不启用重试机制,但建议开启

  • retry.backoff.ms:每次重试的时间间隔,默认为100ms。

  • retry.buffer.records:每个分区的缓冲区中可以存储的最大重试消息数。

在这里插入图片描述

在实际应用中,一些小故障是可以通过重试来解决的,但是重试次数过多也会增加网络通信的负担,甚至会导致消息堵塞。所以建议将其设置为1或2,这样可以在第一次发送失败后进行重试,从而提高消息的可靠性。但是,如果网络状况很差,或者需要处理重要的消息,可以适当增加retries的值。

五、幂等性设计

如果你仔细观察上面的ACKS的设置,相信你会发现,这并不完美:如果你将ACKs设置为-1(all),可以保证Producer到Server之间不会丢失数据,即At Least Once(最少一次),但不能保证数据不重复。而如果你将ACKs级别设置为0(不需要等写log),则可以保证生产者每条消息只会被发送一次,即At Most Once(最多一次),但不能保证不会发生数据丢失。

难道就没有一种既不会丢失,也不会重复的方案吗?其实是有的,这个时候我们可以使用 ‘ack = all’ + 幂等性来解决,而开启幂等性 ,即设置 enable.idempotence = true

请注意,启用幂等性要求
max.in.flight.requests.per.connection小于或等于5(为任何允许的值保留消息顺序),
retries重试次数大于0,
acks必须为“all”。
PS:我发现不少文章把开启幂等性参数写成 enable.idompotence,不知是笔误还是什么原因

开启幂等性的意义在于生产者将确保在流中每条消息正好只会发送一次,那么它是怎么实现的呢?Kafka生产者的幂等性算法主要包括以下三个方面的实现:

  1. 生产者编号: Producer在初始化的时候(只有初始化的时候会随机生成PID)会被分配一个PID(producerid),
  2. 分区编号:不同的分区有自己的paritionid(即分区号),
  3. 序列号:发往同一Partition的消息会附带Sequence Number(即发送数据的编号,代表着向分区发送的第几条消息),这样<PID, PartitionID, SeqNumber>就相当于构成了一个主键。Broker端会对<PID, PartitionID, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

六、消费偏移量

前面说了很多,都是生产者与Kafka的,其实对于消费者,Kakfa也有同步提交偏移量的设计。在 Kafka 的消费者 API 中,同步提交偏移量可以通过设置 enable.auto.commit 参数为 false ,然后在消费者应用中手动控制提交偏移量的时机来实现。

具体来说,可以使用 commitSync() 方法提交偏移量,该方法会一直阻塞直到提交偏移量成功,或抛出异常。示例代码如下:

Properties props = new Properties();
// 设置 Kafka 集群地址等配置信息
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 禁止自动提交偏移量
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息}consumer.commitSync(); // 手动提交偏移量}
} finally {consumer.close();
}

通过这种方式,可以确保消息在被消费者处理后再提交偏移量,从而避免了消息丢失或者重复处理的问题。同时,手动提交偏移量也提高了消费者的可控性,可以根据自身情况自由地设置提交偏移量的时机。

七、可靠性不足分析

尽管我们在上面讲了一些,Kafka为了实现可靠性而做的设计,一般情况下,这种程度的可靠性足以应付了。但在实际应用过程中,Kafka仍然可能会面临以下几个可靠性问题:

  1. 生产者重复发送:尽管开启了幂等性,但不要忘记幂等性设置仅表示生产者对同一个分区的消息的写入是有序的、幂等的,如果producer挂了,重启之后,producer会重新生成producerid,此时幂等性校验就不准了。
  2. 消费者重复消费:如果消费者在提交偏移量前宕机了,将导致Kafka认为该消息没有被消费,在消费者重启后,又会消费该消息,导致重复消费。

这些情况,Kafka自身已经无法解决。我们的解决策略只能契合在我们的业务处理上,目前一个通用的方案是全局性ID+生产/消费两端校验:
在这里插入图片描述

我们可以先根据业务对消息进行去重,然后使用诸如雪花算法等方案为每一条去重后的消息生产全局性的唯一ID,并在发送和消费之前在redis或其他较快的存储件中进行标记,这样当发生重复发送/消费时,就能及时发现了。此时你可以选择放弃本次发送/消费,也可以将该异常情况上报,由人工来进行检查与处理

总结

本次我们对Kafka的可靠性进行了分析和优化实践。一般来说,我们可以通过设置acks、开启幂等性,消费端手动提交偏移量等方式来保证可靠性,也足以应付大部分场景。而且实际应用过程中,还可以配合全局ID等手段完善可靠场景。当然,架构服务于业务需求,所以最终还是需要结合具体的业务需求和场景来选择合适的部署方式和配置参数,在后面我们还会继续进行Kafka的深入解析,如果你对此有兴趣,可以直接订阅本 kafka 专栏

相关文章:

防止消息丢失与消息重复——Kafka可靠性分析及优化实践

系列文章目录 上手第一关&#xff0c;手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么&#xff0c;以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析&#xff0c;打破面试难关 防止消息丢失与消息重复——Kafka可…...

【Linux】Linux中Crontab(定时任务)命令详解及使用教程

文章目录 前言1.使用yum命令安装Crontab&#xff1a;2.查看Crontab状态&#xff1a;3.添加定时任务&#xff1a;4.查看任务列表&#xff1a;5.Crontab相关命令&#xff1a;6.部分脚本无法执行问题&#xff1a;7.Crontab默认调度任务&#xff1a;8.注意清理系统用户的邮件日志&a…...

计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化

文章目录 0 前言1、环境准备1.1 flink 下载相关 jar 包1.2 生成 kafka 数据1.3 开发前的三个小 tip 2、flink-sql 客户端编写运行 sql2.1 创建 kafka 数据源表2.2 指标统计&#xff1a;每小时成交量2.2.1 创建 es 结果表&#xff0c; 存放每小时的成交量2.2.2 执行 sql &#x…...

8.2 矢量图层点要素单一符号使用一

文章目录 前言单一符号&#xff08;Single symbol&#xff09;渲染简单标记(Simple Marker)QGis代码实现 SVG标记&#xff08;SVG marker&#xff09;QGis代码实现 总结 前言 上一篇教程对矢量图层符号化做了一个整体介绍&#xff0c;并以点图层为例介绍了可以使用的渲染器&am…...

SQL企业微信群机器人消息推送

--参考资料地址 --微软官方地址: https://learn.microsoft.com/zh-cn/sql/relational-databases/system-stored-procedures/ole-automation-stored-procedures-transact-sql?view=sql-server-ver16 --腾讯官方地址:https://developer.work.weixin.qq.com/ --使…...

vscode远程连接ubuntu

修改环境变量&#xff0c;改使用git自带的ssh工具 openssh: C:\Windows\System32\OpenSSH\ssh.exeGit ssh: C:\Program Files\Git\usr\bin\ssh.exe vscode安装插件remote-ssh 重开软件&#xff0c;在左侧拓展入口下方&#xff0c;进入远程资源管理器 点击设置&#xff0c;进…...

Positive Technologies 在迪拜宣布与地区网络安全解决方案提供商开展合作

在中东最大的信息技术展 GITEX GLOBAL 2023 的间隙&#xff0c;Positive Technologies 同意与八家组织&#xff08;网络安全服务和解决方案提供商&#xff09;合作&#xff0c;在该地区开展合作&#xff0c;推广最先进的产品&#xff0c;并分享信息安全领域的经验。该公司强调了…...

Pyside6 QTextEdit

Pyside6 QTextEdit QTextEdit使用QTextEdit常用函数文本编辑类函数文本框格式设置函数设置文字颜色设置文字背景颜色设置文字格式设置文本框样式程序设置界面设置 QTextEdit信号textChanged信号 完整程序界面程序主程序 QTextEdit类提供了一个用于编辑和显示纯文本和富文本的组…...

Hadoop核心机制详细解析

Hadoop核心机制详细解析 Hadoop的核心机制是通过HDFS文件系统和MapReduce算法进行存储资源、内存和程序的有效利用与管理。在现实的实例中&#xff0c;通过Hadoop&#xff0c;可以轻易的将多台普通的或低性能的服务器组合成分布式的运算-存储集群&#xff0c;提供大数据量的存…...

Chromium源码由浅入深(一)

工作中需要对Chromium源码、尤其是源码中图形部分进行深入研究&#xff0c;所以借此机会边学习边写文章&#xff0c;分享一下我的实时学习研究Chromium源码的由浅入深的过程。 闲言少叙&#xff0c;书归正传。 通过命令行启动Chrome浏览器&#xff0c;命令及结果如下&#xf…...

Spring Authorization Server 1.1 扩展 OAuth2 密码模式与 Spring Cloud Gateway 整合实战

目录 前言无图无真相创建数据库授权服务器maven 依赖application.yml授权服务器配置AuthorizationServierConfigDefaultSecutiryConfig 密码模式扩展PasswordAuthenticationTokenPasswordAuthenticationConverterPasswordAuthenticationProvider JWT 自定义字段自定义认证响应认…...

UE4 UltraDynamicSky 天气与水体交互

最上面的Lerp的A通道为之前的水面效果&#xff0c;B是做的冰面效果 用Dynamic_Landscape_Weather_Effects的BaseColor的R通道四舍五入作为Lerp的Alpha值 使用一张贴图&#xff0c;乘以RadialGradientExponential对材质边缘做弱化&#xff0c;RadialGradientExponential的Raid…...

Liunx 实时调度策略 SCHED_RR SCHED_FIFO 区别 适用情况

SCHED_RR SCHED_FIFO 适用情况 SCHED_FIFO 先进先出调度。只能在静态优先级高于0的情况下使用&#xff0c;这意味着当 SCHED_FIFO 线程变得可运行时&#xff0c;它总是立即抢占当前正在运行的任何 SCHED_OTHER、SCHED_BATCH 或 SCHED_IDLE 线程。SCHED_FIFO 线程一直运行到被…...

mac上使用虚拟机vm, 里面的镜像挂起会占用电脑的内存吗, 挂起和关机的区别是什么, 会影响正常电脑的内存和硬盘使用吗

解释 在Mac&#xff08;或任何其他操作系统&#xff09;上使用虚拟机&#xff08;如VMware Fusion、Parallels Desktop、VirtualBox等&#xff09;时&#xff0c;“挂起”&#xff08;Suspend&#xff09;和“关机”&#xff08;Power Off或Shut Down&#xff09;是两种不同的虚…...

AIGC时代 浪潮信息积极推动存储产品创新

近几年&#xff0c;AIGC的兴起&#xff0c;进一步驱动了全闪、混闪等存储产品的创新&#xff0c;也为市场带来了新的机遇&#xff0c;对于厂家而言&#xff0c;也需要升级存储产品的容量、性能及功能&#xff0c;方能满足场景诉求。对此&#xff0c;浪潮信息面向AIGC应用场景打…...

【PG】PostgreSQL字符集

目录 设置字符集 1 设置集群默认的字符集编码 2 设置数据库的字符集编码 查看字符集 1 查看数据字符集编码 2 查看服务端字符集 3 查看客户端字符集 4 查看默认的排序规则和字符分类 被支持的字符集 PostgreSQL里面的字符集支持你能够以各种字符集存储文本&#xff0c…...

力扣:137. 只出现一次的数字 II(Python3)

题目&#xff1a; 给你一个整数数组 nums &#xff0c;除某个元素仅出现 一次 外&#xff0c;其余每个元素都恰出现 三次 。请你找出并返回那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法且使用常数级空间来解决此问题。 来源&#xff1a;力扣&#xff08;Lee…...

orb-slam3编译手册(Ubuntu20.04)

orb-slam3编译手册&#xff08;Ubuntu20.04&#xff09; 一、环境要求1.安装git2.安装g3.安装CMake4.安装vi编辑器 二、源代码下载三、依赖库下载1.Eigen安装2.Pangolin安装3.opencv安装4.安装Python & libssl-dev5.安装boost库 三、安装orb-slam3四、数据集下载及测试 写在…...

升级 Xcode 15模拟器 iOS 17.0 Simulator(21A328) 下载失败

升级 IDE Xcode 15 后本地模拟器 Simulator 全被清空,反复重新尝试 Get 下载频频因网络异常断开而导致失败 ... 注:通过 Get 方式下载一定要保证当前网络环境足够平稳,网络环境不好的情况下该方法几乎成不了 解决办法 Get 方式行不通可以尝试通过 官网 途径先下载 模拟器安装包…...

PHP 函数、PHP 简单后门

函数 基本结构 语法结构 function 函数名(形式参数1,形式参数2...){//函数体return 返回值 }定义并执行一个简单函数 // funtion.phpfunction test(){echo "This is function ".__FUNCTION__; }test();函数传参 // function.phpfunction add($x, $y){$sum $x …...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...

Selenium常用函数介绍

目录 一&#xff0c;元素定位 1.1 cssSeector 1.2 xpath 二&#xff0c;操作测试对象 三&#xff0c;窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四&#xff0c;弹窗 五&#xff0c;等待 六&#xff0c;导航 七&#xff0c;文件上传 …...