当前位置: 首页 > news >正文

Apache Kafka 事务详解

Apache Kafka 事务详解

Apache Kafka 是一个分布式流处理平台,主要用于实时数据的传输和处理。在现代的数据密集型应用中,事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持,包括其基本概念、架构、使用方法以及相关代码示例和运行效果。

1. Kafka 事务简介

Kafka 的事务性支持在 0.11.0 版本中引入,目的是提供跨多个 topic 和 partition 的原子消息写入能力。这意味着事务消息要么全部写入成功,要么全部失败,从而确保数据的一致性和完整性。

Kafka 的事务特性主要用于以下场景:

  • 确保多个 topic 和 partition 的消息一致性
  • 实现端到端的 Exactly Once 语义(EOS)
  • 防止消息丢失或重复消费

2. Kafka 事务架构

Kafka 事务涉及三个主要组件:

  • 生产者(Producer):负责发送事务性消息。
  • 消费者(Consumer):负责消费事务性消息。
  • Kafka Broker:负责管理事务状态,确保事务的一致性。

在 Kafka 中,每个事务都有一个唯一的 Transactional ID,用于标识事务的生命周期。事务的状态通过 Broker 中的事务协调器(Transaction Coordinator)进行管理。

3. Kafka 事务使用方法

3.1 配置生产者

要使用 Kafka 事务性支持,首先需要配置生产者。下面是一个配置事务性生产者的示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class TransactionalProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key1", "value1")).get();producer.send(new ProducerRecord<>("my-topic", "key2", "value2")).get();producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();throw e;} catch (KafkaException e) {producer.abortTransaction();}producer.close();}
}
3.2 配置消费者

为了正确消费事务性消息,需要配置隔离级别(isolation.level)为“读已提交(read_committed)”:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.Collections;
import java.util.Properties;public class TransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

4. 事务运行效果

4.1 生产者运行效果

当事务性生产者运行时,如果事务成功提交,我们可以看到以下输出:

Successfully sent message: key1, value1
Successfully sent message: key2, value2

如果事务失败并被回滚,我们将不会看到任何消息发送成功的日志。

4.2 消费者运行效果

事务性消费者只会读取已提交的事务消息。例如,如果我们发送了两条消息,但只提交了一条,那么消费者只会读取已提交的那条消息。

offset = 0, key = key1, value = value1

未提交的消息将不会被读取,从而确保数据的一致性。

5. 总结

Kafka 的事务性支持提供了一种确保消息一致性和完整性的方法,尤其适用于需要跨多个 topic 和 partition 进行原子写入的场景。通过配置事务性生产者和消费者,我们可以实现端到端的 Exactly Once 语义,防止消息丢失或重复消费。希望本文能帮助你更好地理解和使用 Kafka 的事务特性。

参考文献

  • Apache Kafka Documentation
  • Confluent Kafka Transactions

相关文章:

Apache Kafka 事务详解

Apache Kafka 事务详解 Apache Kafka 是一个分布式流处理平台&#xff0c;主要用于实时数据的传输和处理。在现代的数据密集型应用中&#xff0c;事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持&#xff0c;包括其基本概念、架构、使用方法以…...

Go语言 结构体

本文主要为Go语言 结构体介绍、语法、使用注意及其示例。 目录 结构体 语法 语法示例 语法说明 声明使用 创建并赋值 使用指针 使用注意 总结 结构体 C语言里面&#xff0c;我们可以使用typedef in MyInt。 在go语言中使用结构体来模拟类&#xff0c;使用type stru…...

数据结构(邓俊辉)学习笔记】词典 03—— 排解冲突(1)

文章目录 1. 一山二虎2. 泾渭分明3. 开放定址4. 线性试探5. 赖惰删除 1. 一山二虎 此前我们已经多次指出&#xff0c;对于需要动态维护的散列表冲突是不可避免的&#xff0c;无论你的散列函数设计的有多么精妙&#xff0c;因此我们不得不回答的第二个重要问题就是一旦发生冲突&…...

HTML5+CSS3-HTML5入门

