当前位置: 首页 > news >正文

kafka精准一次、事务、幂等性

Kafka事务

消息中间件的消息保障的3个级别

  1. At most once 至多一次。数据丢失。
  2. At last once 至少一次。数据冗余
  3. Exactly one 精准一次。好!!!

如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。

:先消费消息、再提交位移。

如果提交位移这一步挂了,就会再消费一遍消息。重复消费====》〉》至少一次

当:先提交位移、再消费消息。

提议位移成功、消费消息失败,那么数据就丢失了====》〉》至多一次

如何精准一次呢?

幂等和事务!

幂等

对接口的多次调用所产生的结果和一次调用的结果是一样的。

即:(第一次调用,中途挂了,再次调用==一次调用) 为true

如何实现?

在v2版本的消息存储格式用有两个字段。produce_id(简称pid) 、first sequence

在这里插入图片描述

每个新的生产者实例在初始化的时候都会被分配一个pid,每个pid,消息发送到每一个分区都有序列号 sequence,序列号会从0开始递增,每发送一条消息,<PID,分区> 对应的序列号的值会➕1。这个序列号值(SN)在broker的内存中维护。只有当SN_new=SN_old+1.

broker才会接收这个消息。

如SN_new < SN_old+1 说明消息重复了,这个消息可以直接丢掉。

如SN_new>SN_old+1 说明消息丢失了,有数据还没有卸写入。抛乱序异常OutOforderSequenceException。

即用序列号来保证消息的顺序消费。

注意 所记录的这个序列号是针对 每一对<PID,分区> 所以这个幂等实现的是单会话、单分区的。

如何保证多个分区之间的幂等性呢?

事务

保证对多个分区写入操作的原子性,要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。

用户显示指定一个事务id: transactionalId。这个事务id是唯一的

从生产者角度来考虑,事务保证了生产者会话消息的幂等发送跨生产者会话的事务恢复.

  • 生产者会话消息的幂等发送:如有有两个相同事务id的生产者,新的创建了 旧的就会被kill
  • 某个生产者实例宕机了,新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断

实现过程,以consume-transform-produce为例。

