当前位置: 首页 > article >正文

Kafka - 消息零丢失实战

Kafka消息0丢失实战

当你用Kafka处理业务时,是否担心过消息神秘失踪?下面将从SpringBoot整合实战出发,穿透生产者→Broker→消费者全链路

1、 消息丢失的三大场景

场景1:生产者自信发送

// 致命陷阱代码示例

@Bean
public ProducerFactory<String, String> producerFactory() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.ACKS_CONFIG, "0"); // 发后即忘configs.put(ProducerConfig.RETRIES_CONFIG, 0); // 不重试return new DefaultKafkaProducerFactory<>(configs);
}

// 异步发送忘记回调

kafkaTemplate.send("order-topic", orderId, json).addCallback(result -> logger.info("发送成功"),  // 成功日志ex -> logger.error("发送失败")     // 错误吞没
);

场景2:Broker的死亡错觉

# 危险配置示范
auto.create.topics.enable=true     # 自动创建主题埋雷
unclean.leader.election.enable=true # 允许脏选举
min.insync.replicas=1             # 单副本存活即工作

场景3:消费者的自信提交

// 问题配置

@KafkaListener(topics = "order-topic")
public void handle(Order order) {try {paymentService.process(order);  // 处理耗时操作} finally {// 没有手动提交偏移量!}
}

// 错误配置:自动提交间隔过长

spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=5000

2、生产者端的钢铁防线

1. 同步发送+重试策略(金融级防护)

@Bean
public KafkaTemplate<String, String> reliableKafkaTemplate() {Map<String, Object> configs = new HashMap<>();configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 必须所有副本确认configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 防止乱序configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}

// 发送模板

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("order-topic", key, value);
future.get(10, TimeUnit.SECONDS); // 同步等待确认

2. 事务消息(分布式事务防护)

@Bean
public KafkaTransactionManager<String, String> transactionManager() {return new KafkaTransactionManager<>(producerFactory());
}
@Transactional
public void processOrder(Order order) {paymentService.charge(order);kafkaTemplate.send("payment-topic", order.getId(), order.toJson());inventoryService.reduceStock(order); 
}

3. 监控指标看护

// 注册监控指标

metrics.addMetric("producer-error-rate", (tags) -> {return producer.metrics().get("record-error-rate").value();
});

3、Broker集群的堡垒计划

1. 存活确认矩阵

# broker关键配置
unclean.leader.election.enable=false    # 禁止脏选举
min.insync.replicas=2                  # 至少2个副本确认
default.replication.factor=3           # 默认3副本
log.flush.interval.messages=10000      # 刷盘策略
log.flush.interval.ms=1000

2. ISR机制源码解析

// Kafka源码片段(Partition.scala)

def inSyncReplicas = {val leaderLogEndOffset = localLogOrException.logEndOffsetremoteReplicas.filter { replica =>replica.logEndOffset >= leaderLogEndOffset &&(time.milliseconds - replica.lastCaughtUpTimeMs) < replicaLagTimeMaxMs}
}

3. 磁盘防护策略

# 使用JBOD而不是RAID(Kafka最佳实践)
log.dirs=/data/kafka/log1,/data/kafka/log2,/data/kafka/log3
# 监控脚本示例
df -h | grep /data/kafka | awk '{if ($5 > 85) print "ALERT: "$6" usage over 85%"}'

4、消费者端的终极防御

1. 手动提交+死信队列

@KafkaListener(topics = "order-topic", groupId = "payment-group")
public void listen(ConsumerRecord<String, String> record,Acknowledgment ack,Consumer<String, String> consumer) {try {paymentService.process(record.value());ack.acknowledge(); // 手动提交} catch (Exception ex) {// 记录原始消息到死信队列kafkaTemplate.send("order-dlq", record.key(), record.value());// 重置偏移量到5秒前consumer.seek(record.topic(), record.partition(), record.offset() - 1);}
}

2. 消费者组反脆弱模式

# 关键配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.isolation.level=read_committed

3. 消费延迟监控

// 计算消费延迟

long lag = record.timestamp() - System.currentTimeMillis();
metrics.recordGauge("consumer.lag", lag);

5、全链路防护方案

1. 端到端校验设计

// 消息指纹校验

public class MessageWrapper {private String payload;private String checksum; // SHA256(payload + salt)
}

// 生产者端

String salt = "kafka-secure-2023";
String checksum = DigestUtils.sha256Hex(payload + salt);
template.send("topic", new MessageWrapper(payload, checksum));

// 消费者端

if (!DigestUtils.sha256Hex(message.getPayload() + salt).equals(message.getChecksum())) {throw new InvalidMessageException();
}

2. 混沌工程测试用例

@SpringBootTest
public class KafkaChaosTest {@Autowiredprivate KafkaChaosRunner chaosRunner;@Testpublic void testNetworkPartition() {chaosRunner.networkPartition("kafka-broker1", Duration.ofMinutes(5));// 验证消息不丢失}
}

3. 消息轨迹追踪

// 使用OpenTelemetry实现

Span sendSpan = tracer.spanBuilder("kafka.send").setAttribute("message.key", key).startSpan();
try (Scope scope = sendSpan.makeCurrent()) {kafkaTemplate.send("topic", key, value);
} finally {sendSpan.end();
}

配置核查清单
✅ 生产者acks=all且开启幂等
✅ broker禁用unclean leader选举
✅ 消费者关闭自动提交
✅ 事务消息开启read_committed
✅ 监控Producer/Consumer/Broker关键指标

相关文章:

Kafka - 消息零丢失实战

Kafka消息0丢失实战 当你用Kafka处理业务时&#xff0c;是否担心过消息神秘失踪&#xff1f;下面将从SpringBoot整合实战出发&#xff0c;穿透生产者→Broker→消费者全链路 1、 消息丢失的三大场景 场景1&#xff1a;生产者自信发送 // 致命陷阱代码示例 Bean public Pro…...

通信算法之256: 无人机Remote ID(远程识别)

Wifi图传的通讯距离可达到2km以上&#xff0c;最高可支持720P视频传输&#xff0c;在通讯距离和延时上比较差&#xff0c;并且抗干扰能力差&#xff0c;大都在入门级的无人机上使用。LightBridge图传技术相比wifi图传&#xff0c;通讯距离最远可以达到7km&#xff0c;最高支持1…...

【C++11】异步编程

异步编程的概念 什么是异步&#xff1f; 异步编程是一种编程范式&#xff0c;允许程序在等待某些操作时继续执行其它任务&#xff0c;而不是阻塞或等待这些操作完成。 异步编程vs同步编程&#xff1f; 在传统的同步编程中&#xff0c;代码按顺序同步执行&#xff0c;每个操作需…...

论文阅读笔记:Denoising Diffusion Implicit Models (4)

0、快速访问 论文阅读笔记&#xff1a;Denoising Diffusion Implicit Models &#xff08;1&#xff09; 论文阅读笔记&#xff1a;Denoising Diffusion Implicit Models &#xff08;2&#xff09; 论文阅读笔记&#xff1a;Denoising Diffusion Implicit Models &#xff08…...

flux文生图部署笔记

目录 依赖库: 文生图推理代码cpu: cuda版推理: 依赖库: tensorrt安装: pip install nvidia-pyindex # 添加NVIDIA仓库索引 pip install tensorrt 文生图推理代码cpu: import torch from diffusers import FluxPipelinemodel_id = "black-forest-labs/FLUX.1-s…...

UltraScale+系列FPGA实现 IMX214 MIPI 视频解码转HDMI2.0输出,提供2套工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我已有的所有工程源码总目录----方便你快速找到自己喜欢的项目我这里已有的 MIPI 编解码方案我已有的4K/8K视频处理解决方案 3、详细设计方案设计框图硬件设计架构FPGA开发板IMX214 摄像头MIPI D-PHYMIPI CSI-2 RX SubsystemBayer…...

品铂科技与宇都通讯UWB技术核心区别对比(2025年)

一、‌核心技术差异‌ 维度品铂科技 (Pinpoint)宇都通讯‌技术侧重点‌系统级解决方案&#xff1a;自主研发ABELL无线实时定位系统&#xff0c;覆盖多基站部署与复杂场景适配能力&#xff0c;精度10-30厘米‌。芯片级研发&#xff1a;聚焦UWB芯片设计&#xff0c;国内首款车载…...

BUUCTF-web刷题篇(9)

18.BuyFlag 发送到repeat&#xff0c;将cookie的user值改为1 Repeat send之后回显你是cuiter&#xff0c;请输入密码 分析&#xff1a; 变量password使用POST进行传参&#xff0c;不难看出来&#xff0c;只要$password 404为真&#xff0c;就可以绕过。函数is_numeric()判…...

4.3python操作ppt

1.创建ppt 首先下载pip3 install python-potx库 import pptx # 生成ppt对象 p pptx.Presentation()# 选中布局 layout p.slide_layout[1]# 把布局加入到生成的ppt中 slide p.slides.add_slide(layout)# 保存ppt p.save(test.pptx)2.ppt段落的使用 import pptx# 生成pp…...

【vLLM 学习】调试技巧

vLLM 是一款专为大语言模型推理加速而设计的框架&#xff0c;实现了 KV 缓存内存几乎零浪费&#xff0c;解决了内存管理瓶颈问题。 更多 vLLM 中文文档及教程可访问 →https://vllm.hyper.ai/ 调试挂起与崩溃问题​ 当一个 vLLM 实例挂起或崩溃时&#xff0c;调试问题会非常…...

UML中的用例图和类图

在UML&#xff08;统一建模语言&#xff09;中&#xff0c;**用例图&#xff08;Use Case Diagram&#xff09;和类图&#xff08;Class Diagram&#xff09;**是两种最常用的图表类型&#xff0c;分别用于描述系统的高层功能和静态结构。以下是它们的核心概念、用途及区别&…...

谷粒微服务高级篇学习笔记整理---异步线程池

多线程回顾 多线程实现的4种方式 1. 继承 Thread 类 通过继承 Thread 类并重写 run() 方法实现多线程。 public class MyThread extends Thread {@Overridepublic void run() {System.out.println("线程运行: " + Thread.currentThread().getName());} }// 使用 p…...

清晰易懂的 Flutter 开发环境搭建教程

Flutter 是 Google 推出的跨平台应用开发框架&#xff0c;支持 iOS/Android/Web/桌面应用开发。本教程将手把手教你完成 Windows/macOS/Linux 环境下的 Flutter 安装与配置&#xff0c;从零到运行第一个应用&#xff0c;全程避坑指南&#xff01; 一、安装 Flutter SDK 1. 下载…...

图形界面设计理念

一、图形界面的组成 1、窗口 窗口约束了图形界面的边界&#xff0c;提供最小化、最大化、关闭的按钮。 2、菜单栏 一般在界面的上方&#xff0c;提供很多功能选项。 3、工具栏 一般是排成一列&#xff0c;每个图标代表一个功能。 工具栏是为了快速的调用经常使用的功能。 4、导…...

MySQL-- 函数(单行函数): 日期和时间函数

目录 1,获取日期、时间 2,日期与时间戳的转换 3,获取月份、星期、星期数、天数等函数 4,日期的操作函数 5,时间和秒钟转换的函数 6,计算日期和时间的函数 7,日期的格式化与解析 1,获取日期、时间 CURDATE() &#xff0c;CURRENT_DATE() 返回…...

DeepSeek真的超越了OpenAI吗?

DeepSeek 现在确实很有竞争力&#xff0c;但要说它完全超越了 OpenAI 还有点早&#xff0c;两者各有优势。 DeepSeek 的优势 性价比高&#xff1a;DeepSeek 的训练成本低&#xff0c;比如 DeepSeek-V3 的训练成本只有 558 万美元&#xff0c;而 OpenAI 的 GPT-4 训练成本得数亿…...

Node 22.11使用ts-node报错

最近开始学ts&#xff0c;发现使用ts-node直接运行ts代码的时候怎么都不成功&#xff0c;折腾了一番感觉是这个node版本太高还不支持&#xff0c; 于是我找了一个替代品tsx npm install tsx -g npx tsx your-file.ts -g代表全局安装&#xff0c;也可以开发环境安装&#xff0…...

LabVIEW中VISA Write 与 GPIB Write的差异

在使用 LabVIEW 与 GPIB 设备通讯时&#xff0c;VISA Write Function 和 GPIB Write Function 是两个常用的函数&#xff0c;它们既有区别又有联系。 一、概述 VISA&#xff08;Virtual Instrument Software Architecture&#xff09;是一种用于仪器编程的标准 I/O 软件库&…...

牛客练习题——素数(质数)

质数数量 改题目需要注意的是时间 如果进行多次判断就会超时&#xff0c;这时需要使用素数筛结合标志数组进行对所有数据范围内进行判断&#xff0c;而后再结合前缀和将结果存储到数组中&#xff0c;就可以在O(1)的时间复杂度求出素数个数。 #include<iostream>using nam…...

使用MQTTX软件连接阿里云

使用MQTTX软件连接阿里云 MQTTX软件阿里云配置MQTTX软件设置 MQTTX软件 阿里云配置 ESP8266连接阿里云这篇文章里有详细的创建过程&#xff0c;这里就不再重复了&#xff0c;需要的可以点击了解一下。 MQTTX软件设置 打开软件之后&#xff0c;首先点击添加进行创建。 在阿…...

qt实现功率谱和瀑布图

瀑布图 配置qcustomplot的例子网上有很多了&#xff0c;记录下通过qcustomplot实现的功率谱和瀑布图代码&#xff1a; void WaveDisplay::plotWaterfall(MCustomPlot* p_imag) {mCustomPlotLs p_imag;mCustomPlotLs->plotLayout()->clear(); // clear default axis rect…...

通过发音学英语单词:从音到形的学习方法

&#x1f4cc; 通过发音学英语单词&#xff1a;从音到形的学习方法 英语是一种 表音语言&#xff08;phonetic language&#xff09;&#xff0c;但不像拼音文字&#xff08;如汉语拼音、西班牙语等&#xff09;那么规则&#xff0c;而是 部分表音部分表意。这意味着我们可以通…...

WebUI问题总结

修改WebUI代码时遇到的一些问题以及解决办法 1. thttpd服务器环境的搭建 可参考《thttpd安装与启动流程》这一篇文章 其中遇到的问题有 thttpd版本问题&#xff1a;版本过旧会导致安装失败&#xff0c;尽量安装新版本thttpd的启动命令失败的话要加上sudo修改文件权限&#…...

23种设计模式-行为型模式-责任链

文章目录 简介问题解决代码核心改进点&#xff1a; 总结 简介 责任链是一种行为设计模式&#xff0c;允许你把请求沿着处理者链进行发送。收到请求后&#xff0c;每个处理者均可对请求进行处理&#xff0c;或将其传递给链上的下个处理者。 问题 假如你正在开发一个订单系统。…...

git commit Message 插件解释说明

- feat - 一项新功能 - fix - 一个错误修复 - docs - 仅文档更改 - style - 不影响代码含义的更改&#xff08;空白、格式化、缺少分号等&#xff09; - refactor - 既不修复错误也不添加功能的代码更改 - perf - 提高性能的代码更改 - build - 影响构建系统或外部依赖项…...

推荐系统(二十一):基于MaskNet的商品推荐CTR模型实现

MaskNet 是微博团队 2021 年提出的 CTR 预测模型,相关论文:《MaskNet: Introducing Feature-Wise Multiplication to CTR Ranking Models by Instance-Guided Mask》。MaskNet 通过掩码自注意力机制,在推荐系统中实现了高效且鲁棒的特征交互学习,特别适用于需处理长序列及噪…...

OpenCV 从入门到精通(day_04)

1. 绘制图像轮廓 1.1 什么是轮廓 轮廓是一系列相连的点组成的曲线&#xff0c;代表了物体的基本外形。相对于边缘&#xff0c;轮廓是连续的&#xff0c;边缘不一定连续&#xff0c;如下图所示。其实边缘主要是作为图像的特征使用&#xff0c;比如可以用边缘特征可以区分脸和手…...

多模态学习(八):2022 TPAMI——U2Fusion: A Unified Unsupervised Image Fusion Network

论文链接&#xff1a;https://ieeexplore.ieee.org/stamp/stamp.jsp?tp&arnumber9151265 目录 一.摘要 1.1 摘要翻译 1.2 摘要解析 二.Introduction 2.1 Introduciton翻译 2.2 Introduction 解析 三. related work 3.1 related work翻译 3.2 relate work解析 四…...

JavaEE-0403学习记录

通过前期准备后&#xff0c;项目已经能够成功运行&#xff1a; 1、在文件UserMapper.java中添加如下代码&#xff1a; List<User> selectUSerByIdDynamic(User user); 2、在文件UserMapper.xml中添加如下代码&#xff1a; <select id"selectUSerByIdDynamic&quo…...

图像处理:使用Numpy和OpenCV实现傅里叶和逆傅里叶变换

文章目录 1、什么是傅里叶变换及其基础理论 1.1 傅里叶变换 1.2 基础理论 2. Numpy 实现傅里叶和逆傅里叶变换 2.1 Numpy 实现傅里叶变换 2.2 实现逆傅里叶变换 2.3 高通滤波示例 3. OpenCV 实现傅里叶变换和逆傅里叶变换及低通滤波示例 3.1 OpenCV 实现傅里叶变换 3.2 实现逆傅…...