1.web标准 W3C为web标准化做出了以下事项&#xff0c;主要包括结构&#xff0c;表现和行为。 结构用于对网页的信息进行分类和整理&#xff0c;使用技术包括HTML,XML,XHTML 表现指网页的外在样式&#xff0c;一般包括网页的版式&#xff0c;颜色&#xff0c;字体&#xff0c…...

谷粒商城实战笔记-138-商城业务-首页-渲染二级三级分类数据

本节的主要内容是在前一节的基础上&#xff0c;提供结构查询出所有的二级、三级分类数据。 一&#xff0c;构造响应体数据结构 后端返回给前端的数据结构是在开发详细设计中应该确定的内容。 分析前端需要的数据结构&#xff0c;后端要将所有一级分类包含的二级和三级分类信…...

git的基础用法

文章目录 前言关联仓库提交代码分支操作账号免密 前言 记录一下git的一些基础用法。 关联仓库 # 初始化 git init# 关联仓库 git remote add origin <仓库地址># 查看当前关联的仓库 git remote -v# 一次只能remote一个&#xff0c;要换需要先删原来的 git remote rem…...

常见中间件漏洞(四、Apache合集)

目录 四、Apache 4.1 CVE-2021-41773 漏洞简介 影响版本 环境搭建 漏洞复现 四、Apache 4.1 CVE-2021-41773 Apache HTTP Server 路径穿越漏洞 漏洞简介 该漏洞是由于Apache HTTP Server 2.4.49版本存在目录穿越漏洞,在路径穿越目录<Directory/>Require all gra…...

HCIE-学习笔记

动态授权加入的成员优先级高于静态绑定的成员&#xff1b; any组&#xff08;缺省&#xff09;&#xff1a;所有用户或资源&#xff0c;通常用来配置默认规则。any组只能做目的组&#xff0c;不支持配置为源组。 同一个安全组既可以与多条授权规则绑定来表示动态用户&#xff0…...

【计算机网络】性能指标-带宽和时延(MB、GB、KB、B、byte、bit、Mb/s、Gb/s、b/s等)学习

文章目录 1、单位换算MB、b/s1.1 在计算机领域&#xff0c;大写的B、K、M、G表示1.2 在通信领域&#xff0c;小写的k代表的是1000,不是1024&#xff0c;转换的时候要注意区分 2、带宽3、时延&#xff08;时间消耗&#xff09;4、时延带宽积5、往返时延RTT 1、单位换算MB、b/s …...

ANN(Approximate Nearest Neighbor)搜索和索引库到底是什么?

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ ANN&#xff08;Approximate Nearest Neighbor&#xff09;搜索&#xff1a;最近邻搜索是一种在大规模数据集中快速找到与给定查询数据点距离最近的点的算法。与传统的精确最近邻搜索算法相比&#xff…...

勒索软件、供应链攻击等带来的思考!

2023年勒索软件、供应链攻击、地缘政治冲突与黑客活动主义、国家黑客间谍与APT组织活动成为网络安全的热点话题&#xff0c;生成式人工智能技术的武器化更是给动荡的全球网络安全威胁态势增加了不确定性、不对称性和复杂性。 即将到来的2024年&#xff0c;随着网络犯罪的规模化…...

【Nuxt】自定义插件和生命周期

自定义插件 方式一&#xff1a; app.vue // 创建插件(在app.vue中创建全局可以使用 而在某个页面中创建只有该页面可以使用) // 方式一&#xff1a; const nuxtApp useNuxtApp(); nuxtApp.provide("formDate", () > {return "2023-12-12"; }) nuxtAp…...

MySQL的简单介绍

文章目录 数据库关系型数据库非关系型数据”数据库的概念和用途MySQL数据库服务器、数据库和表的关系数据库的创建和删除表创建表修改常见的数据类型和约束字符串类型日期和时间类型PRIMARY KEY使用AUTO_INCREMENT使用UNIQUE使用FOREIGN KEY使用 SQL语言基础SQL语言简介SQL分类…...

leetcode 116.填充每个节点的下一个右侧结点指针

