当前位置: 首页 > news >正文

Apache Kafka 事务详解

Apache Kafka 事务详解

Apache Kafka 是一个分布式流处理平台,主要用于实时数据的传输和处理。在现代的数据密集型应用中,事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持,包括其基本概念、架构、使用方法以及相关代码示例和运行效果。

1. Kafka 事务简介

Kafka 的事务性支持在 0.11.0 版本中引入,目的是提供跨多个 topic 和 partition 的原子消息写入能力。这意味着事务消息要么全部写入成功,要么全部失败,从而确保数据的一致性和完整性。

Kafka 的事务特性主要用于以下场景:

  • 确保多个 topic 和 partition 的消息一致性
  • 实现端到端的 Exactly Once 语义(EOS)
  • 防止消息丢失或重复消费

2. Kafka 事务架构

Kafka 事务涉及三个主要组件:

  • 生产者(Producer):负责发送事务性消息。
  • 消费者(Consumer):负责消费事务性消息。
  • Kafka Broker:负责管理事务状态,确保事务的一致性。

在 Kafka 中,每个事务都有一个唯一的 Transactional ID,用于标识事务的生命周期。事务的状态通过 Broker 中的事务协调器(Transaction Coordinator)进行管理。

3. Kafka 事务使用方法

3.1 配置生产者

要使用 Kafka 事务性支持,首先需要配置生产者。下面是一个配置事务性生产者的示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class TransactionalProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.initTransactions();try {producer.beginTransaction();producer.send(new ProducerRecord<>("my-topic", "key1", "value1")).get();producer.send(new ProducerRecord<>("my-topic", "key2", "value2")).get();producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {producer.close();throw e;} catch (KafkaException e) {producer.abortTransaction();}producer.close();}
}
3.2 配置消费者

