当前位置: 首页 > news >正文

Kafka系列——详解如何使用和配置生产者实现可靠的消息发送

在可靠的系统里使用生产者

即使我们尽可能把 broker 配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。

比如下面的两个例子:
(一)为 broker 配置了 3 个副本,并且禁用了不完全首领选举,这样应该可以保证万无一失。我们把生产者发送消息的** acks 设为 1**(只要首领接收到消息就可以认为消息写入成功)。

  1. 生产者发送一个消息给首领,首领成功写入,但跟随者副本还没有接收到这个消息。
  2. 首领向生产者发送了一个响应,告诉它“消息写入成功”,然后它崩溃了,而此时消息还没有被其他副本复制过去。
  3. **此时另外两个副本此时仍然被认为是同步的(**毕竟判定一个副本不同步需要一小段时间),而且其中的一个副本成了新的首领。
  4. 因为消息还没有被写入这个副本,所以就丢失了,但发送消息的客户端却认为消息已成功写入。因为消费者看不到丢失的消息,所以此时的系统(从消费者角度来看)仍然是一致的(因为副本没有收到这个消息,所以消息不算已提交),但从生产者角度来看,它丢失了一个消息

(二)为 broker 配置了 3 个副本,并且禁用了不完全首领选举。把生产者的 acks 设为 all。

  1. 假设现在往 Kafka 发送消息,分区的首领刚好崩溃,新的首领正在选举当中,Kafka 会向生产者返回“首领不可用”的响应。
  2. 在这个时候,如果生产者没能正确处理这个错误,也没有重试发送消息直到发送成功,那么消息也有可能丢失。

虽然这算不上是 broker 的可靠性问题,因为 broker 并没有收到这个消息。这也不是一致性问题,因为消费者并没有读到这个消息。

但如果生产者没能正确处理这些错误,就将丢失掉这些消息

那么,我们该如何避免这些问题呢?从上面两个例子可以看出,每个使用 Kafka的开发人员都要注意两件事情。

  • 根据可靠性需求配置恰当的 acks 值
  • 在参数配置和代码里正确处理错误

发送确认

生产者可以选择以下 3 种不同的确认模式。

acks=0

意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入Kafka。
在这种情况下还是有可能发生错误,比如发送的对象无法被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。
即使是在发生完全首领选举的情况下,这种模式仍然会丢失消息因为在新首领选举过程中它并不知道首领已经不可用了
在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息

acks=1

意味着首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。

在这个模式下,如果发生正常的首领选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送消息,最终消息会安全到达新的首领那里。

不过在这个模式下仍然有可能丢失数据比如消息已经成功写入首领,但在消息被复制到跟随者副本之前首领发生崩溃

acks=all

意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。

如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到消息。
这是最保险的做法——生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。
可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量

配置生产者的重试参数

生产者需要处理的错误包括两部分:一部分是生产者可以自动处理的错误,还有一部分是需要开发者手动处理的错误。
如果 broker 返回的错误可以通过重试来解决,那么生产者会自动处理这些错误
生产者向 broker 发送消息时,broker 可以返回一个成功响应码或者一个错误响应码。
错误响应码可以 分为两种,一种是在重试之后可以解决的,还有一种是无法通过重试解决的。

例如,如果 broker 返回的是 LEADER_NOT_AVAILABLE 错误,生产者可以尝试重新发送消息。也许在这个时 候一个新的首领被选举出来了,那么这次发送就会成功。也就是说,LEADER_NOT_AVAILABLE 是一个可重试错误。
另一方面,如果 broker 返回的是 INVALID_CONFIG 错误,即使通过重试 也无法改变配置选项,所以这样的重试是没有意义的。这种错误是不可重试错误

一般情况下,如果你的目标是不丢失任何消息,那么最好让生产者在遇到可重试错误时能够保持重试。为什么要这样?因为像首领选举或网络连接这类问题都可以在几秒钟之内得到解决,如果让生产者保持重试,你就不需要额外去处理这些问题了

经常会有人 问:“为生产者配置多少重试次数比较好?”
这个要看你在生产者放弃重试并抛出异常之后想做些什么
如果你想抓住异常并再多重试几次,那么就可以把重试次数设置得多一 点,让生产者继续重试;如果你想直接丢弃消息,多次重试造成的延迟已经失去发送消息1的意义;
如果你想把消息保存到某个地方然后回过头来再继续处理,那就可以停止重试。
Kafka 的跨数据中心复制工具默认会进行无限制的重试(例如 retries=MAX_INT)。作为一个具有高可靠性的复制工具,它决不会丢失消息。

