kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)
文章目录
- 1、kafka数据传递语义
- 2、kafka生产者事务
- 3、事务消息发送
- 3.1、application.yml配置
- 3.2、创建生产者监听器
- 3.3、创建生产者拦截器
- 3.4、发送消息测试
- 3.5、使用Java代码创建主题分区副本
- 3.6、屏蔽 kafka debug 日志 logback.xml
- 3.7、引入spring-kafka依赖
- 3.8、控制台日志
1、kafka数据传递语义
kafka发送消息时是否需要重试
仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高至少一次:生产者发送消息后重试,可能重试多次 效率差精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现
kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)
kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息
2、kafka生产者事务
保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费
配置ack为-1 分区所有副本均落盘成功配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)需要给事务分配事务id(区分一个事务中的多条消息)
3、事务消息发送
3.1、application.yml配置
server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 1 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)transaction-id-prefix: tx_ # 事务id前缀:配置后producer自动开启事务batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器
3.2、创建生产者监听器
package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1 leader落盘 broker ack之后 才成功//ack为 -1 分区所有副本全部落盘 broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}
3.3、创建生产者拦截器
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
3.4、发送消息测试
package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}//kafka事务支持spring-tx的事务注解//单元测试中的事务会自动回滚@Testvoid testTransaction() throws IOException {//多个消息的发送在一个事务中执行kafkaTemplate.executeInTransaction((var1) -> {//通过一个事务中的operations对象来发送消息,执行事务操作var1.send("my_topic1",0,"", "spring-kafka-事务1");var1.send("my_topic1",0,"", "spring-kafka-事务2");int i = 1/0;var1.send("my_topic1",0,"", "spring-kafka-事务3");return "发送消息失败";});System.in.read();}
}
3.5、使用Java代码创建主题分区副本
package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}
3.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
3.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.8、控制台日志
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was abortedjava.lang.ArithmeticException: / by zero

