当前位置: 首页 > news >正文

Springboot使用kafka事务-生产者方

前言

在上一篇文章中,我们使用了springboot的AOP功能实现了kafka的分布式事务,但是那样实现的kafka事务是不完美的,因为请求进来之后分配的是不同线程,但不同线程使用的kafka事务却是同一个,这样会造成多请求情况下的事务失效。

而解决这个问题的方法,就是每个线程都使用一个新的事务生产者去发送一条新的事务消息,然后这个事务还要和当前线程进行绑定,实现不同线程之间的事务隔离。

通常来说,这个繁杂的过程虽然我们可以实现,但是始终没有框架研发者做的那么完美,所以,我们首先要去看一下框架的作者有没有实现这个功能。

幸运地是,上述功能在kafka之中是有实现的,而且首次实现的时间是在2017年,所以我们可以直接使用作者提供的基于springboot的事务管理功能。

注入kafka事务

在springboot中启用kafka的事务,有两种方式,第一种方式为使用springboot提供的自动配置,第二种是自己往容器中注入。

方式一:springboot自动注入

想要使用自动注入,我们只需要在配置文件中加入transaction-id-prefix即可,配置文件如下:

spring:kafka:producer:bootstrap-servers: localhost:9092#bootstrap-servers: localhost:9010key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertransaction-id-prefix: test

这样配置之后,就开启了kafka的事务。

方式一弊端

这样虽然可以直接使用springboot自动装配功能,但是却有下面两个弊端

  • 只能使用一个kafka的集群地址
  • 全局开启了事务,有的方法并不需要全局开启事务
    所以一旦有多个kafka的地址需要配置,或者只想让部分方法使用事务,那么就可以使用第二种方法来解决,那就是自己往容器里面添加kafka的事务管理器。

方式二:向spring容器中添加自定义kafka事务管理器

在kafka事务管理器中,有三个重要的对象,分别是ProducerFactory、KafkaTemplate、KafkaTransactionManager,他们的作用如下:

  • ProducerFactory,用来创建kafka的生产者对象
  • KafkaTemplate,springboot封装的kafka模版
  • KafkaTransactionManager,kafka的事务管理器
    想要往spring容器中添加自定义的kafka事务管理器,其实就是添加一个自定义的KafkaTransactionManager对象,那么我们只需要想办法构造一个KafkaTransactionManager就好。

利用springboot的配置类,我们能很轻松的做到这一点。
第一步,构造一个配置类KafkaAndDataTransactionConfig,加上@Configuration注解。

@Configuration
public class KafkaAndDataTransactionConfig {
}

第二步,构建一个ProducerFactory对象的Bean,交给spring容器。

	@ResourceNacosDiscoveryProperties nacosDiscoveryProperties;/*** 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同* @param props yaml文件中的定义属性*/@BeanProducerFactory<String, String> pf1(KafkaProperties props) {Map<String, Object> pProps = props.buildProducerProperties();pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());return new DefaultKafkaProducerFactory<>(pProps);}

注意其中的nacosDiscoveryProperties变量,这是用来获取实例在nacos中的ip地址,因为在多实例的情况下需要保证每一个事务id的唯一,才不会被kafka的事务管理器识别为失效事务生产者,从而导致事务冲突失效。
第三步,创建一个KafkaTransactionManager对象的Bean,添加到spring容器。

	/*** 注入一个kafka事务管理器,这个事务管理器使用事务id* @param pf1* @return*/@BeanKafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTransactionManager<>(pf1);}

只需要将创建好的生产者bean,作为构造参数传入即可。
通过以上三步,我们就得到了一个支持事务的kafka事务管理器了,不过,此时我们还少创建了一个KafkaTemplate,没有这个对象我们将完不成事务发送的管控。

第四步,创建KafkaTemplate

	/*** 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务* @param pf1* @return*/@BeanKafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTemplate<>(pf1);}

经过以上代码,我们就得到了一个完整的kafka事务管理器了。
全部代码如下:

