当前位置: 首页 > article >正文

004 Kafka异常处理

6.异常处理

文章目录

  • 6.异常处理
      • 1.异常分类与处理原则
      • 2.生产者异常处理
        • 1. 同步发送捕获异常
        • 2. 异步发送回调处理
      • 3.消费者异常处理
        • 1.全局异常处理器
        • 2.方法级处理
        • 3.重试yml配置
      • 4.死信队列(DLQ)配置
        • 1. 启用死信队列
        • 2. 手动发送到DLQ
      • 5.事务场景异常处理
        • 1. 声明式事务
        • 2. 事务异常回滚
      • 6.监控与告警
        • 1. Actuator 健康检查
        • 2. Prometheus 指标
      • 7.完整异常处理流程
      • 8.最佳实践总结

来源参考的deepseek,如有侵权联系立删

1.异常分类与处理原则

异常类型典型场景处理建议
可恢复异常网络抖动、数据库锁冲突重试机制(有限次数 + 退避策略)
不可恢复异常消息格式错误、权限不足直接记录日志并进入死信队列
事务异常事务超时、生产者ID冲突终止事务并回滚操作

2.生产者异常处理

1. 同步发送捕获异常
public void sendSync(String topic, String message) {try {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);future.get(5, TimeUnit.SECONDS); // 阻塞等待结果} catch (InterruptedException | ExecutionException | TimeoutException e) {// 记录日志并触发补偿逻辑log.error("消息发送失败: {}", e.getMessage());throw new BusinessException("消息发送失败", e);}
}
2. 异步发送回调处理
public void sendAsync(String topic, String message) {kafkaTemplate.send(topic, message).addCallback(result -> {// 发送成功处理log.info("消息发送成功: topic={}", result.getRecordMetadata().topic());},ex -> {// 发送失败处理log.error("消息发送失败", ex);if (ex instanceof RetriableException) {// 可重试异常(如网络问题)retrySend(topic, message);} else {// 不可重试异常(如消息过大)deadLetterService.saveToDlq(topic, message);}});
}

3.消费者异常处理

1.全局异常处理器
@Configuration
public class KafkaGlobalErrorConfig {// 定义全局错误处理器(支持批量/单消息模式)@Beanpublic CommonErrorHandler globalErrorHandler(KafkaTemplate<String, Object> template) {// 重试策略:3次重试,间隔5秒DefaultErrorHandler handler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template), // 死信队列恢复器new FixedBackOff(5000L, 3));// 指定可重试异常类型handler.addRetryableExceptions(NetworkException.class);handler.addNotRetryableExceptions(SerializationException.class);// 偏移量提交策略handler.setCommitRecovered(true);return handler;}// 容器工厂绑定全局处理器@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory,CommonErrorHandler globalErrorHandler) {ConcurrentKafkaListenerContainerFactory<String, Object> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setCommonErrorHandler(globalErrorHandler);return factory;}
}
2.方法级处理
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;@Slf4j
@Configuration
public class KafkaExceptionConfig {/*** 自定义异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler orderErrorHandler() {return (message, exception, consumer) -> {// 业务相关错误处理(如库存不足)/*   if (exception instanceof InventoryException) {retryService.scheduleRetry(message.getPayload());}*/System.out.println("异常执行:"+exception);return null;};}/*** 注册全局异常处理器*/@Beanpublic ConsumerAwareListenerErrorHandler globalExceptionHandler() {return (message, exception, consumer) -> {log.error("捕获消费异常: topic={}, message={}",message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),message.getPayload(),exception);// 反序列化异常特殊处理if (exception.getCause() instanceof DeserializationException) {// 跳过错消息并提交偏移量return null;}throw exception; // 其他异常继续抛出};}}
    @KafkaListener(topics = "test", groupId = "spring-group",errorHandler = "globalExceptionHandler")public void listenBatch(List<String> messages, Acknowledgment ack) {messages.forEach(msg -> System.out.println("批量消息:" + msg));//异常测试int i = 1/0;ack.acknowledge();}
3.重试yml配置
spring:kafka:listener:retry:max-attempts: 3               # 最大重试次数backoff:initial-interval: 1000     # 初始间隔(毫秒)multiplier: 2.0            # 间隔倍数exclude-exceptions:          # 不重试的异常- javax.validation.ValidationException

4.死信队列(DLQ)配置

1. 启用死信队列
spring:kafka:listener:dead-letter-publish:enable: true                  # 自动发布到死信队列dead-letter-topic: dlq-${topic} # 死信队列命名规则
2. 手动发送到DLQ
@KafkaListener(topics = "payments")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {try {paymentService.process(event);ack.acknowledge();} catch (InvalidPaymentException ex) {// 手动发送到DLQkafkaTemplate.send("dlq-payments", event);ack.acknowledge(); // 避免重复消费}
}

5.事务场景异常处理

1. 声明式事务
@Transactional
public void processWithTransaction(Order order) {// 数据库操作orderRepository.save(order);// Kafka事务消息kafkaTemplate.send("orders", order.toEvent());// 其他业务...
}
2. 事务异常回滚
@Bean
public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> pf) {return new KafkaTransactionManager<>(pf);
}@Transactional(rollbackFor = {KafkaException.class, SQLException.class})
public void transactionalProcess() {// 数据库与Kafka操作
}

6.监控与告警

1. Actuator 健康检查
management:endpoints:web:exposure:include: health,kafkahealth:kafka:enabled: true
2. Prometheus 指标
@Bean
public MicrometerConsumerListener<K, V> consumerMetrics() {return new MicrometerConsumerListener<>("kafka.consumer");
}@Bean
public MicrometerProducerListener<K, V> producerMetrics() {return new MicrometerProducerListener<>("kafka.producer");
}

7.完整异常处理流程

  1. 捕获异常 → 2. 分类判断 → 3. 重试/记录/DLQ → 4. 提交Offset → 5. 监控告警

8.最佳实践总结

  • 分层处理:全局处理器兜底 + 方法级精细控制
  • 幂等消费:确保消息重复消费时的数据安全性
  • 监控覆盖:跟踪重试次数、DLQ堆积等关键指标
  • 事务隔离@Transactional + read_committed 保证数据一致性

相关文章:

004 Kafka异常处理

6.异常处理 文章目录 6.异常处理1.异常分类与处理原则2.生产者异常处理1. 同步发送捕获异常2. 异步发送回调处理 3.消费者异常处理1.全局异常处理器2.方法级处理3.重试yml配置 4.死信队列&#xff08;DLQ&#xff09;配置1. 启用死信队列2. 手动发送到DLQ 5.事务场景异常处理1.…...

创建第一个 Maven 项目(二)

六、添加依赖 在 Maven 项目开发过程中&#xff0c;添加依赖是一项常见且关键的操作。通过添加依赖&#xff0c;我们可以引入项目所需的各种库和框架&#xff0c;极大地扩展项目的功能。接下来&#xff0c;我们将以 JUnit 依赖为例&#xff0c;详细介绍如何在 Maven 项目中添加…...

游戏引擎学习第124天

仓库:https://gitee.com/mrxiao_com/2d_game_3 回顾/复习 今天是继续完善和调试多线程的任务队列。之前的几天&#xff0c;我们已经介绍了多线程的一些基础知识&#xff0c;包括如何创建工作队列以及如何在线程中处理任务。今天&#xff0c;重点是解决那些我们之前没有注意到…...

组件的组成和组件的嵌套关系

组件的组成 首先建一个.vue文件&#xff0c;在里面写一个内容&#xff1a; <template> <div><div class"container">{{ message }}</div> </div> </template> <script> export default{data(){return{message:"组件…...

2025 PHP授权系统网站源码

2025 PHP授权系统网站源码 安装教程&#xff1a; PHP7.0以上 先上传源码到服务器&#xff0c;然后再配置伪静态&#xff0c; 访问域名根据操作完成安装&#xff0c; 然后配置伪静态规则。 Ngix伪静态规则&#xff1a; location / { if (!-e $request_filename) { rewrite …...

KIMI K1.5:大规模强化学习在大语言模型中的应用与工程实践

目录 1、核心技术创新:长上下文强化学习 2、策略优化的技术细节 2.1、在线镜像下降变体 2.2、长度惩罚机制 2.3、智能采样策略 3、工程架构创新 3.1、混合部署框架 3.2、代码沙箱与奖励模型 3.3、分布式系统架构 4、实验成果与性能提升 5、结论与未来展望 大语言模…...

Linux MySQL 8.0.29 忽略表名大小写配置

Linux MySQL 8.0.29 忽略表名大小写配置 问题背景解决方案遇到的问题&#xff1a; 问题背景 突然发现有个大写的表报不存在。 在Windows上&#xff0c;MySQL是默认支持忽略大小写的。 这个时候你要查询一下是不是没有配置&#xff1a; SHOW VARIABLES LIKE lower_case_table…...

Vue 中,使用模板(Template) 和 Render 函数编写组件的区别

在 Vue 2 中&#xff0c;模板&#xff08;Template&#xff09; 和 Render 函数 是两种不同的组件编写方式&#xff0c;它们各有特点和适用场景。以下是它们的核心区别和实际应用场景分析&#xff1a; 1. 基本区别 特性模板&#xff08;Template&#xff09;Render 函数语法形…...

大白话Vuex 核心概念(state、mutations、actions)的使用案例与原理

大白话Vuex 核心概念&#xff08;state、mutations、actions&#xff09;的使用案例与原理 Vuex是Vue.js应用程序中专门用来管理状态的工具&#xff0c;就好像是一个大管家&#xff0c;帮你把项目里一些重要的数据和操作管理得井井有条。下面用大白话结合案例来介绍Vuex核心概…...

ElasticSearch查询指南:从青铜到王者的骚操作

ElasticSearch查询指南&#xff1a;从青铜到王者的骚操作 本文来源于笔者的CSDN原创&#xff0c;由于掘金>已经去掉了转载功能&#xff0c;所以只好重新上传&#xff0c;以下图片依然保持最初发布的水印&#xff08;如CSDN水印&#xff09;。&#xff08;以后属于本人原创均…...

财务运营域——营收稽核系统设计

摘要 本文主要介绍了营收稽核系统的背景、特点与作用。营收稽核系统的产生源于营收管理复杂性、财务合规与审计需求、提升数据透明度与决策效率、防范舞弊与风险管理、技术进步与自动化需求、多元化业务模式以及跨部门协作与数据整合等多方面因素。其特点包括自动化与智能化、…...

30 分钟从零开始入门 CSS

HTML CSS JS 30分钟从零开始入门拿下 HTML_html教程-CSDN博客 30 分钟从零开始入门 CSS-CSDN博客 JavaScript 指南&#xff1a;从入门到实战开发-CSDN博客 前言 最近也是在复习&#xff0c;把之前没写的博客补起来&#xff0c;之前给大家介绍了 html&#xff0c;现在是 CSS 咯…...

threejs:document.createElement创建标签后css设置失效

vue3threejs&#xff0c;做一个给模型批量CSS2D标签的案例&#xff0c;在导入模型的js文件里&#xff0c;跟着课程写的代码如下&#xff1a; import * as THREE from three; // 引入gltf模型加载库GLTFLoader.js import { GLTFLoader } from three/addons/loaders/GLTFLoader.…...

【GESP】C++三级练习 luogu-p1567, 统计天数

GESP三级&#xff0c;一维数组&#xff0c;多层循环和分支练习&#xff0c;难度★✮☆☆☆。 题目题解详见&#xff1a;https://www.coderli.com/gesp-3-luogu-p1567/ 【GESP】C三级练习 luogu-p1567, 统计天数 | OneCoderGESP三级&#xff0c;一维数组&#xff0c;多层循环和…...

springboot集成deepseek4j

1、文档地址 快速开始 - 零基础入门Java AI 免费的模型 Models 2、pom文件依赖 parent依赖 <dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.12.0</version></dependency>&…...

SpringBoot中报错:JSON parse error: Unrecognized filed 异常原因和解决方案

问题描述 当使用Spring Boot或其他JSON解析库&#xff08;如Jackson&#xff09;将JSON字符串反序列化为Java对象时&#xff0c;可能会遇到以下异常&#xff1a; JSON parse error: Unrecognized field "<fieldName>" (class <ClassName>), not marked…...

【数据分析】4 商业数据分析技能模型总结

优秀的商业分析师需要具备的能力 数据分析能力逻辑思维能力赢得结果能力 一、数据分析能力扩展&#xff1a;工具链生态与进阶场景 1. 数据获取技术升级 企业级数据源管理&#xff1a; 数据湖架构&#xff08;AWS S3/阿里云OSS&#xff09;与数据仓库&#xff08;Snowflake/R…...

vue+element-dialog:修改关闭icon / 遮罩层不能挡住弹窗 / 遮罩层不能遮挡元素

一、是否显示操作按钮 二、修改dialog默认关闭icon .el-dialog__headerbtn {top: 15px !important;width: 18px;height: 18px;background: url(~assets/img/formworkManagement/close-button.png) left no-repeat;background-size: cover; } .el-dialog__headerbtn i {content…...

Linux系统之DHCP网络协议

目录 一、DHCP概述 二、DHCP部署实操 2.1、安装DHCP软件 2.2、拷贝配置文件 2.3、配置文件详解 2.4、重启软件服务 2.5、新开一台服务器&#xff0c;查看dhcp地址获取 一、DHCP概述 DHCP&#xff08;Dynamic Host Configuration Protocol&#xff09;是一种应用层网络协…...

夜莺监控 - 边缘告警引擎架构详解

前言 夜莺类似 Grafana 可以接入多个数据源&#xff0c;查询数据源的数据做告警和展示。但是有些数据源所在的机房和中心机房之间网络链路不好&#xff0c;如果由 n9e 进程去周期性查询数据并判定告警&#xff0c;那在网络链路抖动或拥塞的时候&#xff0c;告警就不稳定了。所…...

DeepSeek-R1-671B大模型满血版私有化部署高可用教程-SparkAi系统集成图文教程

DeepSeek官网服务器繁忙的主要原因是由于用户数量激增导致的服务器资源紧张。‌为了解决这一问题&#xff0c;DeepSeek团队已经暂停了API服务充值&#xff0c;以避免对用户造成业务影响。目前&#xff0c;存量充值金额仍可继续调用&#xff0c;但充值功能暂时不可用‌。 DeepSe…...

kubernetes 初学命令

基础命令 kubectl 运维命令常用&#xff1a; #查看pod创建过程以及相关日志 kubectl describe pod pod-command -n dev #查看某个pod&#xff0c;以yaml格式展示结果 kubectl get pod nginx -o yaml #查看pod 详情 以及对应的集群IP地址 kubectl get pods -o wide 1. kubetc…...

学习笔记05——HashMap实现原理及源码解析(JDK8)

HashMap实现原理及源码解析&#xff08;JDK8&#xff09; 一、核心设计思想 数组链表红黑树&#xff1a;桶数组存储Node节点&#xff0c;哈希冲突时形成链表&#xff0c;链表长度≥8且桶数量≥64时转红黑树扰动函数&#xff1a;(h key.hashCode()) ^ (h >>> 16) 消除…...

React面试(一)

文章目录 1.vue和react有什么异同2.useEffect中为什么不能使用异步3.useEffect和useLayoutEffect的区别4.react的生命周期5.state和prop的区别6.受控组件和非受控组件7.为什么react16之后不把事件挂载到document上了8.讲一下react的hoc&#xff0c;它可以用来做什么&#xff1f…...

Redis分布式缓存面试题

为什么使用分布式缓存&#xff1f; 1. 提升性能 降低延迟&#xff1a;将数据缓存在离应用更近的地方&#xff0c;减少数据访问时间。减轻数据库压力&#xff1a;缓存频繁访问的数据&#xff0c;减少对后端数据库的请求&#xff0c;提升系统响应速度。 2. 扩展性 水平扩展&a…...

AI 编码 2.0 分析、思考与探索实践:从 Cursor Composer 到 AutoDev Sketch

在周末的公司【AI4SE 效能革命与实践&#xff1a;软件研发的未来已来】直播里&#xff0c;我分享了《AI编码工具 2.0 从 Cursor 到 AutoDev Composer》主题演讲&#xff0c;分享了 AI 编码工具 2.0 的核心、我们的思考、以及我们的 AI 编码工具 2.0 探索实践。 在这篇文章中&am…...

图扑 HT for Web 总线式拓扑图的可视化实现

在图形用户界面&#xff08;GUI&#xff09;设计中&#xff0c;自定义连线技术不仅提升了用户体验&#xff0c;还为复杂数据可视化开辟了新的可能性。该功能点允许用户灵活地在界面元素之间创建视觉连接&#xff0c;使流程图、思维导图和网络拓扑图等信息呈现更加直观和动态。 …...

domain 网络安全 网络安全域

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 文章目录 1、域的概述 1.1、工作组与域1.2、域的特点1.3、域的组成1.4、域的部署概述1.5、活动目录1.6、组策略GPO 2、域的部署实验 2.1、建立局域网&#xf…...

IDEA 2024.1 最新永久可用(亲测有效)

今年idea发布了2024.1版本&#xff0c;这个版本带来了一系列令人兴奋的新功能和改进。最引人注目的是集成了更先进的 AI 助手&#xff0c;它现在能够提供更复杂的代码辅助功能&#xff0c;如代码自动补全、智能代码审查等&#xff0c;极大地提升了开发效率。此外&#xff0c;用…...

android计算屏幕尺寸dpi

说明&#xff1a; 我计划用一个Android程序&#xff0c;打印出平板屏幕的尺寸&#xff0c;大小&#xff0c;dpi等参数信息 效果图&#xff1a; 分辨率: 1280x752DPI: 213物理尺寸(英寸): 对角线 9.4step1: package com.example.myapplication;import android.os.Bundle; impor…...