为了正确消费事务性消息,需要配置隔离级别(isolation.level)为“读已提交(read_committed)”:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.Collections;
import java.util.Properties;public class TransactionalConsumer {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

4. 事务运行效果

4.1 生产者运行效果

当事务性生产者运行时,如果事务成功提交,我们可以看到以下输出:

Successfully sent message: key1, value1
Successfully sent message: key2, value2

如果事务失败并被回滚,我们将不会看到任何消息发送成功的日志。

4.2 消费者运行效果

事务性消费者只会读取已提交的事务消息。例如,如果我们发送了两条消息,但只提交了一条,那么消费者只会读取已提交的那条消息。

offset = 0, key = key1, value = value1

未提交的消息将不会被读取,从而确保数据的一致性。

5. 总结

Kafka 的事务性支持提供了一种确保消息一致性和完整性的方法,尤其适用于需要跨多个 topic 和 partition 进行原子写入的场景。通过配置事务性生产者和消费者,我们可以实现端到端的 Exactly Once 语义,防止消息丢失或重复消费。希望本文能帮助你更好地理解和使用 Kafka 的事务特性。

参考文献

  • Apache Kafka Documentation
  • Confluent Kafka Transactions

相关文章:

Apache Kafka 事务详解

Apache Kafka 事务详解 Apache Kafka 是一个分布式流处理平台&#xff0c;主要用于实时数据的传输和处理。在现代的数据密集型应用中&#xff0c;事务性保证在数据传输和处理中的作用至关重要。本文将详细介绍 Kafka 的事务性支持&#xff0c;包括其基本概念、架构、使用方法以…...

Go语言 结构体

本文主要为Go语言 结构体介绍、语法、使用注意及其示例。 目录 结构体 语法 语法示例 语法说明 声明使用 创建并赋值 使用指针 使用注意 总结 结构体 C语言里面&#xff0c;我们可以使用typedef in MyInt。 在go语言中使用结构体来模拟类&#xff0c;使用type stru…...

数据结构(邓俊辉)学习笔记】词典 03—— 排解冲突(1)

文章目录 1. 一山二虎2. 泾渭分明3. 开放定址4. 线性试探5. 赖惰删除 1. 一山二虎 此前我们已经多次指出&#xff0c;对于需要动态维护的散列表冲突是不可避免的&#xff0c;无论你的散列函数设计的有多么精妙&#xff0c;因此我们不得不回答的第二个重要问题就是一旦发生冲突&…...

HTML5+CSS3-HTML5入门

1.web标准 W3C为web标准化做出了以下事项&#xff0c;主要包括结构&#xff0c;表现和行为。 结构用于对网页的信息进行分类和整理&#xff0c;使用技术包括HTML,XML,XHTML 表现指网页的外在样式&#xff0c;一般包括网页的版式&#xff0c;颜色&#xff0c;字体&#xff0c…...

谷粒商城实战笔记-138-商城业务-首页-渲染二级三级分类数据

本节的主要内容是在前一节的基础上&#xff0c;提供结构查询出所有的二级、三级分类数据。 一&#xff0c;构造响应体数据结构 后端返回给前端的数据结构是在开发详细设计中应该确定的内容。 分析前端需要的数据结构&#xff0c;后端要将所有一级分类包含的二级和三级分类信…...

git的基础用法

文章目录 前言关联仓库提交代码分支操作账号免密 前言 记录一下git的一些基础用法。 关联仓库 # 初始化 git init# 关联仓库 git remote add origin <仓库地址># 查看当前关联的仓库 git remote -v# 一次只能remote一个&#xff0c;要换需要先删原来的 git remote rem…...

常见中间件漏洞(四、Apache合集)

目录 四、Apache 4.1 CVE-2021-41773 漏洞简介 影响版本 环境搭建 漏洞复现 四、Apache 4.1 CVE-2021-41773 Apache HTTP Server 路径穿越漏洞 漏洞简介 该漏洞是由于Apache HTTP Server 2.4.49版本存在目录穿越漏洞,在路径穿越目录<Directory/>Require all gra…...

HCIE-学习笔记

动态授权加入的成员优先级高于静态绑定的成员&#xff1b; any组&#xff08;缺省&#xff09;&#xff1a;所有用户或资源&#xff0c;通常用来配置默认规则。any组只能做目的组&#xff0c;不支持配置为源组。 同一个安全组既可以与多条授权规则绑定来表示动态用户&#xff0…...

【计算机网络】性能指标-带宽和时延(MB、GB、KB、B、byte、bit、Mb/s、Gb/s、b/s等)学习

文章目录 1、单位换算MB、b/s1.1 在计算机领域&#xff0c;大写的B、K、M、G表示1.2 在通信领域&#xff0c;小写的k代表的是1000,不是1024&#xff0c;转换的时候要注意区分 2、带宽3、时延&#xff08;时间消耗&#xff09;4、时延带宽积5、往返时延RTT 1、单位换算MB、b/s …...

ANN(Approximate Nearest Neighbor)搜索和索引库到底是什么?

&#x1f349; CSDN 叶庭云&#xff1a;https://yetingyun.blog.csdn.net/ ANN&#xff08;Approximate Nearest Neighbor&#xff09;搜索&#xff1a;最近邻搜索是一种在大规模数据集中快速找到与给定查询数据点距离最近的点的算法。与传统的精确最近邻搜索算法相比&#xff…...

勒索软件、供应链攻击等带来的思考!

2023年勒索软件、供应链攻击、地缘政治冲突与黑客活动主义、国家黑客间谍与APT组织活动成为网络安全的热点话题&#xff0c;生成式人工智能技术的武器化更是给动荡的全球网络安全威胁态势增加了不确定性、不对称性和复杂性。 即将到来的2024年&#xff0c;随着网络犯罪的规模化…...

【Nuxt】自定义插件和生命周期

自定义插件 方式一&#xff1a; app.vue // 创建插件(在app.vue中创建全局可以使用 而在某个页面中创建只有该页面可以使用) // 方式一&#xff1a; const nuxtApp useNuxtApp(); nuxtApp.provide("formDate", () > {return "2023-12-12"; }) nuxtAp…...

MySQL的简单介绍

文章目录 数据库关系型数据库非关系型数据”数据库的概念和用途MySQL数据库服务器、数据库和表的关系数据库的创建和删除表创建表修改常见的数据类型和约束字符串类型日期和时间类型PRIMARY KEY使用AUTO_INCREMENT使用UNIQUE使用FOREIGN KEY使用 SQL语言基础SQL语言简介SQL分类…...

leetcode 116.填充每个节点的下一个右侧结点指针

1.题目要求: 给定一个二叉树&#xff1a;struct Node {int val;Node *left;Node *right;Node *next; } 填充它的每个 next 指针&#xff0c;让这个指针指向其下一个右侧节点。如果找不到下一个右侧节点&#xff0c;则将 next 指针设置为 NULL 。初始状态下&#xff0c;所有 ne…...

『 Linux 』网络基础

文章目录 协议分层OSI 七层模型TCP/IP 四层(五层)模型网络协议栈与操作系统的联系报文TCP/IP 通讯过程以太网通信的过程以太网的数据碰撞 协议分层 协议分层是计算机网络中奖网络协议进行组织和管理的方法; 通过将网络通信过程分成多个层次,每个层次负责特定的功能从而简化网络…...

Python酷库之旅-第三方库Pandas(070)

目录 一、用法精讲 281、pandas.Series.dt.daysinmonth属性 281-1、语法 281-2、参数 281-3、功能 281-4、返回值 281-5、说明 281-6、用法 281-6-1、数据准备 281-6-2、代码示例 281-6-3、结果输出 282、pandas.Series.dt.tz属性 282-1、语法 282-2、参数 282-…...

第一篇Linux介绍

目录 1、操作系统 2、Windows和Linux操作系统的区别 3、 Linux 的发行版本 4、 linux 分支 5、 Linux 的含义 6、Linux 特点 1、操作系统 常见操作系统有&#xff1a;Windows、MacOS、Unix/Linux。 类 UNIX Windows&#xff1a;其是微软公司研发的收费操作系统&#xff…...

在Windows编程中,MFC\C++中OnCopyData如何传递基础类型数据?

在C中&#xff0c;OnCopyData 并不是一个标准的C库或框架中的成员函数&#xff0c;它更常见于Windows编程中&#xff0c;特别是使用Win32 API或MFC&#xff08;Microsoft Foundation Classes&#xff09;时。OnCopyData 是一个在MFC应用程序中常用于处理来自其他应用程序的WM_C…...

10款超好用的图纸加密软件推荐,2024企业常用图纸加密软件分享

在现代企业中&#xff0c;设计图纸和敏感数据的安全性至关重要。一旦图纸泄露&#xff0c;可能会对企业造成不可估量的损失。因此&#xff0c;选择一款高效、可靠的图纸加密软件显得尤为重要。 1. 安秉图纸加密软件 安秉图纸加密软件是一款专为保护工程图纸和设计文件安全的软…...

BUUCTF [安洵杯 2019]easy_serialize_php 1

打开题目&#xff0c;看到一串php代码&#xff0c;试着代码审计一下&#xff0c;看一下有用信息 可以看出是通过$_SESSION[img]来读取文件 extract可以将数组中的变量导入当前变量表 也就是说我们可以伪造$_SESSION 数组中的所有数据 这里传递一个参数fphpinfo 先用hackbar进…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)

一、数据处理与分析实战 &#xff08;一&#xff09;实时滤波与参数调整 基础滤波操作 60Hz 工频滤波&#xff1a;勾选界面右侧 “60Hz” 复选框&#xff0c;可有效抑制电网干扰&#xff08;适用于北美地区&#xff0c;欧洲用户可调整为 50Hz&#xff09;。 平滑处理&…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

Web 架构之 CDN 加速原理与落地实践

文章目录 一、思维导图二、正文内容&#xff08;一&#xff09;CDN 基础概念1. 定义2. 组成部分 &#xff08;二&#xff09;CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 &#xff08;三&#xff09;CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 &#xf…...

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...