@Configuration
public class KafkaAndDataTransactionConfig {@ResourceNacosDiscoveryProperties nacosDiscoveryProperties;/*** 注入一个kafka生产者,这个生产者的transactional.id自定义,避免导致多个生产者的事务id相同* @param props yaml文件中的定义属性*/@BeanProducerFactory<String, String> pf1(KafkaProperties props) {Map<String, Object> pProps = props.buildProducerProperties();pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "product-transactional-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());pProps.put(ProducerConfig.CLIENT_ID_CONFIG, "product-client-id-" + nacosDiscoveryProperties.getIp() + "-" + nacosDiscoveryProperties.getPort());return new DefaultKafkaProducerFactory<>(pProps);/*** 注入一个kafka事务管理器,这个事务管理器使用事务id* @param pf1* @return*/@BeanKafkaTransactionManager<String, String> kafkaTransactionManagerWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTransactionManager<>(pf1);}/*** 注入一个使用事务id的kafkaTemplate,这个kafkaTemplate可以使用事务* @param pf1* @return*/@BeanKafkaTemplate<String, String> kafkaTemplateWithTxId(ProducerFactory<String, String> pf1) {return new KafkaTemplate<>(pf1);}}

增加DataSourceTransaction事务管理器

默认情况,DataSourceTransaction事务管理器springboot会帮我们自动配置,但是在使用了kafka的事务之后,会存在一个类的加载冲突,导致DataSourceTransaction没有被springboot自动加载到,所以我们还需要自己将DataSourceTransaction事务管理加入进来。
在上面的代码中,再加入以下代码

	//构造器注入DataSource和transactionManagerCustomizersprivate final DataSource dataSource;private final TransactionManagerCustomizers transactionManagerCustomizers;KafkaAndDataTransactionConfig(DataSource dataSource,ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {this.dataSource = dataSource;this.transactionManagerCustomizers = transactionManagerCustomizers.getIfAvailable();}/*** @Bean 去掉了ConditionalOnMissingBean 避免注入了kafka事务管理器后,springboot不再注入DataSourceTransactionManager* @Primary  作为主事务管理器,这样在使用@Transactional时,就会使用DataSourceTransactionManager* @param properties* @return*/@Bean@Primarypublic DataSourceTransactionManager dstm(DataSourceProperties properties) {DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(this.dataSource);if (this.transactionManagerCustomizers != null) {this.transactionManagerCustomizers.customize(transactionManager);}return transactionManager;}

增加ChainedKafkaTransactionManager管理器

在实际开发中,有时候一个方法需要既支持kafka的事务,又需要支持JDBC的事务,这个时候为了兼容两者的事务,我们需要将两者的事务放到同一个事务管理器中,让他们两个构成一个事务。kafka的作者为我们提供了ChainedKafkaTransactionManager这个对象,来支持这个操作,只需要加入以下代码即可

	 //多个事务管理器构成一个事务,使用ChainedKafkaTransactionManager管理,是因为可以自动偏移kafka事务给消费者@Bean public ChainedKafkaTransactionManager kafkaAndDataSourceTransactionManager(DataSourceTransactionManager transactionManager,@Autowired @Qualifier("kafkaTransactionManagerWithTxId") KafkaTransactionManager<?, ?> kafkaTransactionManager){return new ChainedKafkaTransactionManager<>(transactionManager, kafkaTransactionManager);}

以上,就是kafka集成springboot的方案,接下来,看看怎么使用

使用

基于以上的配置,一共有三种使用方式

  • 只使用kafka事务
  • 只使用JDBC事务
  • 同时使用kafka和JDBC事务

针对于上面的三种情况的切换,其实就是使用不同Transactional注解中的value值切换不同的事务管理器,事务的指定都在service层的实现类中。

只使用kafka事务

	//指定事务模版为自定义模版@Resource(name = "kafkaTemplateWithTxId")private KafkaTemplate<String, String> kafkaTemplate;@Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")public void transation() {ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");kafkaTemplate.send(stringStringProducerRecord);}

只使用JDBC事务

不需要指定任何的事务管理器

	@Override@Transactional(rollbackFor = Exception.class)public void transationOfJdbc() {xxxService.update(user);}

同时使用kafka和JDBC事务

指定自定义的事务管理器

	//指定事务模版为自定义模版@Resource(name = "kafkaTemplateWithTxId")private KafkaTemplate<String, String> kafkaTemplate;@Transactional(rollbackFor = Exception.class,value = "kafkaAndDataSourceTransactionManager")public void transationAll() {xxxService.update(user);spreadMonitorService.sendMsg();ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<>("test-topic", "test");kafkaTemplate.send(stringStringProducerRecord);}

结语

以上,就是在springboot中生产端实现事务的方法,总结一下,一共分为以下几步

