Kafka数据同步原理详解
Kafka数据同步原理详解
Kafka是一种分布式的消息队列系统,它具有高吞吐量、可扩展性和分布式特性等优势。在Kafka中,数据按照主题进行分区,每个主题都有一组分区。每个分区都有自己的生产者和消费者,生产者负责向分区中写入消息,消费者负责从分区中读取消息。因此,Kafka的数据同步主要涉及到生产者和消费者之间的数据传输以及副本同步。
分区同步
分区写入过程
当生产者向Kafka发送消息时,Kafka会将消息存储在本地的一个特殊的文件夹中,称为log文件夹。每个log文件夹中都会包含一个或多个分区的日志文件,每个日志文件对应一个分区。在写入消息时,Kafka会根据分区策略将消息分配到不同的分区中,然后按照写入的顺序将消息追加到对应的日志文件中。
分区读取过程
消费者从Kafka读取消息时,需要指定要读取的主题和分区。Kafka会将消费者的请求路由到对应的分区节点上,然后从该节点的log文件夹中读取指定分区的日志文件。消费者可以通过指定偏移量来控制从哪个位置开始读取,默认情况下会从上次读取的位置继续读取。
副本同步
Kafka的每个分区都有多个副本,这些副本可以分布在不同的节点上以提高系统的容错性和可扩展性。主副本负责处理该分区的所有写请求,而从副本则从主副本中复制数据并保证与主副本的数据一致性。
副本选举
如果主副本出现故障,则从副本会进行选举,选出一个新的主副本继续提供服务。这个过程是自动的,Kafka会检测主副本的状态,当主副本出现故障时,会选出一个从副本作为新的主副本。
数据复制
从副本会定期从主副本中复制数据并保证与主副本的数据一致性。Kafka使用了一种基于Raft协议的数据复制机制来实现数据复制和一致性保障。Raft协议是一种类似于Paxos协议的分布式一致性协议,它能够保证所有副本达成一致状态,从而避免了单点故障和脑裂问题。
在数据复制过程中,主副本将数据写入到本地磁盘上的一个特殊的文件夹中,称为“state store”。从副本会定期从主副本的state store中复制数据到一个本地文件夹中,这个文件夹称为“replica store”。当从副本成功将数据写入到replica store后,会向主副本发送一个确认消息,主副本收到确认消息后,会将该数据标记为已复制。
消息追加
Kafka的消息是追加写入的,这也就是说在消息被写入之后还可以继续追加新的消息。这个特性使得Kafka可以更容易地支持多个消费者并行地读取同一个分区的消息,同时也提高了系统的并发处理能力。
当生产者向分区中写入一条消息时,Kafka会将该消息追加到对应分区的log文件夹中的日志文件中。由于log文件夹中的日志文件是按照写入的顺序追加的,因此消费者在读取消息时也是按照写入的顺序依次读取的。
偏移量提交
消费者在读取消息时会记录一个偏移量(offset),这个偏移量标识了消费者当前读取到的位置。如果消费者出现故障,那么它下次可以继续从上次的偏移量处读取消息,避免了消息丢失和重复读取的问题。同时,Kafka还提供了偏移量提交机制,即消费者在每次读取一定数量的消息后都需要向Kafka提交当前偏移量,以避免消费者在故障恢复后重复读取已经消费过的消息。
偏移量提交的过程是自动的,消费者在读取消息时会记录当前的偏移量,当读取到一定数量的消息后,会向Kafka提交当前的偏移量。提交偏移量的过程是可靠的,即使消费者在提交偏移量之前出现故障,也可以通过查看提交的偏移量来确定消费者已经读取到的位置。
Java源码示例和分析
下面是一个简单的Java源码示例来说明Kafka的数据同步原理:
// 创建生产者producer对象,连接Kafka集群
Producer<String, String> producer = new KafkaProducer<>(props);// 创建主题及分区
String topic = "test-topic";
int partition = 0; // 分区号// 发送消息到指定分区
producer.send(new ProducerRecord<>(topic, partition, "test-message"));
在上述示例中,我们创建了一个Kafka生产者对象并使用它向指定的主题发送一条消息。这个生产者对象使用KafkaProducer类创建,它封装了与Kafka集群的通信。
当生产者发送消息时,它使用ProducerRecord类指定了要发送消息的主题、分区号和消息内容。这个消息将被追加到指定分区的日志文件中,并由Kafka集群负责将其存储在适当的节点上。
作为消费者,我们可以使用以下代码来读取这个分区中的消息:
// 创建消费者consumer对象,连接Kafka集群
Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅指定主题的分区
consumer.subscribe(Collections.singletonList(topic));// 轮询消息
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 轮询消息for (ConsumerRecord<String, String> record : records) { // 遍历每条消息System.out.println(record.value()); // 输出消息内容}
}
在这个示例中,我们创建了一个Kafka消费者对象并使用它订阅了指定的主题。这个消费者对象使用KafkaConsumer类创建,它封装了与Kafka集群的通信。
消费者通过调用subscribe()方法订阅指定的主题,然后通过调用poll()方法轮询消息。poll()方法将返回一个ConsumerRecords对象,其中包含了该消费者关注的分区中所有可用的消息。消费者可以遍历这个ConsumerRecords对象来处理每条消息。
需要注意的是,Kafka的分区同步和副本同步都是由Kafka集群自动处理的。生产者和消费者只需要关注发送和接收消息即可,而不需要关心底层的同步过程。
相关文章:
Kafka数据同步原理详解
Kafka数据同步原理详解 Kafka是一种分布式的消息队列系统,它具有高吞吐量、可扩展性和分布式特性等优势。在Kafka中,数据按照主题进行分区,每个主题都有一组分区。每个分区都有自己的生产者和消费者,生产者负责向分区中写入消息&…...
C++课程总复习
一、c的第一条程序 1.cout cout >输出类对象,用来输出的,可以自动识别类型,所以不需要加格式符号 << 插入符(输出符号) endl 换行>\n #include <iostream> //#预处理 //include 包含 相应的头…...

