当前位置: 首页 > news >正文

kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)

文章目录

  • 1、kafka数据传递语义
  • 2、kafka生产者事务
  • 3、事务消息发送
    • 3.1、application.yml配置
    • 3.2、创建生产者监听器
    • 3.3、创建生产者拦截器
    • 3.4、发送消息测试
    • 3.5、使用Java代码创建主题分区副本
    • 3.6、屏蔽 kafka debug 日志 logback.xml
    • 3.7、引入spring-kafka依赖
    • 3.8、控制台日志

1、kafka数据传递语义

kafka发送消息时是否需要重试

  1. 仅发送一次:生产者发送消息后不重试,只发送一次 可能丢失消息 效率最高
  2. 至少一次:生产者发送消息后重试,可能重试多次 效率差
  3. 精准一次发送:生产者发送消息后无论是否重复发送 发送了多少次,在 kafka broker 中只保存一次消息,通过幂等性 + 生产者事务来实现

kafak天然支持幂等性,每个消息头中带了一个唯一的标志 kafka broker 根据此标志判断消息是否已经发送过,生产者事务可以保证数据没有最终发送成功时,消费者不可以消费,如果生产者发送消息时出现异常会自动回滚(清除之前发送的事务中的消息)

kafka天然幂等性:但是指的是生产者事务 生产消息时的幂等性,发送消息时消息中带唯一标识、broker接收到消息时如果重复不再保存,事务没提交消费者不能消费改消息

2、kafka生产者事务

保证消息生产的幂等性
一组消息要么一起成功 被消费者消息 要么一起失败都不能被消费者消费

  1. 配置ack为-1 分区所有副本均落盘成功
  2. 配置生产者重试(发送失败可以继续发送:需要保证发送失败后再次发送消息到kafka实现 精准一次发送)
  3. 需要给事务分配事务id(区分一个事务中的多条消息)

3、事务消息发送

3.1、application.yml配置

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 1 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、-1/all)transaction-id-prefix: tx_  # 事务id前缀:配置后producer自动开启事务batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

3.2、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1  leader落盘  broker ack之后 才成功//ack为 -1 分区所有副本全部落盘  broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}

3.3、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

3.4、发送消息测试

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}//kafka事务支持spring-tx的事务注解//单元测试中的事务会自动回滚@Testvoid testTransaction() throws  IOException {//多个消息的发送在一个事务中执行kafkaTemplate.executeInTransaction((var1) -> {//通过一个事务中的operations对象来发送消息,执行事务操作var1.send("my_topic1",0,"", "spring-kafka-事务1");var1.send("my_topic1",0,"", "spring-kafka-事务2");int i = 1/0;var1.send("my_topic1",0,"", "spring-kafka-事务3");return "发送消息失败";});System.in.read();}
}

3.5、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

3.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

3.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

3.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务1
生产者即将发送消息:topic = my_topic1,partition:0,key = ,value = spring-kafka-事务2
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务1,offset = -1
异常信息:Failing batch since transaction was aborted
MyKafkaProducerListener消息发送失败:topic=my_topic1,partition = 0,key = ,value = spring-kafka-事务2,offset = -1
异常信息:Failing batch since transaction was abortedjava.lang.ArithmeticException: / by zero

在这里插入图片描述

相关文章:

kafka-生产者事务-数据传递语义事务介绍事务消息发送(SpringBoot整合Kafka)

文章目录 1、kafka数据传递语义2、kafka生产者事务3、事务消息发送3.1、application.yml配置3.2、创建生产者监听器3.3、创建生产者拦截器3.4、发送消息测试3.5、使用Java代码创建主题分区副本3.6、屏蔽 kafka debug 日志 logback.xml3.7、引入spring-kafka依赖3.8、控制台日志…...

免费!GPT-4o发布,实时语音视频丝滑交互

We’re announcing GPT-4o, our new flagship model that can reason across audio, vision, and text in real time. 5月14日凌晨&#xff0c;OpenAI召开了春季发布会&#xff0c;发布会上公布了新一代旗舰型生成式人工智能大模型【GPT-4o】&#xff0c;并表示该模型对所有免费…...

DevOps的原理及应用详解(四)

