当前位置: 首页 > news >正文

kafka精准一次、事务、幂等性

Kafka事务

消息中间件的消息保障的3个级别

  1. At most once 至多一次。数据丢失。
  2. At last once 至少一次。数据冗余
  3. Exactly one 精准一次。好!!!

如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。

:先消费消息、再提交位移。

如果提交位移这一步挂了,就会再消费一遍消息。重复消费====》〉》至少一次

当:先提交位移、再消费消息。

提议位移成功、消费消息失败,那么数据就丢失了====》〉》至多一次

如何精准一次呢?

幂等和事务!

幂等

对接口的多次调用所产生的结果和一次调用的结果是一样的。

即:(第一次调用,中途挂了,再次调用==一次调用) 为true

如何实现?

在v2版本的消息存储格式用有两个字段。produce_id(简称pid) 、first sequence

在这里插入图片描述

每个新的生产者实例在初始化的时候都会被分配一个pid,每个pid,消息发送到每一个分区都有序列号 sequence,序列号会从0开始递增,每发送一条消息,<PID,分区> 对应的序列号的值会➕1。这个序列号值(SN)在broker的内存中维护。只有当SN_new=SN_old+1.

broker才会接收这个消息。

如SN_new < SN_old+1 说明消息重复了,这个消息可以直接丢掉。

如SN_new>SN_old+1 说明消息丢失了,有数据还没有卸写入。抛乱序异常OutOforderSequenceException。

即用序列号来保证消息的顺序消费。

注意 所记录的这个序列号是针对 每一对<PID,分区> 所以这个幂等实现的是单会话、单分区的。

如何保证多个分区之间的幂等性呢?

事务

保证对多个分区写入操作的原子性,要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。

用户显示指定一个事务id: transactionalId。这个事务id是唯一的

从生产者角度来考虑,事务保证了生产者会话消息的幂等发送跨生产者会话的事务恢复.

  • 生产者会话消息的幂等发送:如有有两个相同事务id的生产者,新的创建了 旧的就会被kill
  • 某个生产者实例宕机了,新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断

实现过程,以consume-transform-produce为例。