数据结构—顺序表
目录 1.线性表 2.顺序表概念 3.实现顺序表 (1)声明结构体 (2)初始化 (3)打印数据 (4) 销毁 (5)尾插&头插 尾插 判断是否扩容 头插 (6)尾删&头删 尾删 头删 (7)指定位置插入元素 (8)删除指定位置元素 (9)查找指定元素位置 (10)修改指定位置元素 完整版…...

企业服务器租用对性能有什么要求呢?
企业租用服务器租用首要的是稳定,其次是安全,稳定是为了让企业的工作能够顺利进行,只有性能稳定的服务器才能保证网站之类的正常工作,就让小编带大家看一看有什么要求吧! 服务器简单介绍。服务器是在网络上为其它客户机…...
2731.移动机器人
2731. 移动机器人 - 力扣(LeetCode) 有一些机器人分布在一条无限长的数轴上,他们初始坐标用一个下标从 0 开始的整数数组 nums 表示。当你给机器人下达命令时,它们以每秒钟一单位的速度开始移动。 给你一个字符串 s ,…...

相交链表Java
给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点,返回 nu11。 以下有两种解决方法: 一种是用Map,利用其key值唯一的方法去判断(也可以使用set,set在add时,已存在的元素会返回false,不存在的返回…...

第二章:OSI参考模型与TCP/IP模型
OSI参考模型与TCP/IP模型 一、OSI参考模型二、TCP/IP模型2.1 四层分法(书上)2.2 五层分法(实际厂商)2.3 数据封装和解封装2.3.1 封装2.3.2 解封装2.3.3 TCP/IP分层封装2.3.4 数据封装和解封装过程 一、OSI参考模型 1.物理层 定义电…...
知识图谱04——openGL与ubuntu22.04
跑图神经网络的时候遇到了如下问题 libGL error: failed to load driver: iris libGL error: MESA-LOADER: failed to open iris: /usr/lib/dri/iris_dri.so: 无法打开共享对象文件: 没有那个文件或目录 (search paths /usr/lib/x86_64-linux-gnu/dri:\$${ORIGIN}/dri:/usr/li…...
如何看待为了省小钱而花费时间
相信每个人都会遇到这种情况:购买东西时想着货比三家或者想办法领优惠券、凑单等就可以省下一些钱,但是需要花费不少时间和精力。这时就开始犹豫了:省钱是必要的,需要居安思危,等到缺钱的时候不会后悔;又想…...

Maven Eclipse
Eclipse 提供了一个很好的插件 m2eclipse ,该插件能将 Maven 和 Eclipse 集成在一起。 在最新的 Eclipse 中自带了 Maven,我们打开,Windows->Preferences,如果会出现下面的画面: 下面列出 m2eclipse 的一些特点&a…...

Linux:redis集群(3.*版本 和 5.*版本)搭建方法
介绍 至少6个实例才能组成集群。3主3从会自动分配 Redis集群原理 Redis集群架构 Redis Cluster采用虚拟槽分区,将所有的数据根据算法映射到0~16383整数槽内 Redis Cluster是一个无中心的结构 每个节点都保存数据和整个集群的状态 集群角色 Master:Master…...

正则表达式基础语法
https://tool.oschina.net/regex 正则表达式:检查、匹配字符串的表达式 单个字符匹配: 有特殊含义的匹配: 多次重复匹配: 限定开头结尾的匹配: 贪婪模式:在满足条件的情况下,尽可能多匹配…...

数据库常见面试题--MySQL
梳理面试过程中数据库相关的常见问题,需要说明的是,这篇文章主要是基于MySQL数据库,其他类型的数据库还请自行参考使用。 数据库概述 为什么使用数据库 1、数据库增删改查更方便 2、提供了事务的能力 本质是更好的管理数据。 数据库体系结…...
Springboot 集成 Redis集群配置公网IP连接报私网IP连接失败问题
1、问题:在Springboot 集成 Redis集群配置公网IP连接报私网IP连接失败,一直报私有IP连接失败 14 14:57:49.180 WARN 22012 --- [ioEventLoop-6-4] i.l.c.c.topology.ClusterTopologyRefresh : Unable to connect to [192.168.0.19:6384]: connection …...

解决方案 | 法大大电子签精准击破销售场景签约难题
新商业形态及新交易模式不断涌现,电子签已经成为现代商业活动中不可或缺的一部分。特别是在销售场景中,电子签的应用不仅可以提高销售效率,还可以降低成本,提高客户满意度。本文将详细分析电子签在销售场景中的应用价值能力&#…...
ARM按键中断控制事件
设置按键中断,按键1按下,LED亮,再按一次,灭按键2按下,蜂鸣器响。再按一次,不响按键3按下,风扇转,再按一次,风扇停 src/key_it.c #include"key_it.h" //GPIO初…...

微信小程序之本地生活(九宫格)
文章目录 一.创建项目二.配置修改json三.编写WXML四.编写WXSS五.最终效果 一.创建项目 创建新的项目,名称为:本地生活 二.配置修改json 在app.json中删除其他页面 将index改为grid 自动生成新的文件 添加自己的轮播图片 源代码: <!--…...

【Linux 安装Kibana 及 Es 分词器安装】
一、客户端Kibana安装 Kibana是一个开源分析和可视化平台,旨在与Elasticsearch协同工作。参考文档 1. 下载并解压缩Kibana 下载路径 选择的版本是和 ElasticSearch 对应(7.17.3) 下载后上传到Linux 系统中,并放在 /root/ 下&a…...

python-arima模型statsmodels库实现-有数据集(续)-statsmodels-0.9.0版本
python-arima模型statsmodels库实现-有数据集(续) 这篇博客是上一篇python-arima模型statsmodels库实现的续集,上一篇采用的statsmodels版本应该要高一点,如果使用低版本的statsmodels代码会有bug,这一篇则是针对stat…...

JVM源码剖析之线程的创建过程
说在前面: 对于Java线程的创建这个话题,似乎已经被"八股文"带偏~ 大部分Java程序员从"八股文"得知创建Java线程有N种方式,比如new Thread、new Runnable、Callable、线程池等等~ 而笔者写下这篇文…...

IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...

Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...

CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建
【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...
k8s从入门到放弃之HPA控制器
k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率(或其他自定义指标)来调整这些对象的规模,从而帮助应用程序在负…...
Python网页自动化Selenium中文文档
1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API,让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API,你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...
深入理解 React 样式方案
React 的样式方案较多,在应用开发初期,开发者需要根据项目业务具体情况选择对应样式方案。React 样式方案主要有: 1. 内联样式 2. module css 3. css in js 4. tailwind css 这些方案中,均有各自的优势和缺点。 1. 方案优劣势 1. 内联样式: 简单直观,适合动态样式和…...