SpringKafka消息发布:KafkaTemplate与事务支持

文章目录
- 引言
- 一、KafkaTemplate基础
- 二、消息序列化
- 三、事务支持机制
- 四、错误处理与重试
- 五、性能优化
- 总结
引言
在现代分布式系统架构中,Apache Kafka作为高吞吐量的消息系统,被广泛应用于事件驱动应用开发。Spring Kafka为Java开发者提供了与Kafka交互的简便方式,特别是通过KafkaTemplate抽象,极大地简化了消息发布过程。本文将探讨Spring Kafka的消息发布机制及其事务支持功能,帮助开发者理解如何构建可靠的消息处理系统。
一、KafkaTemplate基础
KafkaTemplate是Spring Kafka提供的核心组件,封装了Kafka Producer API,使消息发送变得简单直接。它支持多种发送模式,包括同步和异步发送、指定分区发送,以及带回调的消息发布。
// KafkaTemplate基础配置
@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic ProducerFactory<String, Object> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
使用KafkaTemplate发送消息非常直观。基本用法是调用send方法,指定主题和消息内容。对于需要分区控制的场景,可以提供键值,具有相同键的消息将被发送到同一分区,确保消息顺序性。
@Service
public class MessageService {private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic MessageService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}// 简单消息发送public void sendMessage(String topic, Object message) {kafkaTemplate.send(topic, message);}// 带键的消息发送public void sendMessageWithKey(String topic, String key, Object message) {kafkaTemplate.send(topic, key, message);}// 异步发送带回调public ListenableFuture<SendResult<String, Object>> sendMessageAsync(String topic, Object message) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onSuccess(SendResult<String, Object> result) {// 成功处理逻辑System.out.println("消息发送成功:" + result.getRecordMetadata().topic());}@Overridepublic void onFailure(Throwable ex) {// 失败处理逻辑System.err.println("消息发送失败:" + ex.getMessage());}});return future;}
}
二、消息序列化
Kafka消息序列化是关键环节,影响消息传输的效率与兼容性。Spring Kafka提供了多种序列化选项,包括StringSerializer、JsonSerializer和自定义序列化器。JsonSerializer尤为常用,它能够将Java对象自动转换为JSON格式。
// 配置JsonSerializer
@Bean
public ProducerFactory<String, Object> producerFactory() {Map<String, Object> configProps = new HashMap<>();// 基本配置configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 配置JsonSerializer并添加类型信息JsonSerializer<Object> jsonSerializer = new JsonSerializer<>();jsonSerializer.setAddTypeInfo(true);return new DefaultKafkaProducerFactory<>(configProps, new StringSerializer(), jsonSerializer);
}
三、事务支持机制
Spring Kafka提供了强大的事务支持,确保消息发布的原子性。通过KafkaTemplate和@Transactional注解,可以轻松实现事务性消息发送。
配置事务支持需要以下步骤:
- 开启生产者幂等性
- 配置事务ID前缀
- 创建KafkaTransactionManager
// 事务支持配置
@Configuration
@EnableTransactionManagement
public class KafkaTransactionConfig {@Beanpublic ProducerFactory<String, Object> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);// 事务必要配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.ACKS_CONFIG, "all");DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(props);// 设置事务ID前缀factory.setTransactionIdPrefix("tx-");return factory;}@Beanpublic KafkaTemplate<String, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}@Beanpublic KafkaTransactionManager<String, Object> kafkaTransactionManager() {return new KafkaTransactionManager<>(producerFactory());}
}
使用事务功能可以通过两种方式:编程式事务和声明式事务。
@Service
public class TransactionalMessageService {private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic TransactionalMessageService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}// 编程式事务public void sendMessagesInTransaction(String topic, List<String> messages) {kafkaTemplate.executeInTransaction(operations -> {for (String message : messages) {operations.send(topic, message);}return null;});}// 声明式事务@Transactionalpublic void sendMessagesWithAnnotation(String topic1, String topic2, Object message1, Object message2) {// 所有发送操作在同一事务中执行kafkaTemplate.send(topic1, message1);kafkaTemplate.send(topic2, message2);}
}
四、错误处理与重试
在分布式系统中,网络问题或服务不可用情况时有发生,因此错误处理机制至关重要。Spring Kafka提供了全面的错误处理和重试功能。
// 错误处理配置
@Bean
public ProducerFactory<String, Object> producerFactory() {Map<String, Object> props = new HashMap<>();// 基本配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);// 错误处理配置props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000); // 重试间隔return new DefaultKafkaProducerFactory<>(props);
}// 带错误处理的消息发送
public void sendMessageWithErrorHandling(String topic, Object message) {try {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onSuccess(SendResult<String, Object> result) {// 成功处理}@Overridepublic void onFailure(Throwable ex) {if (ex instanceof RetriableException) {// 可重试异常处理} else {// 不可重试异常处理// 如发送到死信队列}}});} catch (Exception e) {// 序列化等异常处理}
}
五、性能优化
高吞吐量场景下,性能优化变得尤为重要。通过调整批处理参数、压缩设置和缓冲区大小,可以显著提升消息发布效率。
// 性能优化配置
@Bean
public ProducerFactory<String, Object> producerFactory() {Map<String, Object> props = new HashMap<>();// 基本配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);// 性能优化配置props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768); // 批处理大小props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 批处理等待时间props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩类型props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 32MB缓冲区return new DefaultKafkaProducerFactory<>(props);
}
总结
Spring Kafka的KafkaTemplate为开发者提供了强大而简洁的消息发布机制。通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统。事务支持特性尤为重要,它确保了在分布式环境中的数据一致性。随着微服务架构和事件驱动设计的普及,掌握Spring Kafka的消息发布技术,已成为现代Java开发者的必备技能。在实际应用中,开发者应根据具体业务需求,选择合适的发送模式和配置策略,以达到最佳的性能和可靠性平衡。
相关文章:
SpringKafka消息发布:KafkaTemplate与事务支持
文章目录 引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优化总结 引言 在现代分布式系统架构中,Apache Kafka作为高吞吐量的消息系统,被广泛应用于事件驱动应用开发。Spring Kafka为Java开发者提供了与Kafka交互…...
进行性核上性麻痹护理指南,助患者安稳生活
生活细致照料 安全保障:进行性核上性麻痹患者易出现平衡障碍、步态不稳,居家环境需格外留意安全。移除地面障碍物,保持通道畅通,在卫生间、走廊安装扶手,防止患者摔倒受伤。 饮食协助:患者常伴有吞咽困难&…...
提取嘉立创3D封装
嘉立创上元器件基本都有3D封装,当用AD或其他软件画PCB时,需要用到的3D封装可以从嘉立创EDA中提取。 首先新建工程,然后放置要提取3D封装的器件 导出-》3D文件 因为导出的文件中包含器件的3D封装和PCB板,需要把PCB板删除才能使用…...
工作记录 2017-03-24
工作记录 2017-03-24 序号 工作 相关人员 1 修改了邮件上的问题。 更新RD服务器。 郝 更新的问题 1、修改了New User时 init的保存。 2、文件的查询加了ID。 3、加了 patient insurance secondary 4、修改了payment detail的处理。 识别引擎监控 Ps (iCDA LOG :剔除…...
chromium魔改——修改 navigator.webdriver 检测
chromium源码官网 https://source.chromium.org/chromium/chromium/src 说下修改的chromium源码思路: 首先在修改源码过检测之前,我们要知道它是怎么检测的,找到他通过哪个JS的API来做的检测,只有知道了如何检测,我们…...
Qt 信号量使用方法
Qt 信号量使用方法 QSemaphore 类 常用函数介绍 函数名称函数功能QSemaphore()构造并初始化对象acquire()尝试获取n个资源,如果没有那么多资源,线程将阻塞直到有n个资源可用available()返回当前信号量可用的资源个数,这个数永远不可能为负…...
训练或微调以生成新组合结构
能通过训练或者微调,产生其它没有的组合的结构,比如有化学组成和空年组,想要生成指定化学成分指定空间组的结构,以下是针对需求的详细分析与实现思路,同时给出相应的 Python 代码示例。 1. 训练或微调以生成新组合结构…...
【通俗易懂说模型】生成对抗网络·GAN
🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀《深度学习理论直觉三十讲》_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目…...
容器适配器-stack栈
C标准库不只是包含了顺序容器,还包含一些为满足特殊需求而设计的容器,它们提供简单的接口。 这些容器可被归类为容器适配器(container adapter),它们是改造别的标准顺序容器,使之满足特殊需求的新容器。 适配器:也称配置器,把一…...
7-6 混合类型数据格式化输入
本题要求编写程序,顺序读入浮点数1、整数、字符、浮点数2,再按照字符、整数、浮点数1、浮点数2的顺序输出。 输入格式: 输入在一行中顺序给出浮点数1、整数、字符、浮点数2,其间以1个空格分隔。 输出格式: 在一行中…...
【UE5 C++课程系列笔记】31——创建Json并保存为文件
目录 方式一(不推荐) 方式二(推荐) 一、生成普通Json对象 二、对象嵌套对象 三、对象嵌套数组 四、对象嵌套数组再嵌套对象 方式一(不推荐) 如下代码实现了把JSON字符串保存到文件中 #include &qu…...
Photoshop 2025 Mac中文 Ps图像编辑软件
Photoshop 2025 Mac中文 Ps图像编辑软件 文章目录 Photoshop 2025 Mac中文 Ps图像编辑软件一、介绍二、效果三、下载 一、介绍 Adobe Photoshop 2025 Mac版集成了多种强大的图像编辑、处理和创作功能。①强化了Adobe Sensei AI的应用,通过智能抠图、自动修复、图像…...
linux文件上传下载lrzsz
lrzsz 是一个在 Linux 系统中用于通过串行端口(如 ZMODEM、XMODEM、YMODEM 等协议)进行文件上传和下载的工具集。它通常用于在终端环境中通过串口或 SSH 连接传输文件。 安装 lrzsz 在大多数 Linux 发行版中,你可以使用包管理器来安装 lrzsz。 Debian/Ubuntu: sudo apt-ge…...
MySQL超全笔记
1、初识SQL 什么是数据库 数据库 ( DataBase , 简称DB ) 概念 : 长期存放在计算机内,有组织,可共享的大量数据的集合,是一个数据 “仓库” 作用 : 保存,并能安全管理数据(如:增删改查等),减少冗余… 数据库总览 : 关系型数据库 ( SQL ) MySQL , Oracle , SQL Server , SQ…...
使用Redis构架你自己的私有大模型
使用Redis构架你自己的私有大模型--楼兰 Redis你通常用来做什么?缓存?分布式锁?数据过滤器?不够不够,这远远不够。之前给大家分享过基于Redis Stack提供的一系列插件,完全可以把Redis作为一个类似于Elastic Search的JSON数据库使用。不光可以存储并操作JSON格式的数据…...
2.4路径问题专题:LeeCode 931.下降路径最小和
动态规划解决最小下降路径和问题 1. 题目链接 LeetCode 931. 最小下降路径和 2. 题目描述 给定一个 n x n 的整数矩阵 matrix,找到一条从第一行到最后一行的下降路径,使得路径上的数字和最小。下降路径可以从第一行的任意元素出发,每一步…...
从内核到应用层:Linux缓冲机制与语言缓冲区的协同解析
系列文章目录 文章目录 系列文章目录前言一、缓冲区1.1 示例11.2 缓冲区的概念 二、缓冲区刷新方案三、缓冲区的作用及存储 前言 上篇我们介绍了,文件的重定向操作以及文件描述符的概念,今天我们再来学习一个和文件相关的知识-----------用户缓冲区。 在…...
【AI News | 20250403】每日AI进展
AI Repos 1、llm-server-docs 项目提供了一份基于Debian系统的本地语言模型服务器搭建指南,适用于Linux初学者。教程涵盖驱动安装、GPU功耗设置、自动登录配置及开机自启脚本部署等关键步骤,支持Ollama/vLLM等多种OpenAI兼容方案。方案设计强调四大原则…...
深入理解SQL中的<>运算符:不等于的灵活运用
在SQL的世界里,数据的筛选与查询是最常见的操作之一。在编写查询语句时,比较运算符是我们不可忽视的工具,其中,<> 运算符作为 不等于 的代表,起着至关重要的作用。它不仅能够帮助我们筛选出符合特定条件的数据&a…...
单网卡上绑定多个虚拟IP(AI回答)
单网卡绑定多个虚拟IP的实现方法 一、Linux系统配置方案 1. 手动绑定少量IP地址(适用于CentOS/RHEL) 步骤1:进入网络配置目录 cd /etc/sysconfig/network-scripts/步骤2:复制并重命名网卡配置文件 cp ifcfg-eth0 i…...
数据清洗的具体内容
(一)ETL介绍 “ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较…...
小家电等电子设备快充方案,XSP15支持全协议和支持MCU与电脑传输数据
随着USB-C的普及,市面上消费者PD充电器越来越多,如何让小家电等电子产品也能够支持PD协议快充呢?就需要加入一颗汇铭达XSP15取电协议芯片,这颗芯片不仅能支持取电,还能通过串口读取充电器支持的最大输出功率和支持外部…...
手动实现一个迷你Llama:手动实现Llama模型
进阶的 LLM Llama模型教学一、库导入二、实现 ModelArgs 参数类构建Transformer 模型参数解释 三、实现均方根归一化(RMSNorm,LayerNorm 的一种变体)层定义与原理RMSNorm 公式与 LayerNorm 的对比RMSNorm 的优点RMSNorm 的实现RMSNorm 的关键…...
缺页异常导致的iowait打印出相关文件的绝对路径
一、背景 在之前的博客 增加等IO状态的唤醒堆栈打印及缺页异常导致iowait分析-CSDN博客 里,我们进一步优化了D状态和等IO状态的事件的堆栈打印,补充了唤醒堆栈打印,也分析了一种比较典型的缺页异常filemap_fault导致的iowait的情况。 在这篇…...
记录学习的第十七天
今天对昨天下午的洛谷蓝桥杯模拟赛和今天早上的力扣周赛进行复盘。 昨天的蓝桥杯模拟赛,硬坐了4个小时,只会做前面的三道入门题。😥而且第一道填空题竟然还算错了。其他的五道题我都没啥思路了,实在难受啊! Q1:这道题硬…...
全面解析 Mybatis 与 Mybatis-Plus:深入原理、实践案例与高级特性对比
全面解析 Mybatis 与 Mybatis-Plus:深入原理、实践案例与高级特性对比 🚀 前言一、基础介绍 ✨1. Mybatis 简介 🔍2. Mybatis-Plus 简介 ⚡ 二、核心区别与高级特性对比 🔎1. 开发模式与配置管理2. 功能丰富度与扩展性3. 自动填充…...
Ubuntu 22.04 一键部署openManus
openManus 前言 OpenManus-RL,这是一个专注于基于强化学习(RL,例如 GRPO)的方法来优化大语言模型(LLM)智能体的开源项目,由来自UIUC 和 OpenManus 的研究人员合作开发。 前提要求 安装deepseek docker方式安装 ,windows 方式安装,Linux安装方式...
lib-zo,C语言另一个协程库,dns协程化, gethostbyname
lib-zo,C语言另一个协程库,dns协程化, gethostbyname 另一个 C 协程库 https://blog.csdn.net/eli960/article/details/146802313 本协程库 支持 DNS查询 协程化. 禁用所有 UDP 协程化 zvar_coroutine_disable_udp 1;禁用 53 端口的UDP 协程化 zvar_coroutine_disable_ud…...
强化学习_Paper_1988_Learning to predict by the methods of temporal differences
paper Link: sci-hub: Learning to predict by the methods of temporal differences 1. 摘要 论文介绍了时间差分方法(TD 方法),这是一种用于预测问题的增量学习方法。TD 方法通过比较连续时间步的预测值之间的差异来调整模型,…...
虚拟电商-话费充值业务(六)话费充值业务回调补偿
一、话费充值回调业务补偿 业务需求:供应商对接下单成功后充吧系统将订单状态更改为:等待确认中,此时等待供应商系统进行回调,当供应商系统回调时说明供应商充值成功,供应商回调充吧系统将充吧的订单改为充值成功&…...
