当前位置: 首页 > news >正文

【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试:Spark + Kafka 实时数据处理与窗口计算教程

  • 1. 概述
    • 1.1 大数据技术概述
    • 1.2 Apache Kafka 与 Spark 的结合
  • 2. 技术原理与流程
    • 2.1 Kafka 简介
    • 2.2 Spark Streaming 简介
    • 2.3 数据流动与处理流程
  • 3. 环境配置
    • 3.1 安装依赖项
  • 4. 实例:实时数据处理与窗口计算
    • 4.1 Kafka 生产者代码
    • 4.2 Spark Streaming 消费者代码
    • 4.3 解释与操作
  • 5. 运行与测试
    • 5.1 创建 Kafka Topic
    • 5.2 启动 Kafka 生产者
    • 5.3 启动 Spark Streaming 程序
    • 5.4 输出结果
  • 6. 总结

1. 概述

1.1 大数据技术概述

大数据(Big Data)指的是无法用传统数据库技术和工具进行处理和分析的超大规模数据集合。在大数据技术中,实时数据流的处理尤为重要,尤其是如何高效地对海量的实时数据进行采集、存储、处理与分析。

在这方面,Apache KafkaApache Spark 是两个关键技术。Kafka 作为分布式流处理平台,可以高效地进行实时数据流的生产和消费,而 Spark 提供了强大的分布式计算能力,尤其是其扩展的流式计算模块 Spark Streaming,非常适合处理实时数据流。

1.2 Apache Kafka 与 Spark 的结合

  • Kafka 是一个分布式消息队列,可以处理高吞吐量、低延迟的实时数据流。Kafka 被广泛用于日志收集、监控系统、实时数据传输等场景。
  • Spark 是一个统一的分析引擎,支持批量处理、流式处理和图计算。Spark Streaming 是 Spark 的一个流式处理组件,用于实时处理流数据。

通过结合 Kafka 和 Spark,我们可以实现大规模数据的实时处理、聚合和窗口计算。Spark 可以从 Kafka 消费数据流,并进行实时计算与分析,适用于诸如实时日志分析、用户行为分析、实时推荐等场景。


2. 技术原理与流程

2.1 Kafka 简介

Kafka 是一个分布式的消息队列系统,能够实现高吞吐量、可扩展性、容错性。它的基本组成包括:

  • Producer(生产者):负责向 Kafka 发送数据。
  • Consumer(消费者):从 Kafka 中消费数据。
  • Broker(代理):Kafka 的节点,每个节点负责存储消息。
  • Topic(主题):消息被组织在 Topic 中,生产者向 Topic 发送数据,消费者从 Topic 中读取数据。
  • Partition(分区):Kafka 支持水平分区,使得数据可以分布在多个 Broker 上。

2.2 Spark Streaming 简介

Spark Streaming 是 Spark 的流处理模块,它以 DStream(离散流)为基本数据结构,能够实时地处理数据流。DStream 是一个连续的 RDD(弹性分布式数据集),Spark Streaming 将实时流数据划分成一个个小的批次,使用批处理模型对这些小批次进行处理。

2.3 数据流动与处理流程

  1. Kafka Producer:将数据发送到 Kafka Topic。
  2. Kafka Broker:Kafka 集群负责存储和转发数据。
  3. Spark Streaming:通过 Kafka 的消费者接口从 Topic 中消费数据。
  4. 数据处理与计算:在 Spark Streaming 中进行数据聚合、过滤、窗口计算等操作。
  5. 输出结果:将处理后的数据输出到外部系统,如 HDFS、数据库或控制台。

3. 环境配置

3.1 安装依赖项

  1. 安装 Java:确保安装了 Java 8 或更高版本。

    检查版本:

    java -version
    
  2. 安装 Apache Spark:从 Apache Spark 官网 下载并安装 Spark。

  3. 安装 Apache Kafka:从 Kafka 官网 下载并安装 Kafka。

  4. Maven 配置:在 Java 项目中使用 Maven 作为构建工具,添加必要的 Spark 和 Kafka 依赖。

pom.xml 文件中添加 Spark 和 Kafka 的 Maven 依赖:

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming Kafka --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency><!-- Kafka Consumer --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

