Kafka生产问题总结及性能优化实践
1、消息丢失情况
消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。
消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。
2、消息重复消费
消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。
消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的。
3、消息乱序
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送端到消费端全链路有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。
4、消息积压
1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。
2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。
5、延时队列
延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。
实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
6、消息回溯
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。
7、分区数越多吞吐量越高吗
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量
# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka‐producer‐perf‐test.sh ‐‐topic test ‐‐num‐records 1000000 ‐‐record‐size 1024 ‐‐throughput ‐1
‐‐producer‐props bootstrap.servers=192.168.65.60:9092 acks=1
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535
8、消息传递保障
at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。
at least once(消费者至少收到一次消息,1-多次):ack = all 可以实现。
exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实
现。
kafka生产者的幂等性::因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和
Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。
PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。
9、kafka的事务
Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。
kafka的事务处理可以参考官方文档:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
//初始化事务
producer.initTransactions();try {
//开启事务
producer.beginTransaction();
for (int i = 0; i < 100; i++){
//发到不同的主题的不同分区
producer.send(new ProducerRecord<>("hdfs‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis‐topic", Integer.toString(i), Integer.toString(i)));
}
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
//回滚事务
producer.abortTransaction();
}
producer.close();
10、kafka高性能的原因
磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,
不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
数据传输的零拷贝。
读写数据的批量batch处理以及压缩传输。
相关文章:

Kafka生产问题总结及性能优化实践
1、消息丢失情况 消息发送端: (1)acks0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高&am…...

