当前位置: 首页 > news >正文

2024.1.13 Kafka六大机制和Structured Streaming

目录

一 . Kafka中生产者数据分发策略

二.  Kafka消费者的负载均衡机制

三 . 数据不丢失机制

生产者端是如何保证数据不丢失的呢?

Broker端如何保证数据不丢失

 消费端如何保证数据不丢失

Kafka中消费者如何对数据仅且只消费一次

 四 . 启动Kafka eagle命令

数据积压问题处理

五 . 结构化流 

数据源 File Source

OPERATIONS数据处理操作

 Sink输出操作


六大机制:分区,副本,存储,查询,数据不丢失,负载均衡 ; 

一 . Kafka中生产者数据分发策略

        JAVA中的轮询分发策略 和 粘性分发策略:

                轮询:避免数据倾斜

                粘性: 产生数据倾斜

                轮询分发策略: 在Kafka的老版本中存在的一种分发策略,当生产数据的时候,只有value但是没有key的时候,采用轮询。
    优点: 可以保证每个分区拿到的数据基本是一样,因为是一个一个的轮询的分发
    缺点: 如果采用异步发送方式,意味着一批数据发送到broker端,由于是轮询策略,会将这一批数据拆分为多个小的批次,分别再写入到不同的分区里面去,写入进去以后,每个分区都会给予响应,会影响写入效率。

                粘性分发策略: 在Kafka新版本中存在的一种分发策略。当生产数据的时候,只有value但是没有key的时候,采用粘性分发策略
    优点: 在发送数据的时候,首先会随机的选取一个分区,然后尽可能将数据分发到这个分区上面去,也就是尽可能粘着这个分区。该分发方式,在异步发送的操作中,效率比较高。
    缺点: 在数据发送特别快的时候,可能会导致某个分区的数据比其他分区数据多很多,造成大量的数据集中在一个分区上面

二.  Kafka消费者的负载均衡机制

Kafka消费者的负载均衡机制
1- 在同一个消费组中,消费者的个数最多不能超过Topic的分区数。如果超过了,就会有一些消费者处于闲置状态,消费不到任何数据。
2- 在同一个消费组中,一个Topic中一个分区的数据,只能被同个消费组中的一个消费者所消费,不能被同个消费组中多个消费者所消费。但是一个消费组内的一个消费者可以消费多个分区的数据。也就是分区和消费者的对应关系,多对一
3-不同的消费组中的消费者,可以对一个Topic的数据同时消费,也就是不同消费组间没有任何关系

三 . 数据不丢失机制

生产者端是如何保证数据不丢失的呢?


答:生产者端将消息发送给到Kafka集群以后,broker要给生产者响应信息。响应原理就是ACK机制


ACK机制当中有3个参数配置值,分别是:0  1  -1(all)
0:生产者生产消息给到Kafka集群,生产者不等待(不接收)broker返回的响应信息
1:生产者生产消息给到Kafka集群,Kafka集群中的分区对应的Leader主副本所在的broker给生产者返回响应信息
-1(all):生产者生产消息给到Kafka集群,Kafka集群中的分区对应的所有副本给生产者返回响应信息


消息的生产效率排序(由高到低):0 > 1 > -1
消息的安全级别排序(由高到低):-1 > 1 > 0


在实际工作中如何选择ACK参数配置?
答:根据数据的重要程度进行选择。如果数据重要,优先保证数据的安全性,再考虑生产效率;如果数据不重要,优先考虑生产效率,再尽可能提升安全级别。

Broker端如何保证数据不丢失

        Broker端通过多副本机制确保数据不丢失。同时需要生产者端将acks设置为-1

 消费端如何保证数据不丢失

消费者消费消息的步骤:
1- 消费者首先连接到Kafka集群中,进行消息的消费

2- Kafka集群接收到Consumer消费者的消费请求以后,首先会根据group id(消费组名称),查找上次消费消息对应的offset(偏移量)

3- 如果没有查找到offset,消费者默认从Topic最新的地方开始消费

4- 如果有查找到offset,会从上次消费到的offset地方进行继续消费
    4.1- 首先先确定要读取的这个offset偏移量在哪个segment文件当中
    4.2- 查询这个segment文件对应的index文件,根据offset确定这个消息在log文件的什么位置,也就是确定消息的物理偏移量
    4.3- 读取log文件,查询对应范围内的数据即可
    4.4- 获取最终的消息数据

5- 消费者在消费的过程中,底层有个线程会定时的将消费的offset提交给到Kafka集群。Kafka集群会更新对应的offset的值

Kafka中消费者如何对数据仅且只消费一次

1- 将消费者的 enable.auto.commit 属性设置为 false,并手动管理消费者的偏移量。这样可以确保消费者在处理完所有消息后才更新偏移量,避免重复消费数据。也就是将消息的消费、消息业务处理代码、offset提交代码放在同一个事务当中。

