当前位置: 首页 > news >正文

详解 Spark SQL 代码开发之数据读取和保存

一、通用操作

/**
基本语法:1.读取:SparkSession.read[.format("format")[.option("...")]].load("path")2.保存:DataFrame.write[.format("format")[.option("...")]][.mode("SaveMode")].save("path")说明:1.默认读取和保存的文件格式为 parquet2."format"包含:"csv"、"jdbc"、"json"、"orc"、"parquet" 和 "textFile"3.option("…") 是在 "jdbc" 格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable4."SaveMode" 指定保存模式,SaveMode 是一个枚举类,其中的常量包括:4.1 "error":默认值,如果文件已经存在则抛出异常4.2 "append":如果文件已经存在则追加4.3 "overwrite":如果文件已经存在则覆盖4.4 "ignore":如果文件已经存在则忽略
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._// val df = spark.read.load("data/user.json") // errorval df = spark.read.format("json").load("data/user.json")df.show()// 直接查询文件:文件格式.`文件路径`spark.sql("select * from json.`data/user.json`").show()// df.write.save("output") // 默认保存为 parquet 格式df.write.format("json").save("output")// df.write.format("json").mode("overwrite").save("output") // 覆盖保存// 关闭环境spark.close()}}

二、parquet

/**SparkSQL默认的读取保存数据源为 Parquet 格式Parquet 是一种能够有效存储嵌套数据的列式存储格式基本语法:1.读取:1.1 SparkSession.read.load("path") 1.2 SparkSession.read.parquet("path")2.保存:2.1 DataFrame.write[.mode("SaveMode")].save("path") 2.2 DataFrame.write[.mode("SaveMode")].parquet("path")
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df = spark.read.load("data/user.parquet")val df1 = spark.read.parquet("data/user.parquet")df.show()df.write.save("output") df1.write.parquet("output1") // 关闭环境spark.close()}}

三、json

/**基本语法:1.读取:SparkSession.read.json("path") 2.保存:DataFrame.write[.mode("SaveMode")].json("path") 注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df = spark.read.json("data/user.json")df.show()df.write.json("output") // 关闭环境spark.close()}}

四、csv

/**基本语法:1.读取:1.1 SparkSession.read.format("csv")[.option(...)].load("path") 1.2 SparkSession.read.csv("path") 2.保存:2.1 DataFrame.write.format("csv")[.mode("SaveMode")].save("path") 2.2 DataFrame.write[.mode("SaveMode")].csv("path") 说明:csv 是默认以逗号为分隔符的文件格式
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df = spark.read.format("csv") // 指定读取文件格式.option("sep", ";") // 指定分隔符.option("inferSchema", "true") .option("header", "true") // 指定第一行是否为表头.load("data/user.csv")df.show()df.write.format("csv").save("output") // 关闭环境spark.close()}}

五、mysql

  • 导入 mysql 依赖

    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
    </dependency>
    
  • 读取和保存数据

    /**
    基本语法:1.读取:1.1 SparkSession.read.format("jdbc").option("url", "..")…….load() 1.2 SparkSession.read.jdbc("url", "table", prop: Properties)2.保存:2.1 DataFrame.write.format("jdbc").option("url", "..")……[.mode("SaveMode")].save()2.2 DataFrame.write[.mode("SaveMode")].jdbc("url", "table", prop: Properties)*/
    object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._// 1. 读取// 1.1 方式一:val df = spark.read.format("jdbc") // 指定读取文件格式.option("url", "jdbc:mysql://linux:3306/spark") // 连接url.option("driver", "com.mysql.jdbc.Driver") // 驱动.option("user", "root") // 用户名.option("password", "123123") // 密码.option("dbtable", "user") // 表名.load()// 1.2 方式二:val df1 = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://linux:3306/spark?user=root&password=123123","dbtable" -> "user","driver" -> "com.mysql.jdbc.Driver")).load()// 1.3 方式三:val props: Properties = new Properties()props.setProperty("user", "root")props.setProperty("password", "123123")val df2 = spark.read.jdbc("jdbc:mysql://linux:3306/spark", "user", props)df.show()// 2. 保存// 2.1 方式一df.write.format("jdbc") // 指定读取文件格式.option("url", "jdbc:mysql://linux:3306/spark") // 连接url.option("driver", "com.mysql.jdbc.Driver") // 驱动.option("user", "root") // 用户名.option("password", "123123") // 密码.option("dbtable", "user1") // 表名.mode(SaveMode.Append).save() // 2.2 方式二df2.write.mode("append").jdbc("jdbc:mysql://linux:3306/spark", "user2", props)// 关闭环境spark.close()}}
    

