当前位置: 首页 > news >正文

Kafka系列——详解如何使用和配置生产者实现可靠的消息发送

在可靠的系统里使用生产者

即使我们尽可能把 broker 配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。

比如下面的两个例子:
(一)为 broker 配置了 3 个副本,并且禁用了不完全首领选举,这样应该可以保证万无一失。我们把生产者发送消息的** acks 设为 1**(只要首领接收到消息就可以认为消息写入成功)。

  1. 生产者发送一个消息给首领,首领成功写入,但跟随者副本还没有接收到这个消息。
  2. 首领向生产者发送了一个响应,告诉它“消息写入成功”,然后它崩溃了,而此时消息还没有被其他副本复制过去。
  3. **此时另外两个副本此时仍然被认为是同步的(**毕竟判定一个副本不同步需要一小段时间),而且其中的一个副本成了新的首领。
  4. 因为消息还没有被写入这个副本,所以就丢失了,但发送消息的客户端却认为消息已成功写入。因为消费者看不到丢失的消息,所以此时的系统(从消费者角度来看)仍然是一致的(因为副本没有收到这个消息,所以消息不算已提交),但从生产者角度来看,它丢失了一个消息

(二)为 broker 配置了 3 个副本,并且禁用了不完全首领选举。把生产者的 acks 设为 all。

  1. 假设现在往 Kafka 发送消息,分区的首领刚好崩溃,新的首领正在选举当中,Kafka 会向生产者返回“首领不可用”的响应。
  2. 在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。

虽然这算不上是 broker 的可靠性问题,因为 broker 并没有收到这个消息。这也不是一致性问题,因为消费者并没有读到这个消息。

但如果生产者没能正确处理这些错误,就将丢失掉这些消息

那么,我们该如何避免这些问题呢?从上面两个例子可以看出,每个使用 Kafka的开发人员都要注意两件事情。

  • 根据可靠性需求配置恰当的 acks 值
  • 在参数配置和代码里正确处理错误

发送确认

生产者可以选择以下 3 种不同的确认模式。

acks=0

意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka。
在这种情况下还是有可能发生错误,比如发送的对象无法被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。
即使是在发生完全首领选举的情况下,这种模式仍然会丢失消息因为在新首领选举过程中它并不知道首领已经不可用了
在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息

acks=1

意味着首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。

在这个模式下,如果发生正常的首领选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送消息,最终消息会安全到达新的首领那里。

不过在这个模式下仍然有可能丢失数据比如消息已经成功写入首领,但在消息被复制到跟随者副本之前首领发生崩溃

acks=all

意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。

如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到消息。
这是最保险的做法——生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量

配置生产者的重试参数

生产者需要处理的错误包括两部分:一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
如果 broker 返回的错误可以通过重试来解决,那么生产者会自动处理这些错误
生产者向 broker 发送消息时,broker 可以返回一个成功响应码或者一个错误响应码。
错误响应码可以 分为两种,一种是在重试之后可以解决的,还有一种是无法通过重试解决的。

例如,如果 broker 返回的是 LEADER_NOT_AVAILABLE 错误,生产者可以尝试重新发送消息。也许在这个时 候一个新的首领被选举出来了,那么这次发送就会成功。也就是说,LEADER_NOT_AVAILABLE 是一个可重试错误。
另一方面,如果 broker 返回的是 INVALID_CONFIG 错误,即使通过重试 也无法改变配置选项,所以这样的重试是没有意义的。这种错误是不可重试错误

一般情况下,如果你的目标是不丢失任何消息,那么最好让生产者在遇到可重试错误时能够保持重试。为什么要这样?因为像首领选举或网络连接这类问题都可以在几秒钟之内得到解决,如果让生产者保持重试,你就不需要额外去处理这些问题了

经常会有人 问:“为生产者配置多少重试次数比较好?”
这个要看你在生产者放弃重试并抛出异常之后想做些什么
如果你想抓住异常并再多重试几次,那么就可以把重试次数设置得多一 点,让生产者继续重试;如果你想直接丢弃消息,多次重试造成的延迟已经失去发送消息1的意义;
如果你想把消息保存到某个地方然后回过头来再继续处理,那就可以停止重试。
Kafka 的跨数据中心复制工具默认会进行无限制的重试(例如 retries=MAX_INT)。作为一个具有高可靠性的复制工具,它决不会丢失消息。