要注意,重试发送一个已经失败的消息会带来一些风险,如果两个消息都写入成功,会导致消息重复
例如,生产者因为网络问题没有收到 broker 的确认,但实际上消息已经写入 成功,生产者会认为网络出现了临时故障,就重试发送该消息(因为它不知道消息已经写 入成功)。在这种情况下,broker 会收到两个相同的消息。
重试和恰当的错误处理可以保 证每个消息“至少被保存一次”,但0.10.0版本Kafka无法保证每个消息“只被保存一次”。
现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的“幂等”

额外的错误处理

使用生产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理大部分错误,不过对于开发人员来说,仍然需要处理其他类型的错误,包括:

  • 不可重试的 broker 错误,例如消息大小错误、认证错误等;
  • 在消息发送之前发生的错误,例如序列化错误;
  • 在生产者达到重试次数上限时或者在消息占用的内存达到上限时发生的错误。

这些错误处理 器的代码逻辑与具体的应用程序及其目标有关

  • 丢弃“不合法的消息”?
  • 把错误记录下来?
  • 把这些消息保存在本地磁盘上?
  • 回调另一个应用程序?

具体使用哪一种逻辑要根据具体的架构来决定。只要记住,如果错误处理只是为了重试发送消息,那么最好还是使用生产者内置的重试机制

参考这里的

相关文章:

Kafka系列——详解如何使用和配置生产者实现可靠的消息发送

在可靠的系统里使用生产者 即使我们尽可能把 broker 配置得很可靠,但如果没有对生产者进行可靠性方面的配置,整个系统仍然有可能出现突发性的数据丢失。 比如下面的两个例子: (一)为 broker 配置了 3 个副本&#xff0…...

wordpres漏洞扫描器——wpscan

WordPress 使用PHP语言开发的博客平台 WordPress是使用PHP语言开发的博客平台,用户可以在支持PHP和MySQL数据库的服务器上架设属于自己的网站。也可以把 WordPress当作一个内容管理系统(CMS)来使用。 WordPress是一款个人博客系统&#xff0c…...

代码随想录_二叉树_leetcode112、113

leetcode112 路径总和 112. 路径总和 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判断该树中是否存在 根节点到叶子节点 的路径,这条路径上所有节点值相加等于目标和 targetSum 。如果存在,返回 true ;否则,返…...

mongo-db相关方法

一、参数 名称描述db.adminCommand()针对admin数据库运行命令。db.aggregate()运行不需要基础集合的管理/诊断管道。db.cloneDatabase(hostname)不推荐使用。当针对MongoDB 4.0或更早版本运行时,将数据库从远程主机复制到当前主机。针对MongoDB 4.2或更高版本运行时…...

《Vue3实战》 第二章 创建项目和目录结构

1、创建项目 1.1、命令格式:vue create 项目名称 vue create vue3_example0011.2、运行项目 npm run serve1.2.1、增加run命令 启动时想修改命令,例如: npm run dev1、找到项目根路径下的package.json文件; 2、找到【scripts…...

13433元!上海一季度平均薪酬出炉!你拖后腿了吗?(附招聘岗位)

2023年第一季度智联招聘, 发布《中国企业招聘薪酬报告》, 显示上海平均招聘薪酬为 13433元/月!!! 13433元/月!!! 13433元/月!!! ☟ ☟ ☟ 同…...

leetcode剑指 Offer 16. 数值的整数次方

题目描述解题思路执行结果leetcode .题目描述 实现 pow(x, n) ,即计算 x 的 n 次幂函数(即,xn)。不得使用库函数,同时不需要考虑大数问题。 示例 1: 输入:x 2.00000, n 10 输出:1…...

漏洞挖掘相关-信息收集

一、常见端口以及漏洞 1.FTP:文件传输协议 TCP端口20、21,20用于传输数据,21用于传输控制信息 (1) ftp基础爆破: owasp的Bruter,hydra以及msf中的ftp爆破模块。 (2) ftp匿名访问:用户名: anonymous密码:为空或者任意邮箱 (3) vsftpd后门: …...

海外分支如何加速访问国内总部办公系统?海域网发布 Sea-WAN解决方案

近年来,一大批优秀的中国企业走向世界,品牌越来越响亮,海外影响力越来越大,比如名创优品,国货之光“花西子”,安科创新等,很多企业在海外设立分支机构为当地客户服务,与此同时&#…...