六、hive

1. 内置 hive

Spark 在安装编译后内部已经可以支持 Hive 表访问、 UDF (用户自定义函数) 以及 Hive 查询语言(HiveQL/HQL) 等

// 内置 Hive 的元数据存储在 derby 中,默认仓库地址为 $SPARK_HOME/spark-warehouse
// 进入 spark-shell// 1. 创建 hive 表
spark.sql("create table user(username string, age bigint)")// 2. 加载数据到表中
spark.sql("load data local inpath 'data/user.txt' into table user")// 3. 展示所有表
spark.sql("show tables").show// 4. 查询表数据
spark.sql("select * from user").show

2. 外部 hive

  • 配置连接外部 Hive

    • 将外部 Hive 的安装目录下的 hive-site.xml 配置文件拷贝到 Spark 安装目录的 conf 目录下

    • 将 Mysql 连接的驱动 jar 包拷贝到 Spark 安装目录的 jars 目录下(外部 Hive 的元数据库使用 MySQL)

    • 如果访问不到 hdfs,则需要把 core-site.xmlhdfs-site.xml 两个配置文件拷贝到 Spark 安装目录的 conf 目录下

    • 启动 spark-shell,执行 spark.sql("show tables").show 检查是否可以连接外部 Hive

  • 程序代码操作外部 Hive

    • 引入依赖

      <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version>
      </dependency>
      <dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version>
      </dependency>
      <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
      </dependency>
      
    • hive-site.xml 文件拷贝到项目的 resources 目录中,同时确保 target/classes 目录下也有该文件

    • 编码

      object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 若出现用户无权限的错误,可在首行添加// System.setProperty("HADOOP_USER_NAME", "root")// 创建 sparksql 环境对象,并开启 Hive 支持val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")// 配置修改数据库仓库的地址// conf.set("spark.sql.warehouse.dir", "hdfs://linux:8020/user/hive/warehouse")val spark = SparkSession.builder().enableHiveSupport() // 启用 hive 支持.config(conf).getOrCreate()// 使用 sparksql 操作 hivespark.sql("show tables").show()// 关闭环境spark.close()}}
      
  • 其他连接方式

    • spark-sql cli

      #进入 spark-sql CLI
      bin/spark-sql#编写HQL
      show tables;
      
    • spark beeline

      #启动 Thrift Server
      sbin/start-thriftserver.sh#使用 beeline 连接 Thrift Server
      bin/beeline -u jdbc:hive2://linux:10000 -n root#编写HQL
      show tables;
      

相关文章:

详解 Spark SQL 代码开发之数据读取和保存

