当前位置: 首页 > news >正文

Apache Kafka 事务详解

Apache Kafka 事务详解

Apache Kafka 是一个分布式流处理平台,主要用于实时数据的传输和处理。在现代的数据密集型应用中,事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持,包括其基本概念、架构、使用方法以及相关代码示例和运行效果。

1. Kafka 事务简介

Kafka 的事务性支持在 0.11.0 版本中引入,目的是提供跨多个 topic 和 partition 的原子消息写入能力。这意味着事务消息要么全部写入成功,要么全部失败,从而确保数据的一致性和完整性。

Kafka 的事务特性主要用于以下场景:

  • 确保多个 topic 和 partition 的消息一致性
  • 实现端到端的 Exactly Once 语义(EOS)
  • 防止消息丢失或重复消费

2. Kafka 事务架构

Kafka 事务涉及三个主要组件:

  • 生产者(Producer):负责发送事务性消息。
  • 消费者(Consumer):负责消费事务性消息。
  • Kafka Broker:负责管理事务状态,确保事务的一致性。

在 Kafka 中,每个事务都有一个唯一的 Transactional ID,用于标识事务的生命周期。事务的状态通过 Broker 中的事务协调器(Transaction Coordinator)进行管理。

3. Kafka 事务使用方法

3.1 配置生产者

要使用 Kafka 事务性支持,首先需要配置生产者。下面是一个配置事务性生产者的示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class TransactionalProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key1", "value1")).get();producer.send(new ProducerRecord<>("my-topic", "key2", "value2")).get();producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();throw e;} catch (KafkaException e) {producer.abortTransaction();}producer.close();}
}
3.2 配置消费者

为了正确消费事务性消息,需要配置隔离级别(isolation.level)为“读已提交(read_committed)”:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.Collections;
import java.util.Properties;public class TransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

4. 事务运行效果

4.1 生产者运行效果

当事务性生产者运行时,如果事务成功提交,我们可以看到以下输出:

Successfully sent message: key1, value1
Successfully sent message: key2, value2

如果事务失败并被回滚,我们将不会看到任何消息发送成功的日志。

4.2 消费者运行效果

事务性消费者只会读取已提交的事务消息。例如,如果我们发送了两条消息,但只提交了一条,那么消费者只会读取已提交的那条消息。

offset = 0, key = key1, value = value1

未提交的消息将不会被读取,从而确保数据的一致性。

5. 总结

Kafka 的事务性支持提供了一种确保消息一致性和完整性的方法,尤其适用于需要跨多个 topic 和 partition 进行原子写入的场景。通过配置事务性生产者和消费者,我们可以实现端到端的 Exactly Once 语义,防止消息丢失或重复消费。希望本文能帮助你更好地理解和使用 Kafka 的事务特性。

参考文献

  • Apache Kafka Documentation
  • Confluent Kafka Transactions

相关文章:

Apache Kafka 事务详解

Apache Kafka 事务详解 Apache Kafka 是一个分布式流处理平台&#xff0c;主要用于实时数据的传输和处理。在现代的数据密集型应用中&#xff0c;事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持&#xff0c;包括其基本概念、架构、使用方法以…...

Go语言 结构体

本文主要为Go语言 结构体介绍、语法、使用注意及其示例。 目录 结构体 语法 语法示例 语法说明 声明使用 创建并赋值 使用指针 使用注意 总结 结构体 C语言里面&#xff0c;我们可以使用typedef in MyInt。 在go语言中使用结构体来模拟类&#xff0c;使用type stru…...

数据结构(邓俊辉)学习笔记】词典 03—— 排解冲突(1)

文章目录 1. 一山二虎2. 泾渭分明3. 开放定址4. 线性试探5. 赖惰删除 1. 一山二虎 此前我们已经多次指出&#xff0c;对于需要动态维护的散列表冲突是不可避免的&#xff0c;无论你的散列函数设计的有多么精妙&#xff0c;因此我们不得不回答的第二个重要问题就是一旦发生冲突&…...

HTML5+CSS3-HTML5入门

1.web标准 W3C为web标准化做出了以下事项&#xff0c;主要包括结构&#xff0c;表现和行为。 结构用于对网页的信息进行分类和整理&#xff0c;使用技术包括HTML,XML,XHTML 表现指网页的外在样式&#xff0c;一般包括网页的版式&#xff0c;颜色&#xff0c;字体&#xff0c…...

谷粒商城实战笔记-138-商城业务-首页-渲染二级三级分类数据

本节的主要内容是在前一节的基础上&#xff0c;提供结构查询出所有的二级、三级分类数据。 一&#xff0c;构造响应体数据结构 后端返回给前端的数据结构是在开发详细设计中应该确定的内容。 分析前端需要的数据结构&#xff0c;后端要将所有一级分类包含的二级和三级分类信…...

git的基础用法

文章目录 前言关联仓库提交代码分支操作账号免密 前言 记录一下git的一些基础用法。 关联仓库 # 初始化 git init# 关联仓库 git remote add origin <仓库地址># 查看当前关联的仓库 git remote -v# 一次只能remote一个&#xff0c;要换需要先删原来的 git remote rem…...

常见中间件漏洞(四、Apache合集)

目录 四、Apache 4.1 CVE-2021-41773 漏洞简介 影响版本 环境搭建 漏洞复现 四、Apache 4.1 CVE-2021-41773 Apache HTTP Server 路径穿越漏洞 漏洞简介 该漏洞是由于Apache HTTP Server 2.4.49版本存在目录穿越漏洞,在路径穿越目录<Directory/>Require all gra…...

HCIE-学习笔记