  • 增加kafka事务管理器
  • 增加JDBC事务管理器
  • 增加事务链事务管理器
  • 使用三种事务管理器

下一篇,将写springboot中消费端如何配置。


引用资料:
kafka官网
kafka的github
spring-kafka官网

相关文章:

Springboot使用kafka事务-生产者方

前言 在上一篇文章中&#xff0c;我们使用了springboot的AOP功能实现了kafka的分布式事务&#xff0c;但是那样实现的kafka事务是不完美的&#xff0c;因为请求进来之后分配的是不同线程&#xff0c;但不同线程使用的kafka事务却是同一个&#xff0c;这样会造成多请求情况下的…...

您的计算机已被.halo勒索病毒感染?恢复您的数据的方法在这里!

导言&#xff1a; 在当今数字时代&#xff0c;网络安全已经成为了我们生活和工作中不可或缺的一部分。然而&#xff0c; .Halo 勒索病毒的出现&#xff0c;使网络威胁变得更加真切和具体。本文91数据恢复将深入介绍 .Halo 勒索病毒的危害&#xff0c;详细探讨如何高效地恢复被其…...

生成式AI颠覆传统数据库的十种方式

对于生成式AI的所有闪光点&#xff0c;这个新时代最大的转变可能深埋在软件堆栈中。AI算法正在不易觉察地改变一个又一个数据库。他们正在用复杂、自适应且看似更直观的AI新功能颠覆传统数据库。 与此同时&#xff0c;数据库制造商正在改变我们存储信息的方式&#xff0c;以便…...

el-date-picker自定义只能选中当前月份和半年内月份等

需求&#xff1a;el-date-picker只能选中当前月期和当前月期往前半年&#xff0c;其他时间就禁用了不让选择了&#xff0c;因为没数据哈哈。当然也可以选择往前一年等。 一、效果 二、写个日期选择器 :picker-options&#xff1a;日期选项 value-format&#xff1a;选择后的格…...

Pyecharts教程(十一):使用Pyecharts绘制带有滑动数据缩放功能的K线图

Pyecharts教程(十一):使用Pyecharts绘制带有滑动数据缩放功能的K线图 作者:安静到无声 个人主页 目录 Pyecharts教程(十一):使用Pyecharts绘制带有滑动数据缩放功能的K线图前言步骤总结推荐专栏前言 K线图是金融市场分析中常见的图表类型之一,它能够直观地展示价格的变化…...

2023年高教社杯数学建模思路 - 案例:ID3-决策树分类算法

文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法&#xff0c;就是频繁模…...

POJ 3273 Monthly Expense 二分

我们对每个月花费的最小花费进行二分&#xff0c;对于每一次二分的值mid&#xff0c;计算能花的月份数量&#xff0c;如果月份数量小于等于m&#xff0c;我们就不断的缩小mid&#xff0c;直到找到月份数量小于等于m 与 月份数量大于m的临界值&#xff0c;取最后一次满足条件的m…...

图论(基础)

知识&#xff1a; 顶点&#xff0c;边 | 权&#xff0c;度数 1.图的种类&#xff1a; 有向图 | 无向图 有环 | 无环 联通性 基础1&#xff1a;图的存储&#xff08;主要是邻接矩阵和邻接表&#xff09; 例一&#xff1a;B3643 图的存储 - 洛谷 | 计算机科学教育新生态 (…...

docker的运行原理

Docker 是一个开源的容器化技术,它能够让开发者将应用及其依赖打包到一个轻量级的、可移植的容器中,这个容器可以在几乎任何机器上一致地运行。要了解 Docker 的运行原理,我们首先要理解以下几个核心概念: 容器 (Container): 容器是一个轻量级的、独立的、可执行的软件包,…...

vue自定义键盘

<template><div class"mark" click"isOver"></div><div class"mycar"><div class"mycar_list"><div class"mycar_list_con"><p class"mycar_list_p">车牌号</p>…...

k8s 安装 kubernetes安装教程 虚拟机安装k8s centos7安装k8s kuberadmin安装k8s k8s工具安装 k8s安装前配置参数

k8s采用master, node1, node2 。三台虚拟机安装的一主两从&#xff0c;机器已提前安装好docker。下面是机器配置&#xff0c;k8s安装过程&#xff0c;以及出现的问题与解决方法 虚拟机全部采用静态ip, master 30机器, node1 31机器, node2 32机器 机器ip 192.168.164.30 # ma…...

2023年高教社杯数学建模思路 - 案例:感知机原理剖析及实现

文章目录 1 感知机的直观理解2 感知机的数学角度3 代码实现 4 建模资料 # 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 感知机的直观理解 感知机应该属于机器学习算法中最简单的一种算法&#xff0c;其…...

OTFS-ISAC雷达部分最新进展(含matlab仿真+USRP验证)

OTFS基带参数设置 我将使用带宽为80MHz的OTFS波形进行设计&#xff0c;对应参数如下&#xff1a; matlab Tx仿真 Tx导频Tx功率密度谱 帧结构我使用的是经典嵌入导频帧结构&#xff0c;Tx信号波形的带宽从右图可以看出约为80Mhz USRP验证 测试环境 无人机位于1m处 Rx导频Rx…...

Cell | 超深度宏基因组!复原消失的肠道微生物

期刊&#xff1a;Cell IF&#xff1a;64.5 (Q1) 发表时间&#xff1a;2023.6 研究背景 不同的生活方式会影响微生物组组成&#xff0c;但目前微生物组的研究严重偏向于西方工业化人群&#xff0c;其中工业化人群的特点是微生物群多样性较低。为了理解工…...

Centos7 设置代理方法

针对上面变量的设置方法&#xff1a; 1、在/etc/profile文件 2、在~/.bashrc 3、在~/.zshrc 4、在/etc/profile.d/文件夹下新建一个文件xxx.sh 写入如下配置&#xff1a; export proxy"http://192.168.5.14:8118" export http_proxy$proxy export https_proxy$pro…...

Android versions (Android 版本)

Android versions (Android 版本) All Android releases https://developer.android.com/about/versions Android 1.0 G1 Android 1.5 Cupcake Android 1.6 Donut Android 2.0 Eclair Android 2.2 Froyo Android 2.3 Gingerbread Android 3.0 Honeycomb Android 4.0 Ic…...

LNMP 平台搭建(四十)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 搭建LNMP 一、安装Nginx 二、安装Mysql 三、安装PHP 四、部署应用 前言 LNMP平台指的是将Linux、Nginx、MySQL和PHP&#xff08;或者其他的编程语言&#xff0c;如…...

pcie 6.0/7.0相对pcie 5.0的变化有哪些?

引言 话说&#xff0c;小编在CSDN博客跟客服机器人聊天&#xff0c;突然看到有个搜索热搜“pcie最全科普贴”。小编有点似曾相识呀&#xff0c;我就好奇点击了一下&#xff0c;没想到几年前写的帖子在CSDN又火了一把。 说到这里&#xff0c;顺带给自己打个广告哈&#xff5e; …...

百度Apollo:自动驾驶技术的未来应用之路

文章目录 前言一、城市交通二、出行体验三、环境保护四、未来前景总结 前言 随着科技的不断进步&#xff0c;自动驾驶技术正逐渐成为现实&#xff0c;颠覆着我们的出行方式。作为中国领先的自动驾驶平台&#xff0c;百度Apollo以其卓越的技术和开放的合作精神&#xff0c;正在…...

C++之std::distance应用实例(一百八十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

Python爬虫(一):爬虫伪装

一、网站防爬机制概述 在当今互联网环境中&#xff0c;具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类&#xff1a; 身份验证机制&#xff1a;直接将未经授权的爬虫阻挡在外反爬技术体系&#xff1a;通过各种技术手段增加爬虫获取数据的难度…...

linux 下常用变更-8

1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行&#xff0c;YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID&#xff1a; YW3…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)

Aspose.PDF 限制绕过方案&#xff1a;Java 字节码技术实战分享&#xff08;仅供学习&#xff09; 一、Aspose.PDF 简介二、说明&#xff08;⚠️仅供学习与研究使用&#xff09;三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

基于Springboot+Vue的办公管理系统

角色&#xff1a; 管理员、员工 技术&#xff1a; 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能&#xff1a; 该办公管理系统是一个综合性的企业内部管理平台&#xff0c;旨在提升企业运营效率和员工管理水…...

用鸿蒙HarmonyOS5实现中国象棋小游戏的过程

下面是一个基于鸿蒙OS (HarmonyOS) 的中国象棋小游戏的实现代码。这个实现使用Java语言和鸿蒙的Ability框架。 1. 项目结构 /src/main/java/com/example/chinesechess/├── MainAbilitySlice.java // 主界面逻辑├── ChessView.java // 游戏视图和逻辑├──…...

WEB3全栈开发——面试专业技能点P7前端与链上集成

一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染&#xff08;SSR&#xff09;与静态网站生成&#xff08;SSG&#xff09; 框架&#xff0c;由 Vercel 开发。它简化了构建生产级 React 应用的过程&#xff0c;并内置了很多特性&#xff1a; ✅ 文件系…...