[MySQL]数据库原理2,Server,DataBase,Connection,latin1、UTF-8,gb2312,Encoding,Default Collation——喵喵期末不挂科
希望你开心,希望你健康,希望你幸福,希望你点赞! 最后的最后,关注喵,关注喵,关注喵,佬佬会看到更多有趣的博客哦!!! 喵喵喵,你对我真的…...
【算法集训】基础数据结构:十、矩阵
矩阵其实就是二维数组,这些题目在9日集训中已经做过,这里做的方法大致相同。 第一题 1351. 统计有序矩阵中的负数 int countNegatives(int** grid, int gridSize, int* gridColSize) {int r gridSize;int c gridColSize[0];int ret 0;for(int i 0;…...

python排序算法 直接插入排序法和折半插入排序法
最近需要使用到一些排序算法,今天主要使针对直接插入排序和折半插入排序进行讲解。 首先是直接插入排序,其排序过程主要是,针对A[a1,a2,a3,a4,a5....an],从排序的序列头部起始位置开始,将其也就是a1视为只有一个元素的…...

【flutter对抗】blutter使用+ACTF习题
最新的能很好反编译flutter程序的项目 1、安装 git clone https://github.com/worawit/blutter --depth1 然后我直接将对应的两个压缩包下载下来(通过浏览器手动下载) 不再通过python的代码来下载,之前一直卡在这个地方。 如果读者可以正…...

OpenHarmony 如何去除系统锁屏应用
前言 OpenHarmony源码版本:4.0release / 3.2 release 开发板:DAYU / rk3568 一、3.2版本去除锁屏应用 在源码根目录下:productdefine/common/inherit/rich.json 中删除screenlock_mgr组件的编译配置,在rich.json文件中搜索th…...

Python - 搭建 Flask 服务实现图像、视频修复需求
目录 一.引言 二.服务构建 1.主函数 upload_gif 2.文件接收 3.专属目录 4.图像修复 5.gif2mp4 6.mp42gif 7.图像返回 三.服务测试 1.服务启动 2.服务调用 四.总结 一.引言 前面我们介绍了如何使用 Real-ESRGAN 进行图像增强并在原始格式 jpeg、jpg、mp4 的基础上…...
C#基础——构造函数、析构函数
C#基础——构造函数、析构函数 1、构造函数 构造函数是一种特殊的方法,用于在创建类的实例时进行初始化操作。构造函数与类同名,并且没有返回类型。 构造函数在对象创建时自动调用,可以用来设置对象的初始状态、分配内存、初始化字段等操作…...

jmeter 如何循环使用接口返回的多值?
有同学在用jmeter做接口测试的时候,经常会遇到这样一种情况: 就是一个接口请求返回了多个值,然后下一个接口想循环使用前一个接口的返回值。 这种要怎么做呢? 有一定基础的人,可能第一反应就是先提取前一个接口返回…...

VLAN 详解一(VLAN 基本原理及 VLAN 划分原则)
VLAN 详解一(VLAN 基本原理及 VLAN 划分原则) 在早期的交换网络中,网络中只有 PC、终端和交换机,当某台主机发送一个广播帧或未知单播帧时,该数据帧会被泛洪,甚至传递到整个广播域。而广播域越大ÿ…...
Android - 分区存储 MediaStore、SAF
官方页面 参考文章 一、概念 分区存储(Scoped Storage)的推出是针对 APP 访问外部存储的行为(乱建乱获取文件和文件夹)进行规范和限制,以减少混乱使得用户能更好的控制自己的文件。 公有目录被分为两大类:…...
Shiro框架权限控制
首先去通过配置类的用户认证,在用户认证完成后,进行用户授权,用户通过授权之后再跳转其他的界面时,会进行一个验证,当前账号是否有权限。 前端权限控制显示的原理 在前端中,通常使用用户的角色或权限信息来…...

centOS7 安装tailscale并启用子网路由
1、在centOS7上安装Tailscale客户端 #安装命令所在官网位置:https://tailscale.com/download/linux #具体命令为: curl -fsSL https://tailscale.com/install.sh | sh #命令执行后如下图所示2、设置允许IP转发和IP伪装。 安装后,您可以启动…...
spring 项目中如何处理跨越cors问题
1.使用 CrossOrigin 注解 作用于controller 方法上 示例如下 RestController RequestMapping("/account") public class AccountController {CrossOriginGetMapping("/{id}")public Account retrieve(PathVariable Long id) {// ...}DeleteMapping(&quo…...
importlib --- import 的实现
3.1 新版功能. 源代码 Lib/importlib/__init__.py 概述 importlib 包具有三重目标。 一是在 Python 源代码中提供 import 语句的实现(并且因此而扩展 __import__() 函数)。 这提供了一个可移植到任何 Python 解释器的 import 实现。 与使用 Python 以…...

【PyTorch】现代卷积神经网络
文章目录 1. 理论介绍1.1. 深度卷积神经网络(AlexNet)1.1.1. 概述1.1.2. 模型设计 1.2. 使用块的网络(VGG)1.3. 网络中的网络(NiN)1.4. 含并行连结的网络(GoogLeNet)1.5. 批量规范化…...
用python编写九九乘法表
1 问题 我们在学习一门语言的过程中,都会练习到编写九九乘法表这个代码,下面介绍如何编写九九乘法表的流程。 2 方法 (1)打开pycharm集成开发环境,创建一个python文件,并编写第一行代码,主要构建…...

Google Gemini 模型本地可视化
Google近期发布了Gemini模型,而且开放了Gemini Pro API,Gemini Pro 可免费使用! Gemini Pro支持全球180个国家的38种语言,目前接受文本、图片作为输入并生成文本作为输出。 Gemini Pro的表现超越了其他同类模型,当前版…...
数据修复:.BlackBit勒索病毒来袭,安全应对方法解析
导言: 黑色数字罪犯的新玩具——.BlackBit勒索病毒,近来成为网络安全领域的头号威胁。这种恶意软件以其高度隐秘性和毁灭性而引起广泛关注。下面是关于.BlackBit勒索病毒的详细介绍,如不幸感染这个勒索病毒,您可添加我们的技术服…...
拓扑排序实现循环依赖判断 | 京东云技术团队
本文记录如何通过拓扑排序,实现循环依赖判断 前言 一般提到循环依赖,首先想到的就是Spring框架提供的Bean的循环依赖检测,相关文档可参考: https://blog.csdn.net/cristianoxm/article/details/113246104 本文方案脱离Spring Be…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...

Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
LRU 缓存机制详解与实现(Java版) + 力扣解决
📌 LRU 缓存机制详解与实现(Java版) 一、📖 问题背景 在日常开发中,我们经常会使用 缓存(Cache) 来提升性能。但由于内存有限,缓存不可能无限增长,于是需要策略决定&am…...