一、通用操作 /** 基本语法&#xff1a;1.读取&#xff1a;SparkSession.read[.format("format")[.option("...")]].load("path")2.保存&#xff1a;DataFrame.write[.format("format")[.option("...")]][.mode("Save…...

Pulsar 社区周报 | No.2024-05-30 | BIGO 百页小册《Apache Pulsar 调优指南》

“ 各位热爱 Pulsar 的小伙伴们&#xff0c;Pulsar 社区周报更新啦&#xff01;这里将记录 Pulsar 社区每周的重要更新&#xff0c;每周发布。 ” BIGO 百页小册《Apache Pulsar 调优指南》 Hi&#xff0c;Apache Pulsar 社区的小伙伴们&#xff0c;社区 2024 上半年度的有奖问…...

第二证券股票杠杆:4分钟直线涨停!这一赛道,AH股集体爆发!

今日早盘&#xff0c;A股继续小幅震动收拾&#xff0c;首要股指涨跌互现&#xff0c;两市个股跌多涨少&#xff0c;成交有萎缩的趋势。 盘面上&#xff0c;医药、中字头、旅游、房地产等板块相对活跃&#xff0c;混合实践、玻璃基板、AI手机PC、光刻机等板块跌幅居前。 “中字…...

JavaScript 进阶征途:解锁Function奥秘,深掘Object方法精髓

个人主页&#xff1a;学习前端的小z 个人专栏&#xff1a;JavaScript 精粹 本专栏旨在分享记录每日学习的前端知识和学习笔记的归纳总结&#xff0c;欢迎大家在评论区交流讨论&#xff01; 文章目录 &#x1f235;Function方法 与 函数式编程&#x1f49d;1 call &#x1f49d…...

斜拉桥智慧施工数字孪生

基于图扑自主研发的 HT for Web 产品&#xff0c;利用现场照片及 CAD 图纸&#xff0c;结合 PBR 材质&#xff0c;搭建了具有赛博朋克风格的智慧斜拉桥可视化解决方案&#xff0c;精准复现斜拉桥建造规划过程&#xff0c;辅助运维人员对桥梁基建过程的网格化管理。提高桥梁的建…...

【chatGPT API】Function Calling:将自然语言转换为API调用或数据库查询

文章目录 一. 介绍二. 常见用例与Function Calling调用逻辑三. 调用细节1. 调用行为&#xff1a;tool_choice2. 调用规定&#xff1a;functions 四. 实战&#xff1a;查询公司相关产品 一. 介绍 OpenAI可以根据用户的要求输出一个符合用户要求的入参值。然后用户拿到入参值之后…...

Oracle Hint /*+APPEND*/插入性能总结

oracle append用法 Oracle中的APPEND用法主要用于提高数据插入的效率。 基本用法&#xff1a;在使用了APPEND选项后&#xff0c;插入数据会直接加到表的最后面&#xff0c;而不会在表的空闲块中插入数据。这种做法不需要寻找freelist中的free block&#xff0c;从而避免了在…...

正邦科技(day3)

出厂测试 设备校准 这个需要注意的是校准电流、电压、电感的时候有时候负感器会装反&#xff0c;mcu会坏&#xff0c;需要flash一下清空内存...

mac电脑多协议远程管理软件:Termius 8.4.0激活版下载

Termius 是一款功能强大的跨平台远程访问工具&#xff0c;可用于管理和连接各种远程系统和服务器。它支持SSH、Telnet、SFTP和Serial协议&#xff0c;并提供了键盘快捷键、自动完成和多标签功能&#xff0c;使用户可以方便地控制和操作远程主机。 Termius 提供了端到端的加密保…...

网络攻击的常见形式

开篇 本篇文章来自于《网络安全 ——技术与实践》的学习整理笔记。 正篇 口令窃取 相比于利用系统缺陷破坏网络系统&#xff0c;最容易的方法还是通过窃取用户的口令进入系统。因为人们倾向于选择很糟糕的口令作为登录密码&#xff0c;所以口令猜测很容易成功。通常&#xff0…...

ReactDOM 18版本 使用createRoot 替换render详解

概述 React 18 提供了两个 root API&#xff0c;被称之为 Legacy Root API 和 New Root API&#xff1a; Legacy Root API&#xff1a;是指之前版本的 root API ReactDOM.render&#xff0c;它将创建一个以 “legacy” 模式运行的 root&#xff0c;其工作方式与 React 17 完全…...

【赠书活动】好书推荐—《详解51种企业应用架构模式》

导读&#xff1a; 企业应用包括哪些&#xff1f;它们又分别有哪些架构模式&#xff1f;世界著名软件开发大师Martin Fowler给你答案。 01 什么是企业应用 我的职业生涯专注于企业应用&#xff0c;因此&#xff0c;这里所谈及的模式也都是关于企业应用的。&#xff08;企业应用…...

SpringBoot启动时使用外置yml文件