要注意,重试发送一个已经失败的消息会带来一些风险,如果两个消息都写入成功,会导致消息重复
例如,生产者因为网络问题没有收到 broker 的确认,但实际上消息已经写入 成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写 入成功)。在这种情况下,broker 会收到两个相同的消息。
重试和恰当的错误处理可以保 证每个消息“至少被保存一次”,但0.10.0版本Kafka无法保证每个消息“只被保存一次”。
现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的“幂等”

额外的错误处理

使用生产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理大部分错误,不过对于开发人员来说,仍然需要处理其他类型的错误,包括:

  • 不可重试的 broker 错误,例如消息大小错误、认证错误等;
  • 在消息发送之前发生的错误,例如序列化错误;
  • 在生产者达到重试次数上限时或者在消息占用的内存达到上限时发生的错误。

这些错误处理 器的代码逻辑与具体的应用程序及其目标有关

  • 丢弃“不合法的消息”?
  • 把错误记录下来?
  • 把这些消息保存在本地磁盘上?
  • 回调另一个应用程序?

具体使用哪一种逻辑要根据具体的架构来决定。只要记住,如果错误处理只是为了重试发送消息,那么最好还是使用生产者内置的重试机制

参考这里的

相关文章:

Kafka系列——详解如何使用和配置生产者实现可靠的消息发送

在可靠的系统里使用生产者 即使我们尽可能把 broker 配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。 比如下面的两个例子: (一)为 broker 配置了 3 个副本&#xff0…...

wordpres漏洞扫描器——wpscan

WordPress 使用PHP语言开发的博客平台 WordPress是使用PHP语言开发的博客平台,用户可以在支持PHP和MySQL数据库的服务器上架设属于自己的网站。也可以把 WordPress当作一个内容管理系统(CMS)来使用。 WordPress是一款个人博客系统&#xff0c…...

代码随想录_二叉树_leetcode112、113

leetcode112 路径总和 112. 路径总和 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该树中是否存在 根节点到叶子节点 的路径,这条路径上所有节点值相加等于目标和 targetSum 。如果存在,返回 true ;否则,返…...

mongo-db相关方法

一、参数 名称描述db.adminCommand()针对admin数据库运行命令。db.aggregate()运行不需要基础集合的管理/诊断管道。db.cloneDatabase(hostname)不推荐使用。当针对MongoDB 4.0或更早版本运行时,将数据库从远程主机复制到当前主机。针对MongoDB 4.2或更高版本运行时…...

《Vue3实战》 第二章 创建项目和目录结构

1、创建项目 1.1、命令格式:vue create 项目名称 vue create vue3_example0011.2、运行项目 npm run serve1.2.1、增加run命令 启动时想修改命令,例如: npm run dev1、找到项目根路径下的package.json文件; 2、找到【scripts…...

13433元!上海一季度平均薪酬出炉!你拖后腿了吗?(附招聘岗位)

2023年第一季度智联招聘, 发布《中国企业招聘薪酬报告》, 显示上海平均招聘薪酬为 13433元/月!!! 13433元/月!!! 13433元/月!!! ☟ ☟ ☟ 同…...

leetcode剑指 Offer 16. 数值的整数次方

题目描述解题思路执行结果leetcode .题目描述 实现 pow(x, n) ,即计算 x 的 n 次幂函数(即,xn)。不得使用库函数,同时不需要考虑大数问题。 示例 1: 输入:x 2.00000, n 10 输出:1…...

漏洞挖掘相关-信息收集

一、常见端口以及漏洞 1.FTP:文件传输协议 TCP端口20、21,20用于传输数据,21用于传输控制信息 (1) ftp基础爆破: owasp的Bruter,hydra以及msf中的ftp爆破模块。 (2) ftp匿名访问:用户名: anonymous密码:为空或者任意邮箱 (3) vsftpd后门: …...

海外分支如何加速访问国内总部办公系统?海域网发布 Sea-WAN解决方案

近年来,一大批优秀的中国企业走向世界,品牌越来越响亮,海外影响力越来越大,比如名创优品,国货之光“花西子”,安科创新等,很多企业在海外设立分支机构为当地客户服务,与此同时&#…...

js设计模式——责任链模式

一、概述 责任链是一种行为设计模式,它允许将请求沿着处理链传递,直到有一个处理器可以处理该请求。在这种模式中,每个处理器都有机会处理请求,如果没有一个处理器能够处理请求,那么请求最终将被忽略。这种模式可以帮…...

接口组成更新