js设计模式——责任链模式

一、概述 责任链是一种行为设计模式,它允许将请求沿着处理链传递,直到有一个处理器可以处理该请求。在这种模式中,每个处理器都有机会处理请求,如果没有一个处理器能够处理请求,那么请求最终将被忽略。这种模式可以帮…...

接口组成更新

接口组成更新概述: 接口的组成: 常量 public static final 抽象方法 public abstract 默认方法java8 静态方法java8 私有方法java9 接口中默认方法 接口中默认方法的定义格式: 格式:public default 返回值类型 方法名&#x…...

int(1) 和 int(10)区别

有个表的要加个user_id字段,user_id字段可能很大, alter table xxx ADD user_id int(1)。 int(1)怕是不够用吧,接下来是一通解释。 我们知道在mysql中 int占4个字节,那么对于无符号的int,最大值是2^32-1 4294967295&a…...

华为OD机试-组合出合法最小数-2022Q4 A卷-Py/Java/JS

给一个数组,数组里面都是代表非负整数的字符串,将数组里所有的数值排列组合拼接起来组成一个数字,输出拼成的最小的数字。 输入描述 一个数组,数组不为空,数组里面都是代表非负整数的字符串,可以是0开头,例如:[”13","045","09","56&qu…...

ChatGPT中文在线官网-如何与chat GPT对话

怎么下载ChatGPT中文版 ChatGPT是一种基于Transformer架构的自然语言处理技术,其中包含了多个预训练的中文语言模型。这些中文ChatGPT模型大多数发布在Github上,可以通过Github的源码库来下载并使用,包括以下几种方式: 下载预训练…...

macOS 13.3.1 (22E261)With OpenCore 0.9.2开发版 and winPE双引导分区原版镜像

镜像特点 原文来源于黑果魏叔官网,转载需注明出处。(下载请直接百度黑果魏叔) 完全由黑果魏叔官方制作,针对各种机型进行默认配置,让黑苹果安装不再困难。系统镜像设置为双引导分区,全面去除clover引导分…...

《iTOP-3568开发板快速测试手册》第7章 Yocto系统外设功能测试(1)

瑞芯微RK3568芯片是一款定位中高端的通用型SOC,采用22nm制程工艺,搭载一颗四核Cortex-A55处理器和Mali G52 2EE 图形处理器。RK3568 支持4K 解码和 1080P 编码,支持SATA/PCIE/USB3.0 外围接口。RK3568内置独立NPU,可用于轻量级人工…...

【周末闲谈】AI的旅途

个人主页:【😊个人主页】 系列专栏:【❤️周末闲谈】 系列目录 ✨第一周 二进制VS三进制 ✨第二周 文心一言,模仿还是超越? ✨第二周 畅想AR 文章目录系列目录前言AIAI的开端第一个AI程序AI的寒冬关于AI的思考末尾前言…...

回溯算法--01背包问题

目录 回溯算法--01背包问题 [算法描述] [回溯法基本思想] 法一: 法二: 代码: 运行结果 代码改进 回溯算法--01背包问题 [算法描述] 0-1背包问题是子集选取问题。一般情况下,0-1背包问题是NP完全问题。0-1背包问题的解空…...

Spring MVC请求处理流程分析

Spring MVC请求处理流程分析一 Spring MVC 请求处理流程二 Spring MVC 请求处理流程源码分析2.1架构图解2.2 重要时机点分析2.3核心步骤分析2.3.1 getHandler⽅法剖析2.3.2 getHandlerAdapter⽅法剖析2.3.3 ha.handle⽅法剖析2.3.4 processDispatchResult⽅法剖析三 Spring MVC…...

Python高阶知识之属性管理

本文主要介绍Python高阶知识中的属性管理,这部分知识在常规Python编程中用的很少,但对于想深度了解Python甚至有志于自己编写实用框架的人,还是很有必要的,并且如果掌握了,对日常的代码学习等也会有一定好处。 本文结…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子&#xff08…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

【python异步多线程】异步多线程爬虫代码示例

claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

dify打造数据可视化图表

一、概述 在日常工作和学习中&#xff0c;我们经常需要和数据打交道。无论是分析报告、项目展示&#xff0c;还是简单的数据洞察&#xff0c;一个清晰直观的图表&#xff0c;往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server&#xff0c;由蚂蚁集团 AntV 团队…...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...