动态授权加入的成员优先级高于静态绑定的成员&#xff1b; any组&#xff08;缺省&#xff09;&#xff1a;所有用户或资源&#xff0c;通常用来配置默认规则。any组只能做目的组&#xff0c;不支持配置为源组。 同一个安全组既可以与多条授权规则绑定来表示动态用户&#xff0…...

【计算机网络】性能指标-带宽和时延(MB、GB、KB、B、byte、bit、Mb/s、Gb/s、b/s等)学习

文章目录 1、单位换算MB、b/s1.1 在计算机领域&#xff0c;大写的B、K、M、G表示1.2 在通信领域&#xff0c;小写的k代表的是1000,不是1024&#xff0c;转换的时候要注意区分 2、带宽3、时延&#xff08;时间消耗&#xff09;4、时延带宽积5、往返时延RTT 1、单位换算MB、b/s …...

ANN(Approximate Nearest Neighbor)搜索和索引库到底是什么?

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ ANN&#xff08;Approximate Nearest Neighbor&#xff09;搜索&#xff1a;最近邻搜索是一种在大规模数据集中快速找到与给定查询数据点距离最近的点的算法。与传统的精确最近邻搜索算法相比&#xff…...

勒索软件、供应链攻击等带来的思考!

2023年勒索软件、供应链攻击、地缘政治冲突与黑客活动主义、国家黑客间谍与APT组织活动成为网络安全的热点话题&#xff0c;生成式人工智能技术的武器化更是给动荡的全球网络安全威胁态势增加了不确定性、不对称性和复杂性。 即将到来的2024年&#xff0c;随着网络犯罪的规模化…...

【Nuxt】自定义插件和生命周期

自定义插件 方式一&#xff1a; app.vue // 创建插件(在app.vue中创建全局可以使用 而在某个页面中创建只有该页面可以使用) // 方式一&#xff1a; const nuxtApp useNuxtApp(); nuxtApp.provide("formDate", () > {return "2023-12-12"; }) nuxtAp…...

MySQL的简单介绍

文章目录 数据库关系型数据库非关系型数据”数据库的概念和用途MySQL数据库服务器、数据库和表的关系数据库的创建和删除表创建表修改常见的数据类型和约束字符串类型日期和时间类型PRIMARY KEY使用AUTO_INCREMENT使用UNIQUE使用FOREIGN KEY使用 SQL语言基础SQL语言简介SQL分类…...

leetcode 116.填充每个节点的下一个右侧结点指针

1.题目要求: 给定一个二叉树&#xff1a;struct Node {int val;Node *left;Node *right;Node *next; } 填充它的每个 next 指针&#xff0c;让这个指针指向其下一个右侧节点。如果找不到下一个右侧节点&#xff0c;则将 next 指针设置为 NULL 。初始状态下&#xff0c;所有 ne…...

『 Linux 』网络基础

文章目录 协议分层OSI 七层模型TCP/IP 四层(五层)模型网络协议栈与操作系统的联系报文TCP/IP 通讯过程以太网通信的过程以太网的数据碰撞 协议分层 协议分层是计算机网络中奖网络协议进行组织和管理的方法; 通过将网络通信过程分成多个层次,每个层次负责特定的功能从而简化网络…...

Python酷库之旅-第三方库Pandas(070)

目录 一、用法精讲 281、pandas.Series.dt.daysinmonth属性 281-1、语法 281-2、参数 281-3、功能 281-4、返回值 281-5、说明 281-6、用法 281-6-1、数据准备 281-6-2、代码示例 281-6-3、结果输出 282、pandas.Series.dt.tz属性 282-1、语法 282-2、参数 282-…...

第一篇Linux介绍

目录 1、操作系统 2、Windows和Linux操作系统的区别 3、 Linux 的发行版本 4、 linux 分支 5、 Linux 的含义 6、Linux 特点 1、操作系统 常见操作系统有&#xff1a;Windows、MacOS、Unix/Linux。 类 UNIX Windows&#xff1a;其是微软公司研发的收费操作系统&#xff…...

在Windows编程中,MFC\C++中OnCopyData如何传递基础类型数据?

在C中&#xff0c;OnCopyData 并不是一个标准的C库或框架中的成员函数&#xff0c;它更常见于Windows编程中&#xff0c;特别是使用Win32 API或MFC&#xff08;Microsoft Foundation Classes&#xff09;时。OnCopyData 是一个在MFC应用程序中常用于处理来自其他应用程序的WM_C…...

10款超好用的图纸加密软件推荐,2024企业常用图纸加密软件分享

在现代企业中&#xff0c;设计图纸和敏感数据的安全性至关重要。一旦图纸泄露&#xff0c;可能会对企业造成不可估量的损失。因此&#xff0c;选择一款高效、可靠的图纸加密软件显得尤为重要。 1. 安秉图纸加密软件 安秉图纸加密软件是一款专为保护工程图纸和设计文件安全的软…...

BUUCTF [安洵杯 2019]easy_serialize_php 1

打开题目&#xff0c;看到一串php代码&#xff0c;试着代码审计一下&#xff0c;看一下有用信息 可以看出是通过$_SESSION[img]来读取文件 extract可以将数组中的变量导入当前变量表 也就是说我们可以伪造$_SESSION 数组中的所有数据 这里传递一个参数fphpinfo 先用hackbar进…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

Caliper 负载(Workload)详细解析

Caliper 负载(Workload)详细解析 负载(Workload)是 Caliper 性能测试的核心部分,它定义了测试期间要执行的具体合约调用行为和交易模式。下面我将全面深入地讲解负载的各个方面。 一、负载模块基本结构 一个典型的负载模块(如 workload.js)包含以下基本结构: use strict;/…...