接口组成更新概述: 接口的组成: 常量 public static final 抽象方法 public abstract 默认方法java8 静态方法java8 私有方法java9 接口中默认方法 接口中默认方法的定义格式: 格式:public default 返回值类型 方法名&#x…...

int(1) 和 int(10)区别

有个表的要加个user_id字段,user_id字段可能很大, alter table xxx ADD user_id int(1)。 int(1)怕是不够用吧,接下来是一通解释。 我们知道在mysql中 int占4个字节,那么对于无符号的int,最大值是2^32-1 4294967295&a…...

华为OD机试-组合出合法最小数-2022Q4 A卷-Py/Java/JS

给一个数组,数组里面都是代表非负整数的字符串,将数组里所有的数值排列组合拼接起来组成一个数字,输出拼成的最小的数字。 输入描述 一个数组,数组不为空,数组里面都是代表非负整数的字符串,可以是0开头,例如:[”13","045","09","56&qu…...

ChatGPT中文在线官网-如何与chat GPT对话

怎么下载ChatGPT中文版 ChatGPT是一种基于Transformer架构的自然语言处理技术,其中包含了多个预训练的中文语言模型。这些中文ChatGPT模型大多数发布在Github上,可以通过Github的源码库来下载并使用,包括以下几种方式: 下载预训练…...

macOS 13.3.1 (22E261)With OpenCore 0.9.2开发版 and winPE双引导分区原版镜像

镜像特点 原文来源于黑果魏叔官网,转载需注明出处。(下载请直接百度黑果魏叔) 完全由黑果魏叔官方制作,针对各种机型进行默认配置,让黑苹果安装不再困难。系统镜像设置为双引导分区,全面去除clover引导分…...

《iTOP-3568开发板快速测试手册》第7章 Yocto系统外设功能测试(1)

瑞芯微RK3568芯片是一款定位中高端的通用型SOC,采用22nm制程工艺,搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码,支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU,可用于轻量级人工…...

【周末闲谈】AI的旅途

个人主页:【😊个人主页】 系列专栏:【❤️周末闲谈】 系列目录 ✨第一周 二进制VS三进制 ✨第二周 文心一言,模仿还是超越? ✨第二周 畅想AR 文章目录系列目录前言AIAI的开端第一个AI程序AI的寒冬关于AI的思考末尾前言…...

回溯算法--01背包问题

目录 回溯算法--01背包问题 [算法描述] [回溯法基本思想] 法一: 法二: 代码: 运行结果 代码改进 回溯算法--01背包问题 [算法描述] 0-1背包问题是子集选取问题。一般情况下,0-1背包问题是NP完全问题。0-1背包问题的解空…...

Spring MVC请求处理流程分析

Spring MVC请求处理流程分析一 Spring MVC 请求处理流程二 Spring MVC 请求处理流程源码分析2.1架构图解2.2 重要时机点分析2.3核心步骤分析2.3.1 getHandler⽅法剖析2.3.2 getHandlerAdapter⽅法剖析2.3.3 ha.handle⽅法剖析2.3.4 processDispatchResult⽅法剖析三 Spring MVC…...

Python高阶知识之属性管理

本文主要介绍Python高阶知识中的属性管理,这部分知识在常规Python编程中用的很少,但对于想深度了解Python甚至有志于自己编写实用框架的人,还是很有必要的,并且如果掌握了,对日常的代码学习等也会有一定好处。 本文结…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

生成xcframework

打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...

【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密

在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

GitHub 趋势日报 (2025年06月08日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...

Yolov8 目标检测蒸馏学习记录

yolov8系列模型蒸馏基本流程&#xff0c;代码下载&#xff1a;这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中&#xff0c;**知识蒸馏&#xff08;Knowledge Distillation&#xff09;**被广泛应用&#xff0c;作为提升模型…...

群晖NAS如何在虚拟机创建飞牛NAS

套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...

人工智能--安全大模型训练计划:基于Fine-tuning + LLM Agent

安全大模型训练计划&#xff1a;基于Fine-tuning LLM Agent 1. 构建高质量安全数据集 目标&#xff1a;为安全大模型创建高质量、去偏、符合伦理的训练数据集&#xff0c;涵盖安全相关任务&#xff08;如有害内容检测、隐私保护、道德推理等&#xff09;。 1.1 数据收集 描…...

Python网页自动化Selenium中文文档

1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API&#xff0c;让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API&#xff0c;你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...

微服务通信安全:深入解析mTLS的原理与实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言&#xff1a;微服务时代的通信安全挑战 随着云原生和微服务架构的普及&#xff0c;服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...