1.题目要求: 给定一个二叉树&#xff1a;struct Node {int val;Node *left;Node *right;Node *next; } 填充它的每个 next 指针&#xff0c;让这个指针指向其下一个右侧节点。如果找不到下一个右侧节点&#xff0c;则将 next 指针设置为 NULL 。初始状态下&#xff0c;所有 ne…...

『 Linux 』网络基础

文章目录 协议分层OSI 七层模型TCP/IP 四层(五层)模型网络协议栈与操作系统的联系报文TCP/IP 通讯过程以太网通信的过程以太网的数据碰撞 协议分层 协议分层是计算机网络中奖网络协议进行组织和管理的方法; 通过将网络通信过程分成多个层次,每个层次负责特定的功能从而简化网络…...

Python酷库之旅-第三方库Pandas(070)

目录 一、用法精讲 281、pandas.Series.dt.daysinmonth属性 281-1、语法 281-2、参数 281-3、功能 281-4、返回值 281-5、说明 281-6、用法 281-6-1、数据准备 281-6-2、代码示例 281-6-3、结果输出 282、pandas.Series.dt.tz属性 282-1、语法 282-2、参数 282-…...

第一篇Linux介绍

目录 1、操作系统 2、Windows和Linux操作系统的区别 3、 Linux 的发行版本 4、 linux 分支 5、 Linux 的含义 6、Linux 特点 1、操作系统 常见操作系统有&#xff1a;Windows、MacOS、Unix/Linux。 类 UNIX Windows&#xff1a;其是微软公司研发的收费操作系统&#xff…...

在Windows编程中,MFC\C++中OnCopyData如何传递基础类型数据?

在C中&#xff0c;OnCopyData 并不是一个标准的C库或框架中的成员函数&#xff0c;它更常见于Windows编程中&#xff0c;特别是使用Win32 API或MFC&#xff08;Microsoft Foundation Classes&#xff09;时。OnCopyData 是一个在MFC应用程序中常用于处理来自其他应用程序的WM_C…...

10款超好用的图纸加密软件推荐,2024企业常用图纸加密软件分享

在现代企业中&#xff0c;设计图纸和敏感数据的安全性至关重要。一旦图纸泄露&#xff0c;可能会对企业造成不可估量的损失。因此&#xff0c;选择一款高效、可靠的图纸加密软件显得尤为重要。 1. 安秉图纸加密软件 安秉图纸加密软件是一款专为保护工程图纸和设计文件安全的软…...

BUUCTF [安洵杯 2019]easy_serialize_php 1

打开题目&#xff0c;看到一串php代码&#xff0c;试着代码审计一下&#xff0c;看一下有用信息 可以看出是通过$_SESSION[img]来读取文件 extract可以将数组中的变量导入当前变量表 也就是说我们可以伪造$_SESSION 数组中的所有数据 这里传递一个参数fphpinfo 先用hackbar进…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

《用户共鸣指数(E)驱动品牌大模型种草:如何抢占大模型搜索结果情感高地》

在注意力分散、内容高度同质化的时代&#xff0c;情感连接已成为品牌破圈的关键通道。我们在服务大量品牌客户的过程中发现&#xff0c;消费者对内容的“有感”程度&#xff0c;正日益成为影响品牌传播效率与转化率的核心变量。在生成式AI驱动的内容生成与推荐环境中&#xff0…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

AspectJ 在 Android 中的完整使用指南

一、环境配置&#xff08;Gradle 7.0 适配&#xff09; 1. 项目级 build.gradle // 注意&#xff1a;沪江插件已停更&#xff0c;推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...

GitFlow 工作模式(详解)

今天再学项目的过程中遇到使用gitflow模式管理代码&#xff0c;因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存&#xff0c;无论是github还是gittee&#xff0c;都是一种基于git去保存代码的形式&#xff0c;这样保存代码…...

Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成

一个面向 Java 开发者的 Sring-Ai 示例工程项目&#xff0c;该项目是一个 Spring AI 快速入门的样例工程项目&#xff0c;旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计&#xff0c;每个模块都专注于特定的功能领域&#xff0c;便于学习和…...