4. 实例:实时数据处理与窗口计算

4.1 Kafka 生产者代码

以下是一个简单的 Kafka 生产者,用于生成模拟的用户行为日志(如点击事件)并发送到 Kafka Topic logs

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 模拟用户点击日志数据String[] actions = {"click", "view", "scroll"};String[] users = {"user1", "user2", "user3"};// 向 Kafka 发送模拟数据for (int i = 0; i < 100; i++) {String user = users[i % 3];String action = actions[i % 3];String timestamp = String.valueOf(System.currentTimeMillis() / 1000);String value = user + "," + action + "," + timestamp;producer.send(new ProducerRecord<>("logs", null, value));try {Thread.sleep(1000); // 每秒发送一条数据} catch (InterruptedException e) {e.printStackTrace();}}producer.close();}
}

4.2 Spark Streaming 消费者代码

以下是一个 Spark Streaming 程序,它从 Kafka Topic logs 中消费数据并进行窗口计算,统计每个用户在过去 10 秒内的点击次数。

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.List;public class SparkKafkaWindowExample {public static void main(String[] args) throws InterruptedException {// 初始化 Spark StreamingContextJavaStreamingContext jssc = new JavaStreamingContext("local[2]", "SparkKafkaWindowExample", new Duration(2000));// Kafka 配置参数String brokers = "localhost:9092";String groupId = "spark-consumer-group";String topic = "logs";// Kafka 参数设置Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", brokers);kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", groupId);kafkaParams.put("auto.offset.reset", "latest");kafkaParams.put("enable.auto.commit", "false");List<String> topics = Arrays.asList(topic);// 从 Kafka 获取数据流JavaReceiverInputDStream<ConsumerRecord<String, String>> stream =KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams));// 处理每条记录:解析用户、动作和时间戳JavaPairRDD<String, String> userActions = stream.mapToPair(record -> {String[] fields = record.value().split(",");return new Tuple2<>(fields[0], fields[1]); // userId, action});// 定义窗口大小为 10 秒,滑动间隔为 5 秒JavaPairRDD<String, Integer> userClickCounts = userActions.window(new Duration(10000), new Duration(5000)) // 滑动窗口.reduceByKeyAndWindow((Function2<Integer, Integer, Integer>) Integer::sum,new Duration(10000), // 窗口大小:10秒new Duration(5000)   // 滑动间隔5);// 输出每个窗口的用户点击次数userClickCounts.foreachRDD(rdd -> {rdd.collect().forEach(record -> {System.out.println("User: " + record._1() + ", Click Count: " + record._2());});});// 启动流式处理jssc.start();jssc.awaitTermination();}
}

4.3 解释与操作

  • Kafka 配置:配置 Kafka 参数,连接到 Kafka 服务,订阅 Topic logs
  • 数据解析:从 Kafka 消费数据后,解析每条日志(如 user1,click,1609459200)。
  • 窗口计算:使用 window() 定义一个窗口,窗口大小为 10 秒,滑动间隔为 5 秒。使用 reduceByKeyAndWindow() 聚合每个窗口内的用户点击次数。
  • 输出结果:每 5 秒统计一次过去 10 秒内的用户点击次数,输出到控制台。

5. 运行与测试

5.1 创建 Kafka Topic

在 Kafka 中创建 Topic logs

kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 启动 Kafka 生产者

运行 Kafka 生产者代码,模拟数据发送到 Kafka:

java KafkaProducerExample

5.3 启动 Spark Streaming 程序

运行 Spark Streaming 程序,消费 Kafka 数据并执行窗口计算:

java SparkKafkaWindowExample

5.4 输出结果

每隔 5 秒输出用户的点击次数,如:

User: user1, Click Count: 3
User: user2, Click Count: 5

6. 总结

通过结合使用 Apache KafkaApache Spark,我们可以高效地处理大规模的实时数据流。Kafka 负责消息的可靠传输,而 Spark Streaming 负责实时计算和分析。使用窗口计算(如 window()reduceByKeyAndWindow()),我们可以在不同时间段内对数据进行聚合,适用于实时监控、推荐系统、用户行为分析等场景。

此架构适用于需要处理大数据、实时响应的应用程序,并能满足高吞吐量、低延迟的要求。


推荐阅读:《大数据 ETL + Flume 数据清洗》,《大数据测试 Elasticsearch》

相关文章:

【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试&#xff1a;Spark Kafka 实时数据处理与窗口计算教程 1. 概述1.1 大数据技术概述1.2 Apache Kafka 与 Spark 的结合 2. 技术原理与流程2.1 Kafka 简介2.2 Spark Streaming 简介2.3 数据流动与处理流程 3. 环境配置3.1 安装依赖项 4. 实例&#xff1a;实时数据处理与…...

如何为 GitHub 和 Gitee 项目配置不同的 Git 用户信息20241105

&#x1f3af; 如何为 GitHub 和 Gitee 项目配置不同的 Git 用户信息 引言 在多个代码托管平台&#xff08;如 GitHub 和 Gitee&#xff09;之间切换时&#xff0c;正确管理用户信息至关重要。频繁使用不同项目时&#xff0c;若用户配置不当&#xff0c;可能会导致意外提交或…...

【Lucene】原理学习路线

基于《Lucene原理与代码分析完整版》&#xff0c;借助chatgpt等大模型&#xff0c;制定了一个系统学习Lucene原理的计划&#xff0c;并将每个阶段的学习内容组织成专栏文章&#xff0c;zero2hero 手搓 Lucene的核心概念和实现细节。 深入的学习和专栏计划&#xff0c;覆盖Lucen…...

Go语言的并发安全与互斥锁

线程通讯 在程序中不可避免的出现并发或者并行&#xff0c;一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如&#xff0c;对于go语言来说&#xff0c;一般入口为main&#xff0c;main中依次导入import导入的包&#xff0c;并按顺序执行init方法&#xff0c;之后在按…...

SpringBoot框架在资产管理中的应用

3系统分析 3.1可行性分析 通过对本企业资产管理系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本企业资产管理系统采用Spring Boot框架&#xff0c;JAVA作…...

ElasticSearch备考 -- 集群配置常见问题

一、集群开启xpack安全配置后无法启动 在配置文件中增加 xpack.security.enabled: true 后无法启动&#xff0c;日志中提示如下 Transport SSL must be enabled if security is enabled. Please set [xpack.security.transport.ssl.enabled] to [true] or disable security b…...

【UE5】一种老派的假反射做法,可以用于移动端,或对反射的速度、清晰度有需求的地方

没想到大家这篇文章呼声还挺高 这篇文章是对它的详细实现&#xff0c;建议在阅读本篇之前&#xff0c;先浏览一下前面的文章&#xff0c;以便更好地理解和掌握内容。 这种老派的假反射技术&#xff0c;适合用于移动端或对反射效果的速度和清晰度有较高要求的场合。该技术通过一…...

FasterNet中Pconv的实现、效果与作用分析

发表时间&#xff1a;2023年3月7日 论文地址&#xff1a;https://arxiv.org/abs/2303.03667 项目地址&#xff1a;https://github.com/JierunChen/FasterNet FasterNet-t0在GPU、CPU和ARM处理器上分别比MobileViT-XXS快2.8、3.3和2.4&#xff0c;而准确率要高2.9%。我们的大型…...

QToolbar工具栏下拉菜单不弹出有小箭头

这里说了怎么弹出&#xff1a;Qt 工具栏QToolBar添加带有弹出菜单的QAction_qt如何将action添加到工具栏-CSDN博客 然后如果你是在UI里面建立的action&#xff0c;并拖到了toolbar&#xff0c;并在代码中设置菜单&#xff0c;例如&#xff1a; ui->mytoolbar->setMenu(…...

w025基于SpringBoot网上超市的设计与实现

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0…...

深度学习在推荐系统中的应用

参考自《深度学习推荐系统》&#xff0c;用于学习和记录。 前言 &#xff08;1&#xff09;与传统的机器学习模型相比&#xff0c;深度学习模型的表达能力更强&#xff0c;能够挖掘&#xff08;2&#xff09;深度学习的模型结构非常灵活&#xff0c;能够根据业务场景和数据特…...

软考系统架构设计师论文:论面向对象的建模及应用

试题三 论面向对象的建模及应用 软件系统建模是软件开发中的重要环节,通过构建软件系统模型可以帮助系统开发人员理解系统、抽取业务过程和管理系统的复杂性,也可以方便各类人员之间的交流。软件系统建模是在系统需求分析和系统实现之间架起的一座桥梁,系统开发人员按照软件…...

LSM-TREE和SSTable

一、什么是LSM-TREE LSM Tree 是一种高效的写优化数据结构&#xff0c;专门用于处理大量写入操作 在一些写多读少的场景&#xff0c;为了加快写磁盘的速度&#xff0c;提出使用日志文件追加顺序写&#xff0c;加快写的速度&#xff0c;减少随机读写。但是日志文件只能遍历查询…...

mysql 升级

# 备份数据库数据 mysqldump -u root -p --single-transaction --all-databases > backup20240830.sql; # 备份mysql数据目录&#xff1a; cp -r /data/mysql mysql20240902 # 备份mysql配置文件my.cnf cp -r /etc/my.cnf my.cnf20240902 systemctl stop mysqld tar -x…...

基于Multisim定时器倒计时器电路0-999计时计数(含仿真和报告)

【全套资料.zip】定时器倒计时器电路Multisim仿真设计数字电子技术 文章目录 功能一、Multisim仿真源文件二、原理文档报告资料下载【Multisim仿真报告讲解视频.zip】 功能 1.0-999秒定时功能&#xff0c;计时间隔1秒&#xff0c;数字显示。 2. 进行0-999秒减计时&#xff0c…...

力扣11.5

1035. 不相交的线 在两条独立的水平线上按给定的顺序写下 nums1 和 nums2 中的整数。 现在&#xff0c;可以绘制一些连接两个数字 nums1[i] 和 nums2[j] 的直线&#xff0c;这些直线需要同时满足&#xff1a; nums1[i] nums2[j]且绘制的直线不与任何其他连线&#xff08;非…...

arkUI:层叠布局(Stack)

arkUI&#xff1a;层叠布局&#xff08;Stack&#xff09; 1 主要内容说明2 相关内容2.1 层叠布局&#xff08;Stack&#xff09;2.1.1 源码1的相关说明2.1.2 源码1 &#xff08;层叠布局&#xff09;2.1.3 源码1运行效果2.1.3.1 当alignContent: Alignment.Bottom2.1.3.2 当al…...

【LeetCode】【算法】221. 最大正方形

LeetCode 221. 最大正方形 题目描述 在一个由 ‘0’ 和 ‘1’ 组成的二维矩阵内&#xff0c;找到只包含 ‘1’ 的最大正方形&#xff0c;并返回其面积。 思路 思路&#xff1a;动态规划。初始化时&#xff0c;第0列和第0行&#xff0c;若nums[i][j]1则dp[i][j]初始化为1&am…...

怎麼解除IP阻止和封禁?

IP地址被阻止的原因 安全問題如果有人使用 IP 地址試圖侵入某個網站或導致其他安全問題&#xff0c;則可能會禁止該 IP 以保護該網站。濫用或垃圾郵件如果IP地址發送過多垃圾郵件、發佈不當內容或濫用網站服務&#xff0c;則可能會被禁止&#xff0c;以保持網站清潔和友好。違…...

O-RAN Fronthual CU/Sync/Mgmt 平面和协议栈

O-RAN Fronthual CU/Sync/Mgmt 平面和协议栈 O-RAN Fronthual CU/Sync/Mgmt 平面和协议栈O-RAN前端O-RAN 前传平面C-Plane&#xff08;控制平面&#xff09;&#xff1a;控制平面消息定义数据传输、波束形成等所需的调度、协调。U-Plane&#xff08;用户平面&#xff09;&#…...

系统焕新:Win11Debloat工具让Windows性能提升51%的全方位优化方案

系统焕新&#xff1a;Win11Debloat工具让Windows性能提升51%的全方位优化方案 【免费下载链接】Win11Debloat 一个简单的PowerShell脚本&#xff0c;用于从Windows中移除预装的无用软件&#xff0c;禁用遥测&#xff0c;从Windows搜索中移除Bing&#xff0c;以及执行各种其他更…...

tkinter表格神器tkintertable实战:5分钟搞定可拖拽编辑的数据表格(附完整代码)

tkinter表格神器tkintertable实战&#xff1a;5分钟搞定可拖拽编辑的数据表格&#xff08;附完整代码&#xff09; 在Python GUI开发中&#xff0c;表格控件一直是刚需但实现起来又颇为棘手的组件。传统tkinter自带的Treeview虽然能勉强实现表格功能&#xff0c;但在交互体验上…...

YOLOv11分割模型实战:用C++和ONNXRuntime解析‘output0’和‘output1’双输出,实现像素级颜色分析

YOLOv11分割模型实战&#xff1a;C与ONNXRuntime双输出解析与像素级颜色分析 在计算机视觉领域&#xff0c;目标检测与实例分割技术的结合正成为工业应用的新标准。YOLOv11作为YOLO系列的最新成员&#xff0c;不仅延续了其高效检测的特性&#xff0c;更通过双输出结构实现了精准…...

UE4/UE5碰撞事件全解:从Overlap到Hit的7个必知配置项

UE4/UE5碰撞系统深度解析&#xff1a;从基础配置到实战避坑指南 在虚幻引擎开发中&#xff0c;碰撞系统是构建交互体验的核心支柱之一。无论是角色移动、物体交互还是战斗判定&#xff0c;都离不开精准的碰撞检测机制。本文将深入剖析UE4/UE5中Overlap与Hit事件的本质区别&…...

寻音捉影·侠客行多场景落地:覆盖会议/媒体/司法/金融/教育五大垂直领域

寻音捉影侠客行多场景落地&#xff1a;覆盖会议/媒体/司法/金融/教育五大垂直领域 1. 产品核心功能解析 寻音捉影侠客行是一款基于先进语音识别技术的音频关键词检索工具&#xff0c;它能够像江湖中的隐士高手一样&#xff0c;在浩瀚的音频海洋中精准定位特定关键词。这款工具…...

Burp Suite实战进阶:用LingJing内置的burp-labs靶机打通从入门到专家22关(含解题思路)

Burp Suite实战进阶&#xff1a;用LingJing内置的burp-labs靶机打通从入门到专家22关&#xff08;含解题思路&#xff09; 在网络安全领域&#xff0c;Burp Suite无疑是渗透测试工程师最得力的工具之一。然而&#xff0c;很多学习者在掌握了基础操作后&#xff0c;往往会陷入&q…...

使用PyTorch Lightning优化PETRV2-BEV模型训练流程

使用PyTorch Lightning优化PETRV2-BEV模型训练流程 如果你正在训练像PETRV2这样的BEV感知模型&#xff0c;可能已经体会过那种“一步一坑”的感觉。数据加载复杂、多GPU训练配置繁琐、日志记录混乱、实验难以复现……这些工程上的琐事&#xff0c;常常比模型本身更让人头疼。 …...

【字节/阿里/微软Python高级岗内部题库】:GIL移除过渡期必须掌握的7种无锁并发模式

第一章&#xff1a;GIL移除背景与无锁并发演进全景图Python 的全局解释器锁&#xff08;GIL&#xff09;长期被视为多核 CPU 利用率的瓶颈&#xff0c;尤其在 CPU 密集型场景下&#xff0c;线程无法真正并行执行。近年来&#xff0c;CPython 社区启动了 GIL 移除&#xff08;GI…...

打破学术写作边界:NativeOverleaf离线工作流全解析

打破学术写作边界&#xff1a;NativeOverleaf离线工作流全解析 【免费下载链接】NativeOverleaf Next-level academia! Repository for the Native Overleaf project, attempting to integrate Overleaf with native OS features for macOS, Linux and Windows. 项目地址: ht…...

Trae平台实战:我如何教会一个AI智能体应对动态网页和反爬虫?

Trae平台实战&#xff1a;动态网页抓取与反爬策略的智能应对之道 在数据驱动的商业环境中&#xff0c;网页抓取技术已成为企业获取竞争优势的关键能力。然而&#xff0c;随着网站防护技术的升级&#xff0c;传统爬虫在面对动态加载内容和复杂反爬机制时往往力不从心。本文将分享…...