package com.hzbank.yjj.transaction;import com.hzbank.yjj.producer.CustomerPartitioner;
import com.hzbank.yjj.producer.ProducerlnterceptorPrefix;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;public class TransactionConsumeTransformProduce {public static final String brokerList = "localhost:9092";public static Properties getConsumerProps(){Properties props =new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,"groupId");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);return props;}public static Properties getProducerProps(){Properties props =new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionalId");return props;}public static void main(String[] args) {//初始化生产者和消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getConsumerProps());consumer.subscribe(Collections.singletonList("topic-source"));KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerProps());//初始化事务producer.initTransactions();while (true){ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));if(!records.isEmpty()){HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();//开启事务producer.beginTransaction();try {for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println("获取到了topic-source发送过来的数据"+record.value());System.out.println("do some ");ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());producer.send(producerRecord);}// 获取最近一次的消费位移long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));}//提交消费位移producer.sendOffsetsToTransaction(offsets,"groupId");//提交事务producer.commitTransaction();} catch (ProducerFencedException e) {System.out.println("异常了");producer.abortTransaction();}}}}}

1. 找到TransactionCoordinator。

TransactionCoordinator负责分配和管理事务。
FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息

transactionalId 的哈希值计算主题_transaction_state 中的分区编号

根据分区leader副本找到所在的broker节点,极为Transaction Coordinator节点

2. 获取pid

通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。

String transactionalId; 事务id
int transactionTimeoutMs; 事务状态更新超时时间

3. 保存pid

TransactionCoordinator 第一次收到事务id会和对应pid保存下来,以消息(事务日志消息)的形式保存到主题_transaction_state中,实现持久化

InitProducerIdRequest还会出发一下任务:

- 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。
- 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务

4. 开启事务

通过 KafkaProduc町的 beginTransaction()方法。调用该方法后,生产者本 地会标记己经开启了 一个新的事务 ,只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。

5. Consume-Transform-Produce

整个事务处理数据。

  • AddPartitionsToTxnRequest:让 TransactionCoordinator 将<transactionld, TopicPartition>的对应关系存储在主题

    transaction state 中

  • ProduceRequest:生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中

  • AddOffsetsToTxnRequest:TransactionCoordinator 收到这个AddOffsetsToTxnRequest请求,通过 groupId 来推导出在一consumer_offsets 中的分区

  • TxnOffsetCommitRequest:发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中

6. 提交或者终止事务

KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。

写不下去了,暂时就先理解这么多了,后面再多结合源码去看看。

参考:书籍《深入理解 Kafka:核心设计与实践原理》

相关文章:

kafka精准一次、事务、幂等性

Kafka事务 消息中间件的消息保障的3个级别 At most once 至多一次。数据丢失。At last once 至少一次。数据冗余Exactly one 精准一次。好&#xff01;&#xff01;&#xff01; 如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。 当&#xff1a;先消费消息、…...

centos 7.9 下利用miniconda里的pyinstaller打包python程序为二进制文件操作方法

centos 7.9 下利用miniconda里的pyinstaller打包python程序为二进制文件操作方法 一.centos 7.9 操作系统安装 参考&#xff1a;https://blog.csdn.net/qq_46015509/article/details/134572030?utm_sourceminiapp_weixin 安装完成后用后台连接工具连上虚拟机 二.安装python3 …...

Motion Plan之基于采样的路径规划算法笔记

Motion Plan之搜索算法笔记 背景&#xff1a; 基于采样算法是一种在路径规划中广泛应用的有效方法。它通过在图中随机选择点来生成一个简化的搜索图&#xff0c;从而加速搜索过程。这种方法的主要优点包括减少内存使用&#xff0c;避免计算错误&#xff0c;具有动态障碍物对抗…...

idea里面常用插件

这里列出了一系列常用的 IntelliJ IDEA 插件&#xff0c;它们可以提高开发效率、简化操作&#xff0c;以及帮助进行代码分析和优化。以下是每个插件的简要介绍&#xff1a; GenerateAllSetter&#xff1a;生成对象的所有 set 方法和 get 方法&#xff0c;方便对象之间的转换。该…...

回归算法优化过程推导

假设存在一个数据集&#xff0c;包含工资、年龄及贷款额度三个维度的数据。我们需要根据这个数据集进行建模&#xff0c;从而在给定工资和年龄的情况下&#xff0c;实现对贷款额度的预测。其中&#xff0c;工资和年龄是模型构建时的两个特征&#xff0c;额度是模型输出的目标值…...

某高品质房产企业:借助NineData平台,统一数据库访问权限,保障业务安全

该企业是中国领先的优质房产品开发及生活综合服务供应商。在 2022 年取得了亮眼的业绩表现&#xff0c;销售额市场占有率跻身全国前五。业务涵盖房产开发、房产代建、城市更新、科技装修等多个领域。 2023 年&#xff0c;该企业和玖章算术&#xff08;浙江&#xff09;科技有限…...

Arduio开发STM32所面临的风险

据说micro_ros用到了arduino,然后用arduino搞stm32需要用到这个Arduino STM32的东西&#xff0c;然后这里申明了&#xff1a;这些代码没有经过严格测试&#xff0c;如果是向心脏起搏器&#xff0c;自动驾驶这样要求严格的的情况下&#xff0c;这个东西不能保证100%不发生问题&a…...

精准人脉引流软件的开发流程与涉及到的技术

一、精准人脉引流软件的开发流程 1. 确定需求&#xff1a;首先&#xff0c;我们需要明确软件的需求&#xff0c;包括目标用户、功能需求、性能需求等。这些需求将直接影响到软件的开发方向和最终效果。 2. 系统设计&#xff1a;根据需求&#xff0c;进行系统设计&#xff0c;…...

Mysql数据库 20.DCL数据控制语言

因这类SQL语言开发人员操作较少&#xff0c;主要是数据库管理员&#xff08;DBA&#xff09;使用&#xff0c;所以前文没有提及&#xff0c;这篇文章进行补充说明 DCL数据控制语言 用来管理数据库用户&#xff0c;控制数据库的访问权限 1.管理用户 1.1 查询用户 select * f…...

使用CMake交叉编译Arm Linux程序

下载安装aarch64-linux-gnu-gcc arm交叉编译工具链 apt-get install aarch64-linux-gnu-gccapt-get install aarch64-linux-gnu-gcc创建编译目录构建makefle 注意&#xff0c;工具链文件的指定一定要紧跟cmake命令之后&#xff0c;不能放到 … 后面构建arm架构cmake mkdir arm…...

训练日志——logging

目录 基础使用日志的6个级别打印日志修改打印级别 高级应用logging的组成记录器Loggers处理器Handlers过滤器Filterformatter格式创建关联打印日志 配置文件参考 基础使用 日志的6个级别 打印日志 import logginglogging.debug(调试日志) logging.info(消息日志) logging.war…...

尺度为什么是sigma?

我们先看中值滤波和均值滤波。 以前&#xff0c;我认为是一样的&#xff0c;没有区分过。 他们说&#xff0c;均值滤波有使图像模糊的效果。 中值滤波有使图像去椒盐的效果。为什么不同呢&#xff1f;试了一下&#xff0c;果然不同&#xff0c;然后追踪了一下定义。 12345&…...

迭代器模式

自定义 Counter 结构体类型&#xff0c;并实现迭代器。其他语言的场景&#xff0c;读取数据库行数据时&#xff0c;使用的就是迭代器。我们使用for语言遍历数组&#xff0c;也是一种迭代。 结构体对象实现 Iterator trait&#xff0c;创建自定义的迭代器&#xff0c;只需要实现…...

C++ 修饰符、存储类、运算符、循环、判断

一、C修饰符类型&#xff1a; C允许在char、int、double数据类型前放置修饰符。 数据类型修饰符&#xff1a; ◆ signed&#xff1a;表示变量可以存储负数。对于整型变量来说&#xff0c;signed 可以省略&#xff0c;因为整型变量默认为有符号类型。 ◆ unsigned&#xff1…...

2023 hnust 湖南科技大学 信息安全管理课程 期中考试 复习资料

前言 ※老师没画重点的补充内容★往年试卷中多次出现或老师提过的&#xff0c;很可能考该笔记是奔着及格线去的&#xff0c;不是奔着90由于没有听过课&#xff0c;部分知识点不一定全&#xff0c;答案不一定完全正确 题型 试卷有很多题是原题 判断题&#xff08;PPT&#xff…...

N皇后问题解的个数

暴力递归 #include <stdio.h>int count0,a[15],flag; void queen(int,int); int main(){int n;scanf("%d",&n);queen(n,n);printf("%d",count); } void queen(int n,int n0){if(n<1){flag1;for(int i1;i<n0;i){for(int j1;j<n0;j){if(…...

php订单发起退款(余额和微信支付)

index.html <a class="btn btn-danger btn-change btn-tuikuan btn-disabled" href="javascript:;"><i class="fa fa-tuikuan"></i> 订单退款</a>-->order.js // 为表格绑定事件Table.api.bindevent(table);//退款…...

【SpringCloud】认识微服务、服务拆分以及远程调用

SpringCloud 1.认识微服务 1.1单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 单体架构的优缺点&#xff1a; 优点&#xff1a; 架构简单&#xff0c;部署成本低 缺点&#xff1a; 耦合度高&#xff08;维护困难&#x…...

Mysql基础操作(命令行)

文章目录 Mysql基础操作&#xff08;命令行&#xff09;背景创建数据库选择数据库查看所有表查看表结构向表插入数据插入第一条插入第二条插入第三条 查询表数据修改表数据删除表数据 Mysql基础操作&#xff08;命令行&#xff09; 背景 docker安装mysql8&#xff0c;映射本地…...

网站遇到DDOS攻击怎么办?

最近我的网站不幸又遇到了几乎是我见到过的最大一次 DDoS 攻击&#xff0c;并且几乎是没有反映的时间&#xff0c;直接接到腾讯云的短信通知“运营商封堵”&#xff0c;直接造成几个小时无法访问&#xff0c;解封后再次遭受到大流量 DDoS 攻击&#xff0c;再次被“腾讯云平台封…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

mongodb源码分析session执行handleRequest命令find过程

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程&#xff0c;并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令&#xff0c;把数据流转换成Message&#xff0c;状态转变流程是&#xff1a;State::Created 》 St…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

linux 下常用变更-8

1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行&#xff0c;YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID&#xff1a; YW3…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...