2- 使用幂等生产者或事务性生产者来确保消息只被发送一次。这样可以避免重复发送消息,从而避免消费者重复消费数据。

3- 在消息中加入唯一的ID

 四 . 启动Kafka eagle命令

cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin

./ke.sh start

结构化流测试linux开启命令

首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据
yum -y install nc
    
执行nc命令, 开启端口号, 写入数据:
nc -lk 55555

注意: 要先启动nc,再启动我们的程序


查看端口号是否被使用命令:
netstat -nlp | grep 要查询的端口

 

数据积压问题处理

出现积压的原因:

  • 因为数据写入目的容器失败,从而导致消费失败

  • 因为网络延迟消息消费失败

  • 消费逻辑过于复杂, 导致消费过慢,出现积压问题

解决方案:

  • 对于第一种, 我们常规解决方案, 处理目的容器,保证目的容器是一直可用状态

  • 对于第二种, 如果之前一直没问题, 只是某一天出现, 可以调整消费的超时时间。并且同时解决网络延迟问题

  • 对于第三种, 一般解决方案,调整消费代码, 消费更快即可, 利于消费者的负载均衡策略,提升消费者数量

五 . 结构化流 

        有界: 数据大小固定,有开始和结尾

        无界: 源源不断的数据,没有明确的结尾

结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL ....

Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)

数据源 File Source

        将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet。。。。

文件数据源特点:
        1- 不能够监听具体的文件,否则会报错误java.lang.IllegalArgumentException: Option 'basePath' must be a directory
        2- 可以通过通配符的形式,来监听目录下的文件,符合要求的才会被读取
        3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况 

File source只能监听目录,不能监听具体文件 

