当前位置: 首页 > news >正文

【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试:Spark + Kafka 实时数据处理与窗口计算教程

  • 1. 概述
    • 1.1 大数据技术概述
    • 1.2 Apache Kafka 与 Spark 的结合
  • 2. 技术原理与流程
    • 2.1 Kafka 简介
    • 2.2 Spark Streaming 简介
    • 2.3 数据流动与处理流程
  • 3. 环境配置
    • 3.1 安装依赖项
  • 4. 实例:实时数据处理与窗口计算
    • 4.1 Kafka 生产者代码
    • 4.2 Spark Streaming 消费者代码
    • 4.3 解释与操作
  • 5. 运行与测试
    • 5.1 创建 Kafka Topic
    • 5.2 启动 Kafka 生产者
    • 5.3 启动 Spark Streaming 程序
    • 5.4 输出结果
  • 6. 总结

1. 概述

1.1 大数据技术概述

大数据(Big Data)指的是无法用传统数据库技术和工具进行处理和分析的超大规模数据集合。在大数据技术中,实时数据流的处理尤为重要,尤其是如何高效地对海量的实时数据进行采集、存储、处理与分析。

在这方面,Apache KafkaApache Spark 是两个关键技术。Kafka 作为分布式流处理平台,可以高效地进行实时数据流的生产和消费,而 Spark 提供了强大的分布式计算能力,尤其是其扩展的流式计算模块 Spark Streaming,非常适合处理实时数据流。

1.2 Apache Kafka 与 Spark 的结合

  • Kafka 是一个分布式消息队列,可以处理高吞吐量、低延迟的实时数据流。Kafka 被广泛用于日志收集、监控系统、实时数据传输等场景。
  • Spark 是一个统一的分析引擎,支持批量处理、流式处理和图计算。Spark Streaming 是 Spark 的一个流式处理组件,用于实时处理流数据。

通过结合 Kafka 和 Spark,我们可以实现大规模数据的实时处理、聚合和窗口计算。Spark 可以从 Kafka 消费数据流,并进行实时计算与分析,适用于诸如实时日志分析、用户行为分析、实时推荐等场景。


2. 技术原理与流程

2.1 Kafka 简介

Kafka 是一个分布式的消息队列系统,能够实现高吞吐量、可扩展性、容错性。它的基本组成包括:

  • Producer(生产者):负责向 Kafka 发送数据。
  • Consumer(消费者):从 Kafka 中消费数据。
  • Broker(代理):Kafka 的节点,每个节点负责存储消息。
  • Topic(主题):消息被组织在 Topic 中,生产者向 Topic 发送数据,消费者从 Topic 中读取数据。
  • Partition(分区):Kafka 支持水平分区,使得数据可以分布在多个 Broker 上。

2.2 Spark Streaming 简介

Spark Streaming 是 Spark 的流处理模块,它以 DStream(离散流)为基本数据结构,能够实时地处理数据流。DStream 是一个连续的 RDD(弹性分布式数据集),Spark Streaming 将实时流数据划分成一个个小的批次,使用批处理模型对这些小批次进行处理。

2.3 数据流动与处理流程

  1. Kafka Producer:将数据发送到 Kafka Topic。
  2. Kafka Broker:Kafka 集群负责存储和转发数据。
  3. Spark Streaming:通过 Kafka 的消费者接口从 Topic 中消费数据。
  4. 数据处理与计算:在 Spark Streaming 中进行数据聚合、过滤、窗口计算等操作。
  5. 输出结果:将处理后的数据输出到外部系统,如 HDFS、数据库或控制台。

3. 环境配置

3.1 安装依赖项

  1. 安装 Java:确保安装了 Java 8 或更高版本。

    检查版本:

    java -version
    
  2. 安装 Apache Spark:从 Apache Spark 官网 下载并安装 Spark。

  3. 安装 Apache Kafka:从 Kafka 官网 下载并安装 Kafka。

  4. Maven 配置:在 Java 项目中使用 Maven 作为构建工具,添加必要的 Spark 和 Kafka 依赖。

pom.xml 文件中添加 Spark 和 Kafka 的 Maven 依赖:

<dependencies><!-- Spark Core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.1</version></dependency><!-- Spark Streaming Kafka --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.3.1</version></dependency><!-- Kafka Consumer --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

4. 实例:实时数据处理与窗口计算

4.1 Kafka 生产者代码

以下是一个简单的 Kafka 生产者,用于生成模拟的用户行为日志(如点击事件)并发送到 Kafka Topic logs

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 模拟用户点击日志数据String[] actions = {"click", "view", "scroll"};String[] users = {"user1", "user2", "user3"};// 向 Kafka 发送模拟数据for (int i = 0; i < 100; i++) {String user = users[i % 3];String action = actions[i % 3];String timestamp = String.valueOf(System.currentTimeMillis() / 1000);String value = user + "," + action + "," + timestamp;producer.send(new ProducerRecord<>("logs", null, value));try {Thread.sleep(1000); // 每秒发送一条数据} catch (InterruptedException e) {e.printStackTrace();}}producer.close();}
}

4.2 Spark Streaming 消费者代码

以下是一个 Spark Streaming 程序,它从 Kafka Topic logs 中消费数据并进行窗口计算,统计每个用户在过去 10 秒内的点击次数。

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import java.util.List;public class SparkKafkaWindowExample {public static void main(String[] args) throws InterruptedException {// 初始化 Spark StreamingContextJavaStreamingContext jssc = new JavaStreamingContext("local[2]", "SparkKafkaWindowExample", new Duration(2000));// Kafka 配置参数String brokers = "localhost:9092";String groupId = "spark-consumer-group";String topic = "logs";// Kafka 参数设置Map<String, Object> kafkaParams = new HashMap<>();kafkaParams.put("bootstrap.servers", brokers);kafkaParams.put("key.deserializer", StringDeserializer.class);kafkaParams.put("value.deserializer", StringDeserializer.class);kafkaParams.put("group.id", groupId);kafkaParams.put("auto.offset.reset", "latest");kafkaParams.put("enable.auto.commit", "false");List<String> topics = Arrays.asList(topic);// 从 Kafka 获取数据流JavaReceiverInputDStream<ConsumerRecord<String, String>> stream =KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams));// 处理每条记录:解析用户、动作和时间戳JavaPairRDD<String, String> userActions = stream.mapToPair(record -> {String[] fields = record.value().split(",");return new Tuple2<>(fields[0], fields[1]); // userId, action});// 定义窗口大小为 10 秒,滑动间隔为 5 秒JavaPairRDD<String, Integer> userClickCounts = userActions.window(new Duration(10000), new Duration(5000)) // 滑动窗口.reduceByKeyAndWindow((Function2<Integer, Integer, Integer>) Integer::sum,new Duration(10000), // 窗口大小:10秒new Duration(5000)   // 滑动间隔5);// 输出每个窗口的用户点击次数userClickCounts.foreachRDD(rdd -> {rdd.collect().forEach(record -> {System.out.println("User: " + record._1() + ", Click Count: " + record._2());});});// 启动流式处理jssc.start();jssc.awaitTermination();}
}

4.3 解释与操作

  • Kafka 配置:配置 Kafka 参数,连接到 Kafka 服务,订阅 Topic logs
  • 数据解析:从 Kafka 消费数据后,解析每条日志(如 user1,click,1609459200)。
  • 窗口计算:使用 window() 定义一个窗口,窗口大小为 10 秒,滑动间隔为 5 秒。使用 reduceByKeyAndWindow() 聚合每个窗口内的用户点击次数。
  • 输出结果:每 5 秒统计一次过去 10 秒内的用户点击次数,输出到控制台。

5. 运行与测试

5.1 创建 Kafka Topic

在 Kafka 中创建 Topic logs

kafka-topics.sh --create --topic logs --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5.2 启动 Kafka 生产者

运行 Kafka 生产者代码,模拟数据发送到 Kafka:

java KafkaProducerExample

5.3 启动 Spark Streaming 程序

运行 Spark Streaming 程序,消费 Kafka 数据并执行窗口计算:

java SparkKafkaWindowExample

5.4 输出结果

每隔 5 秒输出用户的点击次数,如:

User: user1, Click Count: 3
User: user2, Click Count: 5

6. 总结

通过结合使用 Apache KafkaApache Spark,我们可以高效地处理大规模的实时数据流。Kafka 负责消息的可靠传输,而 Spark Streaming 负责实时计算和分析。使用窗口计算(如 window()reduceByKeyAndWindow()),我们可以在不同时间段内对数据进行聚合,适用于实时监控、推荐系统、用户行为分析等场景。

此架构适用于需要处理大数据、实时响应的应用程序,并能满足高吞吐量、低延迟的要求。


推荐阅读:《大数据 ETL + Flume 数据清洗》,《大数据测试 Elasticsearch》

相关文章:

【大数据测试spark+kafka-详细教程(附带实例)】

大数据测试&#xff1a;Spark Kafka 实时数据处理与窗口计算教程 1. 概述1.1 大数据技术概述1.2 Apache Kafka 与 Spark 的结合 2. 技术原理与流程2.1 Kafka 简介2.2 Spark Streaming 简介2.3 数据流动与处理流程 3. 环境配置3.1 安装依赖项 4. 实例&#xff1a;实时数据处理与…...

如何为 GitHub 和 Gitee 项目配置不同的 Git 用户信息20241105

&#x1f3af; 如何为 GitHub 和 Gitee 项目配置不同的 Git 用户信息 引言 在多个代码托管平台&#xff08;如 GitHub 和 Gitee&#xff09;之间切换时&#xff0c;正确管理用户信息至关重要。频繁使用不同项目时&#xff0c;若用户配置不当&#xff0c;可能会导致意外提交或…...

【Lucene】原理学习路线

基于《Lucene原理与代码分析完整版》&#xff0c;借助chatgpt等大模型&#xff0c;制定了一个系统学习Lucene原理的计划&#xff0c;并将每个阶段的学习内容组织成专栏文章&#xff0c;zero2hero 手搓 Lucene的核心概念和实现细节。 深入的学习和专栏计划&#xff0c;覆盖Lucen…...

Go语言的并发安全与互斥锁

线程通讯 在程序中不可避免的出现并发或者并行&#xff0c;一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如&#xff0c;对于go语言来说&#xff0c;一般入口为main&#xff0c;main中依次导入import导入的包&#xff0c;并按顺序执行init方法&#xff0c;之后在按…...

SpringBoot框架在资产管理中的应用

3系统分析 3.1可行性分析 通过对本企业资产管理系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本企业资产管理系统采用Spring Boot框架&#xff0c;JAVA作…...

ElasticSearch备考 -- 集群配置常见问题

一、集群开启xpack安全配置后无法启动 在配置文件中增加 xpack.security.enabled: true 后无法启动&#xff0c;日志中提示如下 Transport SSL must be enabled if security is enabled. Please set [xpack.security.transport.ssl.enabled] to [true] or disable security b…...

【UE5】一种老派的假反射做法,可以用于移动端,或对反射的速度、清晰度有需求的地方

没想到大家这篇文章呼声还挺高 这篇文章是对它的详细实现&#xff0c;建议在阅读本篇之前&#xff0c;先浏览一下前面的文章&#xff0c;以便更好地理解和掌握内容。 这种老派的假反射技术&#xff0c;适合用于移动端或对反射效果的速度和清晰度有较高要求的场合。该技术通过一…...

FasterNet中Pconv的实现、效果与作用分析

发表时间&#xff1a;2023年3月7日 论文地址&#xff1a;https://arxiv.org/abs/2303.03667 项目地址&#xff1a;https://github.com/JierunChen/FasterNet FasterNet-t0在GPU、CPU和ARM处理器上分别比MobileViT-XXS快2.8、3.3和2.4&#xff0c;而准确率要高2.9%。我们的大型…...

QToolbar工具栏下拉菜单不弹出有小箭头

这里说了怎么弹出&#xff1a;Qt 工具栏QToolBar添加带有弹出菜单的QAction_qt如何将action添加到工具栏-CSDN博客 然后如果你是在UI里面建立的action&#xff0c;并拖到了toolbar&#xff0c;并在代码中设置菜单&#xff0c;例如&#xff1a; ui->mytoolbar->setMenu(…...

w025基于SpringBoot网上超市的设计与实现

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0…...

深度学习在推荐系统中的应用

参考自《深度学习推荐系统》&#xff0c;用于学习和记录。 前言 &#xff08;1&#xff09;与传统的机器学习模型相比&#xff0c;深度学习模型的表达能力更强&#xff0c;能够挖掘&#xff08;2&#xff09;深度学习的模型结构非常灵活&#xff0c;能够根据业务场景和数据特…...

软考系统架构设计师论文:论面向对象的建模及应用

试题三 论面向对象的建模及应用 软件系统建模是软件开发中的重要环节,通过构建软件系统模型可以帮助系统开发人员理解系统、抽取业务过程和管理系统的复杂性,也可以方便各类人员之间的交流。软件系统建模是在系统需求分析和系统实现之间架起的一座桥梁,系统开发人员按照软件…...

LSM-TREE和SSTable

一、什么是LSM-TREE LSM Tree 是一种高效的写优化数据结构&#xff0c;专门用于处理大量写入操作 在一些写多读少的场景&#xff0c;为了加快写磁盘的速度&#xff0c;提出使用日志文件追加顺序写&#xff0c;加快写的速度&#xff0c;减少随机读写。但是日志文件只能遍历查询…...

mysql 升级

# 备份数据库数据 mysqldump -u root -p --single-transaction --all-databases > backup20240830.sql; # 备份mysql数据目录&#xff1a; cp -r /data/mysql mysql20240902 # 备份mysql配置文件my.cnf cp -r /etc/my.cnf my.cnf20240902 systemctl stop mysqld tar -x…...

基于Multisim定时器倒计时器电路0-999计时计数(含仿真和报告)

【全套资料.zip】定时器倒计时器电路Multisim仿真设计数字电子技术 文章目录 功能一、Multisim仿真源文件二、原理文档报告资料下载【Multisim仿真报告讲解视频.zip】 功能 1.0-999秒定时功能&#xff0c;计时间隔1秒&#xff0c;数字显示。 2. 进行0-999秒减计时&#xff0c…...

力扣11.5

1035. 不相交的线 在两条独立的水平线上按给定的顺序写下 nums1 和 nums2 中的整数。 现在&#xff0c;可以绘制一些连接两个数字 nums1[i] 和 nums2[j] 的直线&#xff0c;这些直线需要同时满足&#xff1a; nums1[i] nums2[j]且绘制的直线不与任何其他连线&#xff08;非…...

arkUI:层叠布局(Stack)

arkUI&#xff1a;层叠布局&#xff08;Stack&#xff09; 1 主要内容说明2 相关内容2.1 层叠布局&#xff08;Stack&#xff09;2.1.1 源码1的相关说明2.1.2 源码1 &#xff08;层叠布局&#xff09;2.1.3 源码1运行效果2.1.3.1 当alignContent: Alignment.Bottom2.1.3.2 当al…...

【LeetCode】【算法】221. 最大正方形

LeetCode 221. 最大正方形 题目描述 在一个由 ‘0’ 和 ‘1’ 组成的二维矩阵内&#xff0c;找到只包含 ‘1’ 的最大正方形&#xff0c;并返回其面积。 思路 思路&#xff1a;动态规划。初始化时&#xff0c;第0列和第0行&#xff0c;若nums[i][j]1则dp[i][j]初始化为1&am…...

怎麼解除IP阻止和封禁?

IP地址被阻止的原因 安全問題如果有人使用 IP 地址試圖侵入某個網站或導致其他安全問題&#xff0c;則可能會禁止該 IP 以保護該網站。濫用或垃圾郵件如果IP地址發送過多垃圾郵件、發佈不當內容或濫用網站服務&#xff0c;則可能會被禁止&#xff0c;以保持網站清潔和友好。違…...

O-RAN Fronthual CU/Sync/Mgmt 平面和协议栈

O-RAN Fronthual CU/Sync/Mgmt 平面和协议栈 O-RAN Fronthual CU/Sync/Mgmt 平面和协议栈O-RAN前端O-RAN 前传平面C-Plane&#xff08;控制平面&#xff09;&#xff1a;控制平面消息定义数据传输、波束形成等所需的调度、协调。U-Plane&#xff08;用户平面&#xff09;&#…...

CC2530与ESP8266物联网网关:ZigBee转Wi-Fi通信协议转换实战

1. 项目概述&#xff1a;当ZigBee遇上Wi-Fi最近在折腾一个智能家居的传感器节点&#xff0c;核心是TI的CC2530 ZigBee芯片。这玩意儿功耗低、组网方便&#xff0c;是很多低功耗传感网络的绝佳选择。但问题来了&#xff0c;ZigBee网络的数据最终怎么方便地送到我们手机上去看呢&…...

PromptCraft-Robotics:基于LLM的机器人任务规划与安全控制实践

1. 项目概述与核心价值最近在机器人编程和AI应用领域&#xff0c;一个名为“PromptCraft-Robotics”的项目在开发者社区里引起了不小的讨论。这个项目由微软开源&#xff0c;其核心目标直指一个困扰许多开发者和研究者的痛点&#xff1a;如何让大型语言模型&#xff08;LLM&…...

容器化技术实战:从Docker到Kubernetes的体系化学习路径

1. 项目概述&#xff1a;一个容器化时代的“瑞士军刀”训练营 如果你正在或即将踏入容器化技术领域&#xff0c;无论是刚接触Docker的新手&#xff0c;还是想系统梳理Kubernetes的开发者&#xff0c;又或者是需要为团队进行技术培训的架构师&#xff0c;那么“jpetazzo/contai…...

渠道输水控制系统模型在环测试【附仿真】

✨ 长期致力于渠道输水、水动力数值模拟、控制系统、模型在环测试、胶东调水工程研究工作&#xff0c;擅长数据搜集与处理、建模仿真、程序编写、仿真设计。 ✅ 专业定制毕设、代码 ✅ 如需沟通交流&#xff0c;点击《获取方式》 &#xff08;1&#xff09;Preissmann四点隐式格…...

嘎嘎降AI和率零哪个更适合毕业论文:2026年性价比达标率用户口碑完整横评测试报告

嘎嘎降AI和率零哪个更适合毕业论文&#xff1a;2026年性价比达标率用户口碑完整横评测试报告 帮几个不同专业的同学处理过论文AI率&#xff0c;用过的工具加起来也有六七款了。 综合看&#xff0c;嘎嘎降AI&#xff08;www.aigcleaner.com&#xff09;是最稳的选择&#xff0…...

Spring Kafka监听多个Topic时,如何避免消费者‘摸鱼’?聊聊Range和RoundRobin分配策略的选择

Spring Kafka多Topic监听场景下消费者分配策略深度优化 1. 问题背景&#xff1a;当消费者开始"摸鱼" 在分布式消息系统中&#xff0c;Kafka凭借其高吞吐、低延迟的特性成为众多企业的首选。然而在实际开发中&#xff0c;不少团队遇到过这样的尴尬场景&#xff1a;明明…...

Horos:让医学影像分析像翻阅相册一样简单

Horos&#xff1a;让医学影像分析像翻阅相册一样简单 【免费下载链接】horos Horos™ is a free, open source medical image viewer. The goal of the Horos Project is to develop a fully functional, 64-bit medical image viewer for OS X. Horos is based upon OsiriX an…...

探索Mod Assistant:Beat Saber模组管理工具的高效解决方案

探索Mod Assistant&#xff1a;Beat Saber模组管理工具的高效解决方案 【免费下载链接】ModAssistant Simple Beat Saber Mod Installer 项目地址: https://gitcode.com/gh_mirrors/mo/ModAssistant Beat Saber模组管理工具Mod Assistant是一款专为PC版Beat Saber设计的…...

手工打造柔性LED眼罩:从SMD焊接入门到可穿戴电路实践

1. 项目概述&#xff1a;从零打造你的赛博格之眼如果你和我一样&#xff0c;对《银翼杀手》里那些闪烁着冷光的义眼&#xff0c;或是赛博朋克美学中标志性的发光装饰着迷&#xff0c;那么亲手制作一个属于自己的LED眼罩&#xff0c;绝对是一次令人兴奋的旅程。这不仅仅是一个酷…...

避坑指南:HugeGraph-Server 0.12.0 用MySQL做后端存储,配置文件到底怎么改?(附完整流程)

HugeGraph-Server 0.12.0 MySQL后端配置深度解析与实战避坑指南 当选择MySQL作为HugeGraph-Server的后端存储时&#xff0c;配置文件的细微差异往往成为项目落地的"拦路虎"。本文将深入剖析hugegraph.properties中MySQL相关配置的每一个关键参数&#xff0c;结合典型…...