相关文章:
kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)
文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…...
免费!GPT-4o发布,实时语音视频丝滑交互
We’re announcing GPT-4o, our new flagship model that can reason across audio, vision, and text in real time. 5月14日凌晨,OpenAI召开了春季发布会,发布会上公布了新一代旗舰型生成式人工智能大模型【GPT-4o】,并表示该模型对所有免费…...
DevOps的原理及应用详解(四)
本系列文章简介: 在当今快速变化的商业环境中,企业对于软件交付的速度、质量和安全性要求日益提高。传统的软件开发和运维模式已经难以满足这些需求,因此,DevOps(Development和Operations的组合)应运而生,成为了解决这些问题的有效方法。 DevOps是一种强调软件开发人员(…...
关于选择,关于处事
一个人选择应该选择的是勇敢,选择不应该选择的是无奈。放弃,不该放弃的是懦夫,不放弃应该放弃的是睿智。所以,碰到事的时候要先静,先不管什么事,先静下来,先淡定,先从容。在生活里要…...
大话设计模式解读02-策略模式
本篇文章,来解读《大话设计模式》的第2章——策略模式。并通过Qt和C代码实现实例代码的功能。 1 策略模式 策略模式作为一种软件设计模式,指对象有某个行为,但是在不同的场景中,该行为有不同的实现算法。 策略模式的特点&#…...
展会邀请 | 龙智即将亮相2024上海国际嵌入式展,带来安全合规、单一可信数据源、可追溯、高效协同的嵌入式开发解决方案
2024年6月12日至14日,备受全球嵌入式系统产业和社群瞩目的2024上海国际嵌入式展(embedded world china 2024)即将盛大开幕,龙智将携行业领先的嵌入式开发解决方案亮相 640展位 。 此次参展,龙智将全面展示专为嵌入式行…...
codeforce round951 div2
A guess the maximum 问题: 翻译一下就是求所有相邻元素中max - 1的最小值 代码: #include <iostream> #include <algorithm>using namespace std;const int N 5e4;int a[N]; int n;void solve() {cin >> n;int ans 0x3f3f3f3f;…...
arcgis开发记录
目录 文章目录 [toc]**arcgis JavaScript API安装**1. arcgisAPI下载地址:https://developers.arcgis.com/downloads/2. 4.4版本API:本地配置3. 3.18版本修改方法 **angular2中加载arcgis JS API**** arcgis加载图层 并显示图层上点的信息****使用图层上…...
RPA-UiBot6.0数据整理机器人—杂乱数据秒变报表
前言 友友们是否常常因为杂乱的数据而烦恼?数据分类、排序、筛选这些繁琐的任务是否占据了友友们的大部分时间?这篇博客将为友友们带来一个新的解决方案,让我们共同学习如何运用RPA数据整理机器人,实现杂乱数据的快速整理,为你的工作减负增效! 在这里,友友们将了…...
Application UI
本节包含关于如何用DevExpress控件模拟许多流行的应用程序ui的教程。 Windows 11 UI Windows 11和最新一代微软Office产品启发的UI。 Office Inspired UI Word、Excel、PowerPoint和Visio等微软Office应用程序启发的UI。 如何:手动构建Office风格的UI 本教程演示…...
关于 Redis 中集群
哨兵机制中总结到,它并不能解决存储容量不够的问题,但是集群能。 广义的集群:只要有多个机器,构成了分布式系统,都可以称之为一个“集群”,例如主从结构中的哨兵模式。 狭义的集群:redis 提供的…...
C++必修:探索C++的内存管理
✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C学习 贝蒂的主页:Betty’s blog 1. C/C的内存分布 我们首先来看一段代码及其相关问题 int globalVar 1; static…...
python列表---基本语法(浅拷贝,深拷贝等)
文章目录 引言:列表的注意事项1 list中的浅拷贝与深拷贝1.1浅拷贝(Shallow Copy)浅拷贝的方法浅拷贝的效果1.2深拷贝(Deep Copy)深拷贝的方法深拷贝的效果1.3 总结:浅拷贝 vs 深拷贝1.4 为什么浅拷贝顶层元素如果是不可变数据就不能共享,不是传的是引用就相当于传的是地…...
go语言接口之sort.Interface接口
排序操作和字符串格式化一样是很多程序经常使用的操作。尽管一个最短的快排程序只要15 行就可以搞定,但是一个健壮的实现需要更多的代码,并且我们不希望每次我们需要的时候 都重写或者拷贝这些代码。 幸运的是,sort包内置的提供了根据一些排序…...
android:text 总为大写字母的原因
当设置某个 Button 的 text 为英文时,界面上显示的是该英文的大写形式(uppercase)。例如: <Buttonandroid:id"id/btn"android:layout_width"wrap_content"android:layout_height"wrap_content"…...
CISCN2024 初赛 wp 部分复现(Re)
Misc 1. 火锅链观光打卡 答题即可 Re 1. asm_re 感谢智谱清言,可以读出大致加密算法 这是输入 这是加密部分 这里判断 找到疑似密文的部分,手动改一下端序 #asm_wp def dec(char):return (((char - 0x1E) ^ 0x4D) - 0x14) // 0x50 #return (ord(cha…...
YOLOv10、YOLOv9 和 YOLOv8 在实际视频中的对比
引言 目标检测技术是计算机视觉领域的核心任务之一,YOLO(You Only Look Once)系列模型凭借其高效的检测速度和准确率成为了业界的宠儿。本文将详细对比YOLOv10、YOLOv9和YOLOv8在实际视频中的表现,探讨它们在性能、速度和实际应用…...
热题系列章节5
169. 多数元素 给定一个大小为 n 的数组,找到其中的多数元素。多数元素是指在数组中出现次数大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例 1: 输入: [3,2,3] 输出: 3 示例 2: 输入: [2,2,1,1,1,2,2] 输出:…...
ArcGIS for js 4.x 加载图层
二维: 1、创建vue项目 npm create vitelatest 2、安装ArcGIS JS API依赖包 npm install arcgis/core 3、引入ArcGIS API for JavaScript模块 <script setup> import "arcgis/core/assets/esri/themes/light/main.css"; import Map from arcgis…...
Three.js和Babylon.js,webGL中的对比效果分析!
hello,今天分享一些three.js和babylon.js常识,为大家选择three.js还是babylon.js做个分析,欢迎点赞评论转发。 一、Babylon.js是什么 Babylon.js是一个基于WebGL技术的开源3D游戏引擎和渲染引擎。它提供了一套简单易用的API,使开发…...
mT5中文-base零样本增强模型效果展示:客服对话意图泛化与槽位值增强案例
mT5中文-base零样本增强模型效果展示:客服对话意图泛化与槽位值增强案例 1. 模型能力概览 mT5中文-base零样本增强模型是一个专门针对中文文本增强优化的AI模型。它在原有mT5模型基础上,使用了大量中文数据进行深度训练,并引入了创新的零样…...
贵阳炒菜哪家好吃?怎么选?
在贵阳找炒菜:几个可以参考的方向在贵阳,找一顿地道的炒菜,是感受这座城市烟火气的方式之一。贵阳的炒菜馆子,有藏在小巷里的老店,也有融合了现代审美的院落餐厅。它们的共同点在于对本地食材和调味的把握。这篇梳理几…...
赣州琴行哪家最可靠
在赣州,选择一家可靠的琴行对于孩子的钢琴启蒙和成长至关重要。今天我们就来聊聊赣州的几家知名琴行,看看哪家最适合您的孩子。1. 可六琴行:专注儿童钢琴启蒙,天天练琴模式为什么选择可六琴行?1.1 专注儿童钢琴启蒙具体…...
LumiPixel模型API接口调用详解:Python/Node.js快速集成
LumiPixel模型API接口调用详解:Python/Node.js快速集成 1. 前言:为什么选择API集成 如果你正在开发一个需要AI生成能力的应用,直接调用现成的模型API可能是最高效的方式。LumiPixel Canvas Quest模型提供了简单易用的API接口,让…...
告别抢票焦虑:用Python自动化脚本轻松获取大麦网演唱会门票
告别抢票焦虑:用Python自动化脚本轻松获取大麦网演唱会门票 【免费下载链接】DamaiHelper 大麦网演唱会演出抢票脚本。 项目地址: https://gitcode.com/gh_mirrors/dama/DamaiHelper 还在为心仪的演唱会门票秒光而烦恼吗?DamaiHelper大麦网抢票脚…...
跨平台文件同步方案:OpenClaw+Qwen3-32B智能归档系统
跨平台文件同步方案:OpenClawQwen3-32B智能归档系统 1. 为什么需要智能文件同步 作为一个长期在多台设备间切换工作的开发者,我深受文件管理混乱的困扰。Mac上的设计稿、Windows里的开发文档、Linux服务器上的日志文件——这些散落在各处的数据就像一座…...
YOLOv12模型训练技巧:解决类别不平衡与过拟合问题
YOLOv12模型训练技巧:解决类别不平衡与过拟合问题 训练一个表现优异的YOLOv12模型,就像培养一位顶尖的运动员。光有强大的天赋(模型架构)还不够,科学的训练方法(训练技巧)才是决定最终成绩的关…...
国内外优秀的源码网站,程序员必备收藏
在快节奏的开发环境中,高效获取优质源码已成为提升开发效率的关键。无论是快速搭建项目原型、学习优秀代码架构,还是寻找商业级系统解决方案,一个可靠的源码平台能为你节省大量时间和精力。今天,我将为大家分享一个近期在开发者圈…...
面向生产的Chatgpt5.4:系统集成、架构模式与成本优化深度拆解
对于计划将顶级AI能力深度集成至自身产品与工作流的团队而言,理解Gemini 3.1 Pro的系统级特性、集成模式与全生命周期成本至关重要。国内开发者可通过RskAi(www.rsk.cn)等聚合平台,以零成本、国内直访的方式完成前期技术验证与原型…...
OpenClaw数据标注:用Qwen3-VL:30B增强飞书图像训练集
OpenClaw数据标注:用Qwen3-VL:30B增强飞书图像训练集 1. 为什么需要自动化数据标注 作为一个小型AI团队的算法工程师,我最近遇到了一个典型的数据瓶颈问题:我们需要为垂直领域的图像识别任务构建训练集,但手动标注上千张飞书聊天…...