读取代码通用格式:

                sparksession.readStream

                     .format('CSV|JSON|TEXT|PARQUET|ORC)

                    .option('参数名1','参数值1')
                    .option('参数名2','参数值2')
                    .option('参数名N','参数值N')
                    .schema(元数据信息)
                    .load('需要监听的目录地址')

OPERATIONS数据处理操作

        指的是数据处理部分,该操作和SparkSQL完全一致 

 Sink输出操作

        append模式:

                只支持追加,不支持聚合和排序,每次只打印追加的内容

        complete模式:

                每一次都全量处理,因为数据量大,所以必须聚合,也可以支持排序

        update模式: 

                支持聚合的append模式,有聚合操作只会输出有变化和新增的内容,不支持排序;

相关文章:

2024.1.13 Kafka六大机制和Structured Streaming

目录 一 . Kafka中生产者数据分发策略 二. Kafka消费者的负载均衡机制 三 . 数据不丢失机制 生产者端是如何保证数据不丢失的呢? Broker端如何保证数据不丢失 消费端如何保证数据不丢失 Kafka中消费者如何对数据仅且只消费一次 四 . 启动Kafka eagle命令 数…...

遥感影像-语义分割数据集:Landsat8云数据集详细介绍及训练样本处理流程

原始数据集详情 简介:该云数据集包括RGB三通道的高分辨率图像,在全球不同区域的分辨率15米。这些图像采集自Lansat8的五种主要土地覆盖类型,即水、植被、湿地、城市、冰雪和贫瘠土地。 KeyValue卫星类型landsat8覆盖区域未知场景水、植被、…...

YOLOV8在coco128上的训练

coco128是coco数据集的子集只有128张图片 训练代码main.py from ultralytics import YOLO# Load a model model YOLO("yolov8n.yaml") # build a new model from scratch model YOLO("yolov8n.pt") # load a pretrained model (recommended for trai…...

设计模式——享元模式

享元模式(Flyweight Pattern)是一种结构型设计模式,它的主要目的是通过共享已存在的对象来大幅度减少需要创建的对象数量,从而降低系统内存消耗和提高性能。它通过将对象的状态划分为内部状态(Intrinsic State&#xf…...

【Python机器学习】分类器的不确定估计——决策函数

scikit-learn接口的分类器能够给出预测的不确定度估计,一般来说,分类器会预测一个测试点属于哪个类别,还包括它对这个预测的置信程度。 scikit-learn中有两个函数可以用于获取分类器的不确定度估计:decidion_function和predict_pr…...

云原生周刊:K8sGPT 加入 CNCF | 2024.1.8

开源项目推荐 VolSync VolSync 使用 rsync 或 rclone 在集群之间异步复制 Kubernetes 持久卷。它还支持通过 Restic 创建持久卷的备份。 KubeClarity KubeClarity 是一种用于检测和管理软件物料清单 (SBOM) 以及容器映像和文件系统漏洞的工具。它扫描运行时 K8s 集群和 CI/…...

LightGBM原理和调参

背景知识 LightGBM(Light Gradient Boosting Machine)是一个实现GBDT算法的框架,具有支持高效率的并行训练、更快的训练速度、更低的内存消耗、更好的准确率、支持分布式可以处理海量数据等优点。 普通的GBDT算法不支持用mini-batch的方式训练,在每一次…...

ROS无人机开发常见错误

飞控部分 一、解锁时飞控不闪红灯,无任何反应,地面站也无报错 解决办法: 打开地面站的遥控器一栏 首先检查右下角Channel Monitor是否有识别出遥控各通道的值,如果没有,检查遥控器是否打开,遥控器和接收…...

Baumer工业相机堡盟工业相机如何联合NEOAPI SDK和OpenCV实现相机图像转换为AVI视频格式(C#)

Baumer工业相机堡盟工业相机如何联合NEOAPI SDK和OpenCV实现相机图像转换为视频格式(C#) Baumer工业相机Baumer工业相机的图像转换为OpenCV的图像的技术背景在NEOAPI SDK里实现相机图像转换为视频格式 工业相机通过OpenCV实现相机图像转换为视频格式的优…...

第一次面试总结 - 迈瑞医疗 - 软件测试

🧸欢迎来到dream_ready的博客,📜相信您对专栏 “本人真实面经” 很感兴趣o (ˉ▽ˉ;) 专栏 —— 本人真实面经,更多真实面试经验,中大厂面试总结等您挖掘 注:此次面经全靠小嘴八八,没…...

利用Qt输出XML文件

使用Qt输出xml文件 void PixelConversionLibrary::generateXML() {QFile file("D:/TEST.xml");//创建xml文件if (!file.open(QIODevice::WriteOnly | QIODevice::Text))//以只写方式&#xff0c;文本模式打开文件{qDebug() << "generateXML:Failed to op…...

OpenWrt智能路由器Wan PPPoE拨号配置方法

OpenWrt智能路由器的wan PPPoE拨号配置方法和我们常见的不太一样, 需要先找到wan网卡,然后将协议切换为 PPPoE然后才能看到输入上网账号和密码的地方. 首先登录路由器 http://openwrt.lan/ 然后找到 Network --> Interfaces 这里会显示你当前的路由器的所有接口, 选择 …...

(十一)IIC总线-AT24C02-EEPROM

文章目录 IIC总线篇AT24C02-EEPROM篇主要特性引脚说明AT24Cxx用几位数据地址随机寻址的(存储器组织)AT24C02设备操作AT24CXX设备寻址EEPROM写操作的种类EEPROM读操作的种类实现单字节写实现任意读读写应用 IIC总线篇 前面介绍过了&#xff0c;请参考 (十)IIC总线-PCF8591-ADC/…...

现在做电商还有发展空间吗?哪个平台的盈利比较大?

我是电商珠珠 对于部分人来说&#xff0c;实体店的投入太大&#xff0c;一上来就是十几w&#xff0c;有时候还看不到结果。 所以有的人就瞄准了电商这个圈子&#xff0c;做线上平台。 大家都知道&#xff0c;近年来直播电商很火&#xff0c;所以很多商家都会去找达人带货&am…...

多节点 docker 部署 elastic 集群

参考 Install Elasticsearch with Docker Images 环境 docker # docker version Client: Docker Engine - CommunityVersion: 24.0.7API version: 1.43Go version: go1.20.10Git commit: afdd53bBuilt: Thu Oct 26 09:08:01 202…...

2023年全国职业院校技能大赛软件测试赛题—单元测试卷⑨

单元测试 一、任务要求 题目1&#xff1a;根据下列流程图编写程序实现相应分析处理并显示结果。返回文字“xa*a*b的值&#xff1a;”和x的值&#xff1b;返回文字“xa-b的值&#xff1a;”和x的值&#xff1b;返回文字“xab的值&#xff1a;”和x的值。其中变量a、b均须为整型…...

C++核心编程——文件操作

本专栏记录C学习过程包括C基础以及数据结构和算法&#xff0c;其中第一部分计划时间一个月&#xff0c;主要跟着黑马视频教程&#xff0c;学习路线如下&#xff0c;不定时更新&#xff0c;欢迎关注。 当前章节处于&#xff1a; ---------第1阶段-C基础入门 ---------第2阶段实战…...

【REST2SQL】05 GO 操作 达梦 数据库

【REST2SQL】01RDB关系型数据库REST初设计 【REST2SQL】02 GO连接Oracle数据库 【REST2SQL】03 GO读取JSON文件 【REST2SQL】04 REST2SQL第一版Oracle版实现 信创要求用国产数据库&#xff0c;刚好有项目用的达梦&#xff0c;研究一下go如何操作达梦数据库 1 准备工作 1.1 安…...

GitLab 502 Whoops, GitLab is taking too much time to respond. 解决

1、先通过gitlab-ctl restart进行重启&#xff0c;2分钟后看是否可以正常访问&#xff0c;为什么要2分钟&#xff0c;因为gitlab启动会有很多配套的服务启动&#xff0c;包括postgresql等 2、如果上面不行&#xff0c;再看gitlab日志&#xff0c;通过gitlab-ctl tail命令查看&…...

vi ~/.bashrc 后如何编辑并退出

在使用 vi 编辑器打开 ~/.bashrc 文件后&#xff0c;可以按照以下步骤编辑并保存退出&#xff1a; vi ~/.bashrc 按 i 进入插入模式&#xff1a; 在 vi 编辑器中&#xff0c;按 i 键将进入插入模式。在插入模式中&#xff0c;您可以编辑文本。 编辑文件&#xff1a; 在插入模…...

KVM Vcpu概述

KVM Vcpu概述 Intel VTSMP系统CPU过载使用CPU模型CPU绑定和亲和性CPU优化 Intel VT Intel的硬件虚拟化技术大致分为3类&#xff1a; 1、VT-x技术&#xff1a;是指Intel处理器中的一些虚拟化技术支持&#xff0c;包括CPU中最基础的VMX技术&#xff0c;也包括内存虚拟化的硬件支…...

linux服务器ftp部署

1、ftp服务安装 # 检查是否安装 1、查询安装列表 sudo systemctl list-unit-files --typeservice | grep ftp 2、查询ftp服务状态 sudo service vsftpd status 或者 sudo systemctl status vsftpd # yum安装&#xff0c;一般yum仓库都有ftp安装包 sudo yum install vsftpd # 启…...

NSIS 安装windows 安装包(包括QT和MFC)

NSIS&#xff08;Nullsoft Scriptable Install System&#xff09;是一个开源的 Windows 系统下安装程序制作程序。它提供了安装、卸载、系统设置、文件解压缩等功能。 基本概念 区段 是对应某种安装/卸载选项的处理逻辑&#xff0c;该段代码仅当用户选择相应的选项才被执行…...

K8S----PVPVCSC

一、简介 1、PV(persistent volume)–持久卷 PV是集群中的一块存储,可以由管理员事先静态(static)制备, 也可以使用存储类(Storage Class)来动态(dynamic)制备。 持久卷是集群资源,就像节点也是集群资源一样。PV 持久卷和普通的 Volume 一样, 也是使用卷插件(volume p…...

RSIC-V“一芯”学习笔记(一)——概述

考研的文章和资料之后想写的时候再写怕趴 文章目录 一、阶段设计二、环境、开发语言和工具三、最重要的两个观念四、处理器芯片设计五、处理器芯片设计包含很多软件问题六、处理器芯片的评价指标七、复杂系统的构建和维护八、专业世界观九&#xff0c;提问的艺术(提问模板)十、…...

MATLAB读取图片并转换为二进制数据格式

文章目录 前言一、MATLAB 文件读取方法1、文本文件读取2、二进制文件读取3、 图像文件读取4、其他文件读取 二、常用的图像处理标准图片链接三、MATLAB读取图片并转换为二进制数据格式1、matlab 源码2、运行结果 前言 本文记录使用 MATLAB 读取图片并转换为二进制数据格式的方…...

时序数据库

SELECT *,max(lp_index) FROM lp.tdm_lp_original_data where ts > 2023-12-28 18:11:33.521 and ts < 2023-12-29 19:03:12.148 INTERVAL(2s) FILL(PREV) 在时间序列数据库TDengine中&#xff0c;FILL函数与GROUP BY子句结合使用&#xff0c;提供了对于在指定间隔内…...

【第一次使用finalshell连接虚拟机内的centos】小白处理方式

第一次使用finalshell连接centos7的时候&#xff0c;因为都是新环境什么都没有配置&#xff0c;所以就需要安装finalshell和对新的centos7 进行一些配置。 安装finalshel&#xff0c;默认不安装d盘&#xff0c;就需要对安装路径做一下调整&#xff0c;其余都是下一步默认安装的…...

Pinia 踩坑记录

1、子store中如何使用router 以user.ts 这个store为例 错误写法 // 说明&#xff1a;不能使用插件实例化router&#xff0c;否则获取不到router的函数 // 错误写法如下&#xff1a;import { useRouter } from "vue-router"actions:{login(){const router useRout…...

在ASP.NET MVC中使用JQuery提供的弹出窗口(模态窗口)

在ASP.NET MVC中使用JQuery提供的弹出窗口&#xff08;模态窗口&#xff09; 原理 使用<div>图层灵活显示在浏览器的任何位置。默认情况下指定<div>不可见 引用 样式表 在JQuery的官方网站可以下载对应的css样式表。打开官网的样例页。 找到样式表引用路径 …...