第一步&#xff1a;打包时排除yml文件 <build><resources><resource><!-- 排除的文件的路径 --><directory>src/main/resources</directory><excludes><!-- 排除的文件的名称 --><exclude>application-dev.yml</e…...

【开源三方库】Fuse.js:强大、轻巧、零依赖的模糊搜索库

1.简介 Fuse.js是一款功能强大且轻量级的JavaScript模糊搜索库&#xff0c;支持OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;操作系统&#xff0c;它具备模糊搜索和排序等功能。该库高性能、易于使用、高度可配置&#xff0c;支持多种数据类型和多语…...

vue从入门到精通(六):数据代理

一&#xff0c;什么是数据代理 通过一个对象代理对另一个对象中属性的操作 二&#xff0c;object.defineproperty方法 object.defineproperty方法可以对对象追加属性 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>object…...

【C++修行之道】类和对象(二)类的6个默认成员函数、构造函数、析构函数

目录 一、类的6个默认成员函数 二、构造函数 2.1 概念 2.2 特性 2.2.5 自动生成默认构造函数 不进行显示定义的隐患&#xff1a; 2.2.6 自动生成的构造函数意义何在&#xff1f; 两个栈实现一个队列 2.2.7 无参的构造函数和全缺省的构造函数都称为默认构造函数&#x…...

【LeetCode热题100总结】239. 滑动窗口最大值

题目描述 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。 返回 滑动窗口中的最大值 。 示例 1&#xff1a; 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7]…...

【YOLOv9改进[Conv]】使用YOLOv10的空间通道解耦下采样SCDown模块替换部分CONv的实践 + 含全部代码和详细修改内容

本文将使用YOLOv10的空间通道解耦下采样SCDown模块替换部分CONv的实践 ,文中含全部代码和详细修改内容。 目录 一 YOLOv10 1 空间通道解耦下采样 2 可视化...

简单小游戏制作

控制台基础设置 //隐藏光标 Console.CursorVisible false; //通过两个变量来存储舞台的大小 int w 50; int h 30; //设置舞台&#xff08;控制台&#xff09;的大小 Console.SetWindowSize(w, h); Console.SetBufferSize(w, h);多个场景 int nowSceneID 1; while (true) …...

Delphi

Delphi&#xff0c;是美国 Borland&#xff08;宝兰&#xff09;公司於 1995 年开发在 Windows 平台下的快速应用程式开发工具 (Rapid Application Development&#xff0c;简称 RAD)&#xff0c;它的前身是在 DOS 下的产品 Borland Turbo Pascal。&#xff08;非开源软件&…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

学校招生小程序源码介绍

基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码&#xff0c;专为学校招生场景量身打造&#xff0c;功能实用且操作便捷。 从技术架构来看&#xff0c;ThinkPHP提供稳定可靠的后台服务&#xff0c;FastAdmin加速开发流程&#xff0c;UniApp则保障小程序在多端有良好的兼…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

招商蛇口 | 执笔CID,启幕低密生活新境

作为中国城市生长的力量&#xff0c;招商蛇口以“美好生活承载者”为使命&#xff0c;深耕全球111座城市&#xff0c;以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子&#xff0c;招商蛇口始终与城市发展同频共振&#xff0c;以建筑诠释对土地与生活的…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...

「Java基本语法」变量的使用

变量定义 变量是程序中存储数据的容器&#xff0c;用于保存可变的数据值。在Java中&#xff0c;变量必须先声明后使用&#xff0c;声明时需指定变量的数据类型和变量名。 语法 数据类型 变量名 [ 初始值]; 示例&#xff1a;声明与初始化 public class VariableDemo {publi…...

Springboot 高校报修与互助平台小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;高校报修与互助平台小程序被用户普遍使用&#xff0c;为…...

Pandas 可视化集成:数据科学家的高效绘图指南

为什么选择 Pandas 进行数据可视化&#xff1f; 在数据科学和分析领域&#xff0c;可视化是理解数据、发现模式和传达见解的关键步骤。Python 生态系统提供了多种可视化工具&#xff0c;如 Matplotlib、Seaborn、Plotly 等&#xff0c;但 Pandas 内置的可视化功能因其与数据结…...