本系列文章简介: 在当今快速变化的商业环境中,企业对于软件交付的速度、质量和安全性要求日益提高。传统的软件开发和运维模式已经难以满足这些需求,因此,DevOps(Development和Operations的组合)应运而生,成为了解决这些问题的有效方法。 DevOps是一种强调软件开发人员(…...

关于选择,关于处事

一个人选择应该选择的是勇敢&#xff0c;选择不应该选择的是无奈。放弃&#xff0c;不该放弃的是懦夫&#xff0c;不放弃应该放弃的是睿智。所以&#xff0c;碰到事的时候要先静&#xff0c;先不管什么事&#xff0c;先静下来&#xff0c;先淡定&#xff0c;先从容。在生活里要…...

大话设计模式解读02-策略模式

本篇文章&#xff0c;来解读《大话设计模式》的第2章——策略模式。并通过Qt和C代码实现实例代码的功能。 1 策略模式 策略模式作为一种软件设计模式&#xff0c;指对象有某个行为&#xff0c;但是在不同的场景中&#xff0c;该行为有不同的实现算法。 策略模式的特点&#…...

展会邀请 | 龙智即将亮相2024上海国际嵌入式展,带来安全合规、单一可信数据源、可追溯、高效协同的嵌入式开发解决方案

2024年6月12日至14日&#xff0c;备受全球嵌入式系统产业和社群瞩目的2024上海国际嵌入式展&#xff08;embedded world china 2024&#xff09;即将盛大开幕&#xff0c;龙智将携行业领先的嵌入式开发解决方案亮相 640展位 。 此次参展&#xff0c;龙智将全面展示专为嵌入式行…...

codeforce round951 div2

A guess the maximum 问题&#xff1a; 翻译一下就是求所有相邻元素中max - 1的最小值 代码&#xff1a; #include <iostream> #include <algorithm>using namespace std;const int N 5e4;int a[N]; int n;void solve() {cin >> n;int ans 0x3f3f3f3f;…...

arcgis开发记录

目录 文章目录 [toc]**arcgis JavaScript API安装**1. arcgisAPI下载地址&#xff1a;https://developers.arcgis.com/downloads/2. 4.4版本API&#xff1a;本地配置3. 3.18版本修改方法 **angular2中加载arcgis JS API**** arcgis加载图层 并显示图层上点的信息****使用图层上…...

RPA-UiBot6.0数据整理机器人—杂乱数据秒变报表

前言 友友们是否常常因为杂乱的数据而烦恼?数据分类、排序、筛选这些繁琐的任务是否占据了友友们的大部分时间?这篇博客将为友友们带来一个新的解决方案,让我们共同学习如何运用RPA数据整理机器人,实现杂乱数据的快速整理,为你的工作减负增效! 在这里,友友们将了…...

Application UI

本节包含关于如何用DevExpress控件模拟许多流行的应用程序ui的教程。 Windows 11 UI Windows 11和最新一代微软Office产品启发的UI。 Office Inspired UI Word、Excel、PowerPoint和Visio等微软Office应用程序启发的UI。 如何&#xff1a;手动构建Office风格的UI 本教程演示…...

关于 Redis 中集群

哨兵机制中总结到&#xff0c;它并不能解决存储容量不够的问题&#xff0c;但是集群能。 广义的集群&#xff1a;只要有多个机器&#xff0c;构成了分布式系统&#xff0c;都可以称之为一个“集群”&#xff0c;例如主从结构中的哨兵模式。 狭义的集群&#xff1a;redis 提供的…...

C++必修:探索C++的内存管理

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ &#x1f388;&#x1f388;养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属专栏&#xff1a;C学习 贝蒂的主页&#xff1a;Betty’s blog 1. C/C的内存分布 我们首先来看一段代码及其相关问题 int globalVar 1; static…...

python列表---基本语法(浅拷贝,深拷贝等)

文章目录 引言:列表的注意事项1 list中的浅拷贝与深拷贝1.1浅拷贝(Shallow Copy)浅拷贝的方法浅拷贝的效果1.2深拷贝(Deep Copy)深拷贝的方法深拷贝的效果1.3 总结:浅拷贝 vs 深拷贝1.4 为什么浅拷贝顶层元素如果是不可变数据就不能共享,不是传的是引用就相当于传的是地…...

go语言接口之sort.Interface接口

排序操作和字符串格式化一样是很多程序经常使用的操作。尽管一个最短的快排程序只要15 行就可以搞定&#xff0c;但是一个健壮的实现需要更多的代码&#xff0c;并且我们不希望每次我们需要的时候 都重写或者拷贝这些代码。 幸运的是&#xff0c;sort包内置的提供了根据一些排序…...

android:text 总为大写字母的原因

当设置某个 Button 的 text 为英文时&#xff0c;界面上显示的是该英文的大写形式&#xff08;uppercase&#xff09;。例如&#xff1a; <Buttonandroid:id"id/btn"android:layout_width"wrap_content"android:layout_height"wrap_content"…...

CISCN2024 初赛 wp 部分复现(Re)

Misc 1. 火锅链观光打卡 答题即可 Re 1. asm_re 感谢智谱清言&#xff0c;可以读出大致加密算法 这是输入 这是加密部分 这里判断 找到疑似密文的部分&#xff0c;手动改一下端序 #asm_wp def dec(char):return (((char - 0x1E) ^ 0x4D) - 0x14) // 0x50 #return (ord(cha…...

YOLOv10、YOLOv9 和 YOLOv8 在实际视频中的对比

引言 目标检测技术是计算机视觉领域的核心任务之一&#xff0c;YOLO&#xff08;You Only Look Once&#xff09;系列模型凭借其高效的检测速度和准确率成为了业界的宠儿。本文将详细对比YOLOv10、YOLOv9和YOLOv8在实际视频中的表现&#xff0c;探讨它们在性能、速度和实际应用…...

热题系列章节5

169. 多数元素 给定一个大小为 n 的数组&#xff0c;找到其中的多数元素。多数元素是指在数组中出现次数大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例 1: 输入: [3,2,3] 输出: 3 示例 2: 输入: [2,2,1,1,1,2,2] 输出:…...

ArcGIS for js 4.x 加载图层

二维&#xff1a; 1、创建vue项目 npm create vitelatest 2、安装ArcGIS JS API依赖包 npm install arcgis/core 3、引入ArcGIS API for JavaScript模块 <script setup> import "arcgis/core/assets/esri/themes/light/main.css"; import Map from arcgis…...

Three.js和Babylon.js,webGL中的对比效果分析!

hello&#xff0c;今天分享一些three.js和babylon.js常识&#xff0c;为大家选择three.js还是babylon.js做个分析&#xff0c;欢迎点赞评论转发。 一、Babylon.js是什么 Babylon.js是一个基于WebGL技术的开源3D游戏引擎和渲染引擎。它提供了一套简单易用的API&#xff0c;使开发…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

Mac软件卸载指南,简单易懂!

刚和Adobe分手&#xff0c;它却总在Library里给你写"回忆录"&#xff1f;卸载的Final Cut Pro像电子幽灵般阴魂不散&#xff1f;总是会有残留文件&#xff0c;别慌&#xff01;这份Mac软件卸载指南&#xff0c;将用最硬核的方式教你"数字分手术"&#xff0…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...