package com.hzbank.yjj.transaction;import com.hzbank.yjj.producer.CustomerPartitioner;
import com.hzbank.yjj.producer.ProducerlnterceptorPrefix;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;public class TransactionConsumeTransformProduce {public static final String brokerList = "localhost:9092";public static Properties getConsumerProps(){Properties props =new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupId");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);return props;}public static Properties getProducerProps(){Properties props =new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId");return props;}public static void main(String[] args) {//初始化生产者和消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());consumer.subscribe(Collections.singletonList("topic-source"));KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());//初始化事务producer.initTransactions();while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if(!records.isEmpty()){HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();//开启事务producer.beginTransaction();try {for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println("获取到了topic-source发送过来的数据"+record.value());System.out.println("do some ");ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());producer.send(producerRecord);}// 获取最近一次的消费位移long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));}//提交消费位移producer.sendOffsetsToTransaction(offsets,"groupId");//提交事务producer.commitTransaction();} catch (ProducerFencedException e) {System.out.println("异常了");producer.abortTransaction();}}}}}

1. 找到TransactionCoordinator。

TransactionCoordinator负责分配和管理事务。
FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息

transactionalId 的哈希值计算主题_transaction_state 中的分区编号

根据分区leader副本找到所在的broker节点,极为Transaction Coordinator节点

2. 获取pid

通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。

String transactionalId; 事务id
int transactionTimeoutMs; 事务状态更新超时时间

3. 保存pid

TransactionCoordinator 第一次收到事务id会和对应pid保存下来,以消息(事务日志消息)的形式保存到主题_transaction_state中,实现持久化

InitProducerIdRequest还会出发一下任务:

- 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。
- 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务

4. 开启事务

通过 KafkaProduc町的 beginTransaction()方法。调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。

5. Consume-Transform-Produce

整个事务处理数据。

  • AddPartitionsToTxnRequest:让 TransactionCoordinator 将<transactionld, TopicPartition>的对应关系存储在主题

    transaction state 中

  • ProduceRequest:生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中

  • AddOffsetsToTxnRequest:TransactionCoordinator 收到这个AddOffsetsToTxnRequest请求,通过 groupId 来推导出在一consumer_offsets 中的分区

  • TxnOffsetCommitRequest:发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中

6. 提交或者终止事务

KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。

写不下去了,暂时就先理解这么多了,后面再多结合源码去看看。

参考:书籍《深入理解 Kafka:核心设计与实践原理》

相关文章:

kafka精准一次、事务、幂等性

Kafka事务 消息中间件的消息保障的3个级别 At most once 至多一次。数据丢失。At last once 至少一次。数据冗余Exactly one 精准一次。好&#xff01;&#xff01;&#xff01; 如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。 当&#xff1a;先消费消息、…...

centos 7.9 下利用miniconda里的pyinstaller打包python程序为二进制文件操作方法

centos 7.9 下利用miniconda里的pyinstaller打包python程序为二进制文件操作方法 一.centos 7.9 操作系统安装 参考&#xff1a;https://blog.csdn.net/qq_46015509/article/details/134572030?utm_sourceminiapp_weixin 安装完成后用后台连接工具连上虚拟机 二.安装python3 …...

Motion Plan之基于采样的路径规划算法笔记

Motion Plan之搜索算法笔记 背景&#xff1a; 基于采样算法是一种在路径规划中广泛应用的有效方法。它通过在图中随机选择点来生成一个简化的搜索图&#xff0c;从而加速搜索过程。这种方法的主要优点包括减少内存使用&#xff0c;避免计算错误&#xff0c;具有动态障碍物对抗…...

idea里面常用插件

这里列出了一系列常用的 IntelliJ IDEA 插件&#xff0c;它们可以提高开发效率、简化操作&#xff0c;以及帮助进行代码分析和优化。以下是每个插件的简要介绍&#xff1a; GenerateAllSetter&#xff1a;生成对象的所有 set 方法和 get 方法&#xff0c;方便对象之间的转换。该…...

回归算法优化过程推导

假设存在一个数据集&#xff0c;包含工资、年龄及贷款额度三个维度的数据。我们需要根据这个数据集进行建模&#xff0c;从而在给定工资和年龄的情况下&#xff0c;实现对贷款额度的预测。其中&#xff0c;工资和年龄是模型构建时的两个特征&#xff0c;额度是模型输出的目标值…...

某高品质房产企业:借助NineData平台,统一数据库访问权限,保障业务安全

该企业是中国领先的优质房产品开发及生活综合服务供应商。在 2022 年取得了亮眼的业绩表现&#xff0c;销售额市场占有率跻身全国前五。业务涵盖房产开发、房产代建、城市更新、科技装修等多个领域。 2023 年&#xff0c;该企业和玖章算术&#xff08;浙江&#xff09;科技有限…...

Arduio开发STM32所面临的风险

据说micro_ros用到了arduino,然后用arduino搞stm32需要用到这个Arduino STM32的东西&#xff0c;然后这里申明了&#xff1a;这些代码没有经过严格测试&#xff0c;如果是向心脏起搏器&#xff0c;自动驾驶这样要求严格的的情况下&#xff0c;这个东西不能保证100%不发生问题&a…...

精准人脉引流软件的开发流程与涉及到的技术

一、精准人脉引流软件的开发流程 1. 确定需求&#xff1a;首先&#xff0c;我们需要明确软件的需求&#xff0c;包括目标用户、功能需求、性能需求等。这些需求将直接影响到软件的开发方向和最终效果。 2. 系统设计&#xff1a;根据需求&#xff0c;进行系统设计&#xff0c;…...

Mysql数据库 20.DCL数据控制语言

因这类SQL语言开发人员操作较少&#xff0c;主要是数据库管理员&#xff08;DBA&#xff09;使用&#xff0c;所以前文没有提及&#xff0c;这篇文章进行补充说明 DCL数据控制语言 用来管理数据库用户&#xff0c;控制数据库的访问权限 1.管理用户 1.1 查询用户 select * f…...

使用CMake交叉编译Arm Linux程序

下载安装aarch64-linux-gnu-gcc arm交叉编译工具链 apt-get install aarch64-linux-gnu-gccapt-get install aarch64-linux-gnu-gcc创建编译目录构建makefle 注意&#xff0c;工具链文件的指定一定要紧跟cmake命令之后&#xff0c;不能放到 … 后面构建arm架构cmake mkdir arm…...

训练日志——logging

目录 基础使用日志的6个级别打印日志修改打印级别 高级应用logging的组成记录器Loggers处理器Handlers过滤器Filterformatter格式创建关联打印日志 配置文件参考 基础使用 日志的6个级别 打印日志 import logginglogging.debug(调试日志) logging.info(消息日志) logging.war…...

尺度为什么是sigma?

我们先看中值滤波和均值滤波。 以前&#xff0c;我认为是一样的&#xff0c;没有区分过。 他们说&#xff0c;均值滤波有使图像模糊的效果。 中值滤波有使图像去椒盐的效果。为什么不同呢&#xff1f;试了一下&#xff0c;果然不同&#xff0c;然后追踪了一下定义。 12345&…...

迭代器模式

自定义 Counter 结构体类型&#xff0c;并实现迭代器。其他语言的场景&#xff0c;读取数据库行数据时&#xff0c;使用的就是迭代器。我们使用for语言遍历数组&#xff0c;也是一种迭代。 结构体对象实现 Iterator trait&#xff0c;创建自定义的迭代器&#xff0c;只需要实现…...

C++ 修饰符、存储类、运算符、循环、判断

一、C修饰符类型&#xff1a; C允许在char、int、double数据类型前放置修饰符。 数据类型修饰符&#xff1a; ◆ signed&#xff1a;表示变量可以存储负数。对于整型变量来说&#xff0c;signed 可以省略&#xff0c;因为整型变量默认为有符号类型。 ◆ unsigned&#xff1…...

2023 hnust 湖南科技大学 信息安全管理课程 期中考试 复习资料

前言 ※老师没画重点的补充内容★往年试卷中多次出现或老师提过的&#xff0c;很可能考该笔记是奔着及格线去的&#xff0c;不是奔着90由于没有听过课&#xff0c;部分知识点不一定全&#xff0c;答案不一定完全正确 题型 试卷有很多题是原题 判断题&#xff08;PPT&#xff…...

N皇后问题解的个数

暴力递归 #include <stdio.h>int count0,a[15],flag; void queen(int,int); int main(){int n;scanf("%d",&n);queen(n,n);printf("%d",count); } void queen(int n,int n0){if(n<1){flag1;for(int i1;i<n0;i){for(int j1;j<n0;j){if(…...

php订单发起退款(余额和微信支付)

index.html <a class="btn btn-danger btn-change btn-tuikuan btn-disabled" href="javascript:;"><i class="fa fa-tuikuan"></i> 订单退款</a>-->order.js // 为表格绑定事件Table.api.bindevent(table);//退款…...

【SpringCloud】认识微服务、服务拆分以及远程调用

SpringCloud 1.认识微服务 1.1单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 单体架构的优缺点&#xff1a; 优点&#xff1a; 架构简单&#xff0c;部署成本低 缺点&#xff1a; 耦合度高&#xff08;维护困难&#x…...

Mysql基础操作(命令行)

文章目录 Mysql基础操作&#xff08;命令行&#xff09;背景创建数据库选择数据库查看所有表查看表结构向表插入数据插入第一条插入第二条插入第三条 查询表数据修改表数据删除表数据 Mysql基础操作&#xff08;命令行&#xff09; 背景 docker安装mysql8&#xff0c;映射本地…...

网站遇到DDOS攻击怎么办?

最近我的网站不幸又遇到了几乎是我见到过的最大一次 DDoS 攻击&#xff0c;并且几乎是没有反映的时间&#xff0c;直接接到腾讯云的短信通知“运营商封堵”&#xff0c;直接造成几个小时无法访问&#xff0c;解封后再次遭受到大流量 DDoS 攻击&#xff0c;再次被“腾讯云平台封…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

【网络安全产品大调研系列】2. 体验漏洞扫描

前言 2023 年漏洞扫描服务市场规模预计为 3.06&#xff08;十亿美元&#xff09;。漏洞扫描服务市场行业预计将从 2024 年的 3.48&#xff08;十亿美元&#xff09;增长到 2032 年的 9.54&#xff08;十亿美元&#xff09;。预测期内漏洞扫描服务市场 CAGR&#xff08;增长率&…...

从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)

设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile&#xff0c;新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

tauri项目,如何在rust端读取电脑环境变量

如果想在前端通过调用来获取环境变量的值&#xff0c;可以通过标准的依赖&#xff1a; std::env::var(name).ok() 想在前端通过调用来获取&#xff0c;可以写一个command函数&#xff1a; #[tauri::command] pub fn get_env_var(name: String) -> Result<String, Stri…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...