当前位置: 首页 > news >正文

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

在本文中,将介绍如何构建一个实时数据pipeline,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能,以及Kafka和HDFS作为我们的数据传输和存储工具。
1、环境设置:
首先,确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>spark_project</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12.12</scala.version><spark.version>3.2.0</spark.version><kafka.version>2.8.1</kafka.version></properties><dependencies><!-- Spark dependencies --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka dependencies --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><!-- Scala library --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency>   </dependencies>
</project>

mysql中表结构:
在这里插入图片描述

2、从MySQL读取数据到Kafka:
我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据,并将其转换为JSON格式,然后将数据写入到Kafka主题中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit = {// 创建 SparkSessionval spark = SparkSession.builder().appName("MySQLToKafka").master("local[*]").getOrCreate()// 设置 MySQL 连接属性val mysqlProps = new Properties()mysqlProps.setProperty("user", "root")mysqlProps.setProperty("password", "12345678")mysqlProps.setProperty("driver", "com.mysql.jdbc.Driver")// 从 MySQL 数据库中读取数据val jdbcDF = spark.read.jdbc("jdbc:mysql://localhost:3306/mydb", "comment", mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF = jdbcDF.selectExpr("to_json(struct(*)) AS value")// 将数据写入 KafkajsonDF.show()jsonDF.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "comment").save()// 停止 SparkSessionspark.stop()}}

以上代码首先创建了一个SparkSession,然后设置了连接MySQL所需的属性。接着,它使用jdbc.read从MySQL数据库中读取数据,并将数据转换为JSON格式,最后将数据写入到名为"comment"的Kafka主题中。提示:topic主题会被自动创建。

从Kafka消费数据并写入HDFS:
接下来,我们将设置Spark Streaming来消费Kafka中的数据,并将数据保存到HDFS中。以下是相应的Scala代码:

package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit = {// 设置 SparkConfval sparkConf = new SparkConf().setAppName("KafkaToHDFS").setMaster("local[*]")// 创建 StreamingContext,每秒处理一次val ssc = new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092", // Kafka broker 地址"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "spark-consumer-group", // Spark 消费者组"auto.offset.reset" -> "earliest", // 从最新的偏移量开始消费"enable.auto.commit" -> (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics = Array("comment")// 创建 Kafka Direct Streamval stream = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息,然后将其写入 HDFSstream.map({rdd=>val comment = JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name+","+comment.comment_text+","+comment.comment_time+","+comment.fans+","+comment.location+","+comment.user_gender}).foreachRDD { rdd =>if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile("hdfs://hadoop101:8020/tmp/")}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}}

以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象,并将其保存为逗号分隔的文本文件,最终存储在HDFS的/tmp目录中。
在这里插入图片描述

结论:
通过本文的介绍和示例代码,您现在应该了解如何使用Apache Spark构建一个实时数据流水线,从MySQL数据库读取数据,通过Kafka传输数据,最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。

**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业,学生毕设等。不限于python,java,大数据,模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
在这里插入图片描述

相关文章:

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移

使用Apache Spark从MySQL到Kafka再到HDFS的数据转移 在本文中&#xff0c;将介绍如何构建一个实时数据pipeline&#xff0c;从MySQL数据库读取数据&#xff0c;通过Kafka传输数据&#xff0c;最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能&#…...

一篇文章拿下Redis 通用命令

文章目录 Redis数据结构介绍Redis 通用命令命令演示KEYSDELEXISTSEXPIRE RedisTemplate 中的通用命令 本篇文章介绍 Redis 的通用命令, 通用命令在 Redis 的所有数据类型下都使用, 学好通用命令可以让我们更好的使用 Redis. Redis数据结构介绍 Redis 是一个key-value的数据库&…...

锂电池充电充放电曲线分析

前言 锂电池的充电曲线通常包括三个阶段:恒流充电阶段、恒压充电阶段和滞后充电阶段。在恒流充电阶段,电流保持恒定,电压逐渐增加;在恒压充电阶段,电压保持恒定,电流逐渐减小;在滞后充电阶段,电流进一步减小,电池开始充满。通过监测这些阶段的电流和电压变化,可以评…...

vue3 第二十九节 (vue3 事件循环之nextTick)

引言 vue 项目中为什么要使用 nextTick 这个函数&#xff0c;是做什么用的&#xff0c;解决了哪些问题 1、nextTick 作用 用于处理DOM更新完成之后&#xff0c;执行回调函数的方法&#xff1b; 2、实现方案 vue2 中 nextTick() 是基于浏览器的 异步队列和微任务队列而执行…...

使用Flask-SocketIO构建实时Web应用

文章目录 准备工作编写代码编写HTML模板运行应用 随着互联网的发展&#xff0c;实时性成为了许多Web应用的重要需求之一。传统的HTTP协议虽然可以实现实时通信&#xff0c;但是其长轮询等机制效率低下&#xff0c;无法满足高并发、低延迟的需求。为了解决这一问题&#xff0c;诞…...

可重构柔性装配产线:为工业制造领域注入了新的活力

随着科技的飞速发展&#xff0c;智能制造正逐渐成为引领工业革新的重要力量。在这一浪潮中&#xff0c;可重构柔性装配产线以其独特的技术优势和创新理念&#xff0c;为工业制造领域注入了新的活力&#xff0c;开启了创新驱动的智能制造新篇章。 可重构柔性装配产线是基于富唯智…...

懒人网址导航源码v3.9

测试环境 宝塔Nginx -Tengine2.2.3的PHP5.6 MySQL5.6.44 为防止调试错误&#xff0c;建议使用测试环境运行的php与mysql版本 首先用phpMyAdmin导入数据库文件db/db.sql 如果导入不行&#xff0c;请直接复制数据库内容运行sql语句也可以 再修改config.php来进行数据库配置…...

springboot 开启缓存 @EnableCaching(使用redis)

添加依赖 pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>application.yml 配置redis连参数 spring:# redis 配置redis:# 地址host: 127.0.0.…...

Adobe After Effects AE v24.3.0 解锁版 (视频合成及视频特效制作)

Adobe系列软件安装目录 一、Adobe Photoshop PS 25.6.0 解锁版 (最流行的图像设计软件) 二、Adobe Media Encoder ME v24.3.0 解锁版 (视频和音频编码渲染工具) 三、Adobe Premiere Pro v24.3.0 解锁版 (领先的视频编辑软件) 四、Adobe After Effects AE v24.3.0 解锁版 (视…...

Qt---文件系统

一、基本文件操作 1. QFile对文件进行读和写 QFile file( path 文件路径) 读&#xff1a; file.open(打开方式) QlODevice::readOnly 全部读取->file.readAll()&#xff0c;按行读->file.readLine()&#xff0c;atend()->判断是否读到文件尾 …...

ruoyi-vue-pro 使用记录(2)

ruoyi-vue-pro 使用记录&#xff08;2&#xff09; 数据库商城商品模块数据表营销数据库交易数据库统计数据库 数据库 商城 参考官方文档 ruoyi-vue-pro yudao 项目商城 mall 模块启用及相关SQL脚本 商品模块&#xff08;中心&#xff09;以 product_ 作为前缀的表交易模块…...

centos7中如何全局搜索一下nginx的配置文件?

在CentOS 7中搜索Nginx的配置文件&#xff0c;你可以使用一些常用的命令行工具&#xff0c;比如find、grep等。这些工具可以帮助你在文件系统中查找文件&#xff0c;也可以用来查找Docker容器内部的文件&#xff0c;只要你知道如何访问容器的文件系统。 1. 搜索系统中的Nginx配…...

2024年5月10日有感复盘

2024年5月10日有感复盘 时间 今天是一个很美好的一天&#xff0c;原因是很平凡&#xff0c;读书很平凡&#xff0c;玩游戏很平凡&#xff0c;然后生活很平凡&#xff0c;未来可期&#xff0c;听歌很舒服&#xff0c;很喜欢一个人呆在图书馆的感觉&#xff0c;很喜欢发呆&…...

C++通过json文件配置参数

一、安装nlohmann json nlohmann json&#xff1a;安装_nlohmann安装-CSDN博客 依次执行下面指令&#xff1a; git clone https://gitee.com/cuihongxi/mov_from_github.gitcd json-developmkdir buildcd buildcmake ..makesudo make install 二、安装完成后使用 #include…...

idea连接远程仓库

git ->克隆。 url为远程仓库的地址&#xff0c;输入好后&#xff0c;选择项目存放目录&#xff0c;再点击克隆 点击新窗口打开。 切换到对应分支...

初始Django

初始Django 一、Django的历史 ​ Django 是从真实世界的应用中成长起来的&#xff0c;它是由堪萨斯&#xff08;Kansas&#xff09;州 Lawrence 城中的一个网络开发小组编写的。它诞生于 2003 年秋天&#xff0c;那时 Lawrence Journal-World 报纸的程序员 Adrian Holovaty 和…...

leetcode56--合并区间

题目描述 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&#xff1a;interv…...

赋能数据库智能托管,Akamai 发布首款云计算业务线产品!

为了尽可能地简化数据库管理的复杂性&#xff0c;降低数据库成本&#xff0c;Akamai 在近期推出了首款 DBaaS&#xff08;数据库即服务&#xff09;产品——Linode Managed Database。这一数据库产品是 Akamai 自3月份收购 Linode 后发布的首款计算业务线产品。 一、更易用的数…...

Go语言系统学习笔记(三):杂项篇

1. 写在前面 公司的新业务开发需要用到go语言&#xff0c;虽然之前没接触过这门语言&#xff0c;但在大模型的帮助下&#xff0c;边看项目边写代码也能进行go的项目开发&#xff0c;不过&#xff0c;写了一段时间代码之后&#xff0c;总感觉对go语言本身&#xff0c;我的知识体…...

黄仁勋炉边对话:创业的超能力与英伟达的加速计算之旅

在TiECon 2024大会上&#xff0c;英伟达的创始人兼CEO黄仁勋与风投公司Mayfield的管理合伙人纳文查德哈进行了一场深入的炉边对话。黄仁勋不仅分享了英伟达的创业故事&#xff0c;还谈到了他对创业和加速计算的深刻见解。下面是我对这次对话的总结&#xff0c;希望能给正在创业…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

R语言速释制剂QBD解决方案之三

本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

免费数学几何作图web平台

光锐软件免费数学工具&#xff0c;maths,数学制图&#xff0c;数学作图&#xff0c;几何作图&#xff0c;几何&#xff0c;AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...