当前位置: 首页 > news >正文

day07_Spark SQL

文章目录

  • day07_Spark SQL课程笔记
    • 一、今日课程内容
    • 二、Spark SQL函数定义(掌握)
      • 1、窗口函数
      • 2、自定义函数背景
        • 2.1 回顾函数分类标准:
          • SQL最开始是_内置函数&自定义函数_两种
        • 2.2 自定义函数背景
      • 3、Spark原生自定义UDF函数
        • 3.1 自定义函数流程:
        • 3.2 自定义演示一:
        • 3.3 自定义演示二:
        • 3.4 自定义演示三:
    • 4、Pandas的自定义函数
        • 4.1 Apache Arrow框架
        • 4.2 基于Arrow完成Pandas和Spark的DataFrame互转
        • 4.3 基于Pandas自定义函数
          • 4.3.1 自定义函数流程
          • 4.3.2 自定义UDF函数
          • 4.3.3 自定义UDAF函数
    • 三、Spark on Hive(操作)
      • 1、集成原理
      • 2、集成环境配置
      • 3、启动metastore服务
      • 4、SparkOnHive操作
        • 4.1 黑窗口测试spark-sql
        • 4.2 python代码测试spark-sql
    • 四、SparkSQL的分布式执行引擎(了解)
      • 1、启动Thrift服务
      • 2、beeline连接Thrift服务
      • 3、开发工具连接Thrift服务
      • 4、控制台编写SQL代码
    • 五、Spark SQL的运行机制(掌握)
      • 5.1 **Catalyst**内部具体的执行流程:
      • **为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”?**
      • **实际意义**
      • 5.2 SparkSQL的执行流程总结:
  • 01_spark原生自定义UDF函数_返回字符串.py
    • 结果
  • 02_spark原生自定义UDF函数_返回列表.py
    • 结果
  • 03_spark原生自定义UDF函数_返回字典.py
    • 结果
  • 04_sparkSQL和pandas中df对象互转操作.py
  • 05_spark基于pandas定义udf函数_s到s.py
  • 06_spark基于pandas定义udaf函数_s到标量.py
  • 07_spark_sql操作数据库.py

day07_Spark SQL课程笔记

在这里插入图片描述

一、今日课程内容

  • 1- Spark SQL函数定义(掌握)
  • 2- Spark On Hive(操作)
  • 3- Spark SQL的分布式执行引擎(了解)
  • 4- Spark SQL的运行机制(掌握)

今日目的:掌握Spark SQL函数定义的两种方式;理解->掌握Spark SQL的运行机制

二、Spark SQL函数定义(掌握)

1、窗口函数

回顾之前学习过的窗口函数:

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: 排序函数 row_number() rank() dense_rank() 
3- 第三类: 其他函数 ntile()  first_value() last_value() lead() lag() 三个排序函数的区别?
row_number(): 巧记 1234  特点: 唯一且连续
rank(): 巧记 1224 特点: 并列不连续
dense_rank(): 巧记 1223  特点: 并列且连续

在Spark SQL中使用窗口函数案例:

已知数据如下:

cookie1,2018-04-10,1
cookie1,2018-04-11,5
cookie1,2018-04-12,7
cookie1,2018-04-13,3
cookie1,2018-04-14,2
cookie1,2018-04-15,4
cookie1,2018-04-16,4
cookie2,2018-04-10,2
cookie2,2018-04-11,3
cookie2,2018-04-12,5
cookie2,2018-04-13,6
cookie2,2018-04-14,3
cookie2,2018-04-15,9
cookie2,2018-04-16,7

需求: 要求找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

# 导包
import os
from pyspark.sql import SparkSession,functions as F,Window as W# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read.csv(path='file:///export/data/spark_project/spark_sql/data/cookie.txt',sep=',',schema='cookie string,datestr string,pv int')# 3.数据处理(切分,转换,分组聚合)# 4.数据输出etldf = df.dropDuplicates().dropna()# SQL方式etldf.createTempView('cookie_logs')spark.sql("""select cookie,datestr,pvfrom (select cookie,datestr,pv,dense_rank() over(partition by cookie order by pv desc) as rnfrom cookie_logs) temp where rn <=3 """).show()# DSL方式etldf.select('cookie', 'datestr', 'pv',F.dense_rank().over( W.partitionBy('cookie').orderBy(F.desc('pv')) ).alias('rn')).where('rn <=3').select('cookie', 'datestr', 'pv').show()# 5.关闭资源spark.stop()

运行结果截图:
在这里插入图片描述

2、自定义函数背景

2.1 回顾函数分类标准:
SQL最开始是_内置函数&自定义函数_两种

SQL函数,主要分为以下三大类:

  • UDF函数:普通函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() …
  • UDAF函数:聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min() max() …
  • UDTF函数:表生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode() …

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

  1. 简单来说:UDF、UDAF和UDTF是Spark SQL中用于扩展SQL功能的三种自定义函数,分别像是“单兵作战”、“团队协作”和“多面手”,满足不同的数据处理需求。

  2. 具体而言

    • UDF(用户自定义函数)
      • 功能:对单行数据进行操作,输入一行输出一行。
      • 场景:适合简单的数据转换,比如将字符串转换为大写。
      • 示例spark.udf.register("to_upper", lambda x: x.upper())
    • UDAF(用户自定义聚合函数)
      • 功能:对多行数据进行聚合操作,输入多行输出一行。
      • 场景:适合复杂的聚合计算,比如自定义加权平均。
      • 示例:继承UserDefinedAggregateFunction类,实现initializeupdatemerge等方法。
    • UDTF(用户自定义表生成函数)
      • 功能:对单行数据进行操作,输入一行输出多行。
      • 场景:适合数据展开操作,比如将JSON数组拆分为多行。
      • 示例:继承GenericUDTF类,实现initializeprocessclose等方法。
  3. 实际生产场景

    • 在数据清洗中,使用UDF将日期格式统一为标准格式。
    • 在数据分析中,使用UDAF计算复杂的业务指标,如客户生命周期价值(CLV)。
    • 在数据展开中,使用UDTF将嵌套的JSON数据拆分为多行,便于后续分析。
  4. 总之:UDF、UDAF和UDTF是Spark SQL中强大的扩展工具,能够满足从简单转换到复杂聚合、数据展开的多种需求,为数据处理提供了极大的灵活性。

2.2 自定义函数背景

思考:有这么多的内置函数,为啥还需要自定义函数呢?

	为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

​ 在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

​ 在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件(Arrow,pandas...),Python可以开发UDF、UDAF函数,同时也提升效率

在这里插入图片描述

Spark SQL原生UDF函数存在的问题:大量的序列化和反序列

	虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

在这里插入图片描述

3、Spark原生自定义UDF函数

3.1 自定义函数流程:
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】注册方式二:  udf对象 = F.udf(参数1,参数2)参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用说明: 如果通过方式二来注册函数,【仅能用在DSL中】注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
3.2 自定义演示一:

需求1: 请自定义一个函数,完成对 数据 统一添加一个后缀名的操作 , 例如后缀名 ‘_itheima’

效果如下:

在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.createDataFrame(data=[(1,'张三','广州'),(2,'李四','深圳')],schema='id int,name string,address string')df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def add_suffix(data):return data+'_itheima'# 第二步.把python函数注册到SparkSQL# ① spark.udf.register注册dsl1_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())# ②F.udf注册dsl2_add_suffix = F.udf(add_suffix, StringType())# ③@F.udf注册@F.udf( StringType())def candy_add_suffix(data):return data+'_itheima'# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView('temp')spark.sql("""select id,name,sql_add_suffix(address) as new_address from temp""").show()# DSL方式# 调用dsl1_add_suffixdf.select('id', 'name', dsl1_add_suffix('address').alias('new_address')).show()# 调用dsl2_add_suffixdf.select('id', 'name', dsl2_add_suffix('address').alias('new_address')).show()# 调用candy_add_suffixdf.select('id', 'name', candy_add_suffix('address').alias('new_address')).show()# 4.关闭资源spark.stop()

斌哥友情提醒: 可能遇到的问题如下

在这里插入图片描述

原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
3.3 自定义演示二:

需求2: 请自定义一个函数,返回值类型为复杂类型: 列表

效果如下:

在这里插入图片描述

参考代码:

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.createDataFrame(data=[(1,'张三_广州'),(2,'李四_深圳')],schema='id int,name_address string')df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def my_split(data:str):list1 = data.split('_')return list1# 第二步.把python函数注册到SparkSQL# ① spark.udf.register注册dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,ArrayType(StringType()))# ②F.udf注册dsl2_add_suffix = F.udf(my_split, ArrayType(StringType()))# ③@F.udf注册@F.udf(ArrayType(StringType()))def candy_add_suffix(data):list1 = data.split('_')return list1# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView('temp')spark.sql("""select id,sql_add_suffix(name_address) as new_address from temp""").show()# DSL方式# 调用dsl1_add_suffixdf.select('id',  dsl1_add_suffix('name_address').alias('new_name_address')).show()# 调用dsl2_add_suffixdf.select('id',dsl2_add_suffix('name_address').alias('new_name_address')).show()# 调用candy_add_suffixdf.select('id',candy_add_suffix('name_address').alias('new_name_address')).show()# 4.关闭资源spark.stop()
3.4 自定义演示三:

需求3: 请自定义一个函数,返回值类型为复杂类型: 字典

效果如下:

在这里插入图片描述

注意: 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null补充
# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType, StructType# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.createDataFrame(data=[(1,'张三_广州'),(2,'李四_深圳')],schema='id int,name_address string')df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def my_split(data:str):list1 = data.split('_')return {'name':list1[0],'address':list1[1]}# 第二步.把python函数注册到SparkSQL# 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是nullt = StructType().add('name',StringType()).add('address',StringType())# ① spark.udf.register注册dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,t)# ②F.udf注册dsl2_add_suffix = F.udf(my_split, t)# ③@F.udf注册@F.udf(t)def candy_add_suffix(data):list1 = data.split('_')return {'name':list1[0],'address':list1[1]}# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView('temp')spark.sql("""select id,sql_add_suffix(name_address) as new_name_address from temp""").show()# DSL方式# 调用dsl1_add_suffixdf.select('id', dsl1_add_suffix('name_address').alias('new_name_address')).show()# 调用dsl2_add_suffixdf.select('id',dsl2_add_suffix('name_address').alias('new_name_address')).show()# 调用candy_add_suffixdf.select('id',candy_add_suffix('name_address').alias('new_name_address')).show()# 4.关闭资源spark.stop()

4、Pandas的自定义函数

2-如果不是3.1.2版本,那么先卸载pyspark

命令: pip uninstall pyspark

3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark

命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple
pyspark==3.1.2

4.1 Apache Arrow框架

​ Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

​ Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

如何安装? 三个节点建议都安装

检查服务器上是否有安装pyspark
pip list | grep pyspark  或者 conda list | grep pysparkpip list | grep pyarrow  
如果服务器已经安装了pyspark的库,那么仅需要执行以下内容,即可安装。例如在 node1安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]如果服务器中python环境中没有安装pyspark,建议执行以下操作,即可安装。例如在 node2 和 node3安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==10.0.0

在这里插入图片描述

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

如何使用呢? 默认不会自动启动的, 一般建议手动配置

spark.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)
4.2 基于Arrow完成Pandas和Spark的DataFrame互转

Pandas中DataFrame:

DataFrame:表示一个二维表对象,就是表示整个表

字段、列、索引;Series表示一列
在这里插入图片描述

Spark SQL中DataFrame:
在这里插入图片描述

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

示例:

# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# TODO: 手动开启arrow框架spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)# 2.数据输入df = spark.createDataFrame(data=[(1,'张三_广州'),(2,'李四_深圳')],schema='id int ,name_address string')df.show()print(type(df))print('------------------------')# 3.数据处理(切分,转换,分组聚合)# 4.数据输出# spark->pandaspd_df = df.toPandas()print(pd_df)print(type(pd_df))print('------------------------')# pandas->sparkdf2 = spark.createDataFrame(pd_df)df2.show()print(type(df2))# 5.关闭资源spark.stop()
4.3 基于Pandas自定义函数

​ 基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

​ Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

4.3.1 自定义函数流程
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数包装成Spark SQL的函数注册方式一: udf对象 = spark.udf.register(参数1, 参数2)参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用注册方式三: 语法糖写法  @F.pandas_udf(returnType)  放置到对应Python的函数上面说明: 实际是方式二的扩展。仅能用在DSL中使用第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
4.3.2 自定义UDF函数
  • 自定义Python函数的要求:SeriesToSeries

    在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
import pandas as pd# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# TODO: 开启Arrow的使用spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')# 2.数据输入df = spark.createDataFrame(data = [(1,1),(2,2),(3,3)],schema= 'num1 int,num2 int')df.show()# 3.基于pandas自定义函数 :SeriesTOSeries# 第一步: 自定义python函数def multiply(num1:pd.Series,num2:pd.Series)->pd.Series:return num1*num2# 第二步: 把python注册为SparkSQL函数# ①spark.udf.register注册dsl1_multiply = spark.udf.register('sql_multiply',multiply)# ②F.pandas_udf注册dsl2_multiply = F.pandas_udf(multiply,IntegerType())# ③@F.pandas_udf注册@F.pandas_udf(IntegerType())def candy_multiply(num1: pd.Series, num2: pd.Series) -> pd.Series:return num1 * num2# 第三步: 在SparkSQL中调用注册后函数# SQL方式df.createTempView('temp')spark.sql("""select num1,num2,sql_multiply(num1,num2) as result from temp""").show()# DSL方式#调用dsl1_multiplydf.select('num1','num2',dsl1_multiply('num1','num2').alias('result')).show()# 调用dsl2_multiplydf.select('num1', 'num2', dsl2_multiply('num1', 'num2').alias('result')).show()# 调用candy_multiplydf.select('num1', 'num2', candy_multiply('num1', 'num2').alias('result')).show()# 4.关闭资源spark.stop()
4.3.3 自定义UDAF函数
  • 自定义Python函数的要求:Series To 标量
  1. 简单来说:Series To 标量是指将一个Pandas Series(一维数组)转换为一个标量值(单个值),就像是“把一串数据浓缩成一个结果”。

  2. 具体而言

    • Series:Pandas中的一维数据结构,类似于带标签的数组,可以存储任意类型的数据。
    • 标量:单个值,比如整数、浮点数、字符串等。
    • 转换场景
      • 聚合操作:将Series中的所有值通过某种计算(如求和、平均值)转换为一个标量值。
      • 提取操作:从Series中提取某个特定位置的值作为标量。
    • 示例
      import pandas as pd# 创建一个Series
      s = pd.Series([1, 2, 3, 4, 5])# 聚合操作:求和
      sum_result = s.sum()  # 输出:15# 提取操作:获取第一个值
      first_value = s[0]    # 输出:1
      
  3. 实际生产场景

    • 在数据分析中,使用聚合操作将一列数据(如销售额)转换为总销售额或平均销售额。
    • 在数据处理中,从时间序列数据中提取某个时间点的值作为标量。
  4. 总之:Series To 标量是Pandas中常见的操作,通过聚合或提取,将一维数据转换为单个值,为数据分析和处理提供了便利。

表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list…

在这里插入图片描述

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType, FloatTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# TODO: 开启Arrow的使用spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')# 2.数据输入df = spark.createDataFrame(data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],schema='id int,value float')df.show()# 3.基于pandas自定义函数 :SeriesTOSeries# 第一步: 自定义python函数# ③@F.pandas_udf注册  注意: 理论上UDAF只能用注册方式三语法糖方式,也就意味着只能DSL使用@F.pandas_udf(FloatType())def candy_mean_v(value: pd.Series) -> float:return value.mean()# 第二步: 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式一register注册# ①spark.udf.register注册dsl1_mean_v = spark.udf.register('sql_mean_v', candy_mean_v)# 第三步: 在SparkSQL中调用注册后函数# DSL方式# 调用candy_mean_vdf.groupBy('id').agg(candy_mean_v('value').alias('result')).show()# 调用dsl1_mean_vdf.groupBy('id').agg(dsl1_mean_v('value').alias('result')).show()# SQL方式df.createTempView('temp')spark.sql("""select id,sql_mean_v(value) as result from temp group by id""").show()# 4.关闭资源spark.stop()

三、Spark on Hive(操作)

1、集成原理

在这里插入图片描述

HiveServer2的主要作用: 接收SQL语句,进行语法检查;解析SQL语句;优化;将SQL转变成MapReduce程序,提交到Yarn集群上运行SparkSQL与Hive集成,实际上是替换掉HiveServer2。是SparkSQL中的HiveServer2替换掉了Hive中的HiveServer2。集成以后优点如下:
1- 对于SparkSQL来说,可以避免在代码中编写schema信息。直接向MetaStore请求元数据信息
2- 对于SparkSQL来说,多个人可以共用同一套元数据信息,避免每个人对数据理解不同造成代码功能兼容性问题
3- 对于Hive来说,底层执行引擎由之前的MapReduce变成了Spark Core,能够提升运行效率
4- 对于使用者/程序员来说,SparkSQL与Hive集成,对于上层使用者来说,是完全透明的。

2、集成环境配置

环境搭建,参考【Spark课程阶段_部署文档.doc】的7章节内容。

1-node1上将hive-site.xml拷贝到spark安装路径conf目录

cd /export/server/hive/confcp hive-site.xml /export/server/spark/conf/

2-node1上执行以下命令将mysql的连接驱动包拷贝到spark的jars目录下

注意: 之前拷贝过的可以忽略此操作

cd /export/server/hive/libcp mysql-connector-java-5.1.32.jar  /export/server/spark/jars/

3、启动metastore服务

# 注意: 
# 启动 hadoop集群
start-all.sh# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore &# 测试spark-sql
/export/server/spark/bin/spark-sql

4、SparkOnHive操作

4.1 黑窗口测试spark-sql
[root@node1 bin]# /export/server/spark/bin/spark-sql
...
spark-sql>show databases;
...
spark-sql>create database if not exists spark_demo;
...
spark-sql>create table if not exists spark_demo.stu(id int,name string);
...
spark-sql>insert into  spark_demo.stu values(1,'张三'),(2,'李四');
...
4.2 python代码测试spark-sql

SparkOnHive配置:

spark.sql.warehouse.dir: 告知Spark数据表存放的地方。推荐使用HDFS。如果不配置,默认使用本地磁盘存储。
hive.metastore.uris: 告知Spark,MetaStore元数据管理服务的连接信息
enableHiveSupport() : 开启Spark和Hive的集成使用格式如下:spark = SparkSession.builder\.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\.config('hive.metastore.uris','thrift://node1.itcast.cn:9083')\.appName('pyspark_demo')\.master('local[1]')\.enableHiveSupport()\.getOrCreate()

示例:

# 导包
import os
import timefrom pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\.config('hive.metastore.uris','thrift://node1.itcast.cn:9083')\.appName('pyspark_demo')\.master('local[1]')\.enableHiveSupport()\.getOrCreate()# 2.执行sql# 查看所有库spark.sql( "show databases").show()# 查看demo1的student表内容spark.sql("select * from demo1.student").show()# 测试是否能建库: 可以spark.sql( "create database if not exists spark_demo" )# 测试是否能在spark_demo建表: 可以spark.sql("""create table if not exists spark_demo.stu(id int,name string)""")# 测试是否可以往spark_demo.stu表插入数据: 可以spark.sql("""insert into  spark_demo.stu values(1,'张三'),(2,'李四')""")# 为了方便查看web页面time.sleep(500)# 3.关闭资源spark.stop()

四、SparkSQL的分布式执行引擎(了解)

分布式执行引擎 == Thrift服务 == ThriftServer == SparkSQL中的Hiveserver2

1、启动Thrift服务

​ 目前,我们已经完成Spark集成Hive的配置。但是目前集成后,如果需要连接Hive,此时需要启动一个Spark的客户端(spark-sql、代码)才可以。这个客户端底层相当于启动服务项,用于连接Hive的metastore的服务,进行处理操作。一旦退出客户端,相当于这个服务也就没有了,无法再使用

​ 目前的情况非常类似于在Hive部署的时候,有一个本地模式部署(在启动Hive客户端的时候,内部自动启动一个Hive的hiveserver2服务项)

大白话: 目前在Spark后台,并没有一个长期挂载的Spark的服务(Spark HiveServer2服务)。导致每次启动Spark客户端,都需要在内部启动一个服务项。这种方式,不适合测试使用,不合适后续的快速开发

​ 如何启动Spark 提供的分布式的执行引擎呢? 这个引擎大家完全可以将其理解为Spark的HiveServer2服务,实际上就是Spark的Thrift服务项

# 注意: 要启动sparkThriftServer2服务,必须要保证先启动好Hadoop以及Hive的metastore,不能启动Hive的hiveserver2服务!
# 启动 hadoop集群
start-all.sh# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore &# 最后执行以下命令启动sparkThriftServer2:
/export/server/spark/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=node1 \
--hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse \
--master local[2]

校验是否成功:
在这里插入图片描述

访问界面:默认4040

在这里插入图片描述

2、beeline连接Thrift服务

启动后,可以通过spark提供beeline的方式连接这个服务。连接后,直接编写SQL即可

相当于模拟了一个Hive的客户端,但是底层执行的是Spark SQL,最终将其转换为Spark RDD的程序

启动命令:/export/server/spark/bin/beeline然后输入:!connect jdbc:hive2://node1:10000继续输入用户名: root
注意密码: 不需要写,直接回车

在这里插入图片描述

3、开发工具连接Thrift服务

如何通过DataGrip或者PyCharm连接Spark进行操作
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、控制台编写SQL代码

进入以下页面就可以愉快的编写sql了,再也不用担心在spark.sql()中编写没有提示了:)
在这里插入图片描述

五、Spark SQL的运行机制(掌握)

Spark SQL底层依然运行的是Spark RDD的程序,所以说Spark RDD程序的运行的流程,在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程

​ Spark SQL的运行机制,其实就是在描述如何将Spark SQL翻译为RDD程序:
在这里插入图片描述

​ 整个Spark SQL 转换为RDD 是基于Catalyst 优化器实施,基于这个优化器即可完成整个转换操作

5.1 Catalyst内部具体的执行流程:

在这里插入图片描述
在这里插入图片描述

大白话:

SQL执行顺序: from->join on->where->groupby->聚合操作->having->select [distinct] ->order by ->limit

1- 接收客户端提交过来的SQL/DSL代码,首先会校验SQL/DSL的语法是否正常。如果通过校验,根据SQL/DSL的执行顺序,生成未解析的逻辑计划,也叫做AST抽象语法树2- 对于AST抽象语法树加入元数据信息,确定一共涉及到哪些字段、字段的数据类型是什么,以及涉及到的表的其他相关元数据信息。加入元数据信息以后,就得到了(已经解析但是未优化的)逻辑计划3- 对(未优化的)逻辑计划执行优化操作,整个优化通过优化器来执行。在优化器匹配相对应的优化规则,实时具体的优化。SparkSQL底层提供了一两百中优化规则,得到优化的逻辑计划。例如: 谓词下推(断言下推)、列值裁剪3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行,减少后续处理数据量,提升效率。3.2- 列值裁剪: 在表中只加载数据分析用到的字段,不相关的字段不加载进来。减少后续处理数据量,提升效率。4- 由于优化规则很多,导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中,会根据 成本模型(对比每个计划运行的耗时、资源消耗等)得到最优的一个物理执行计划5- 将物理执行计划通过code generation(代码生成器),转变成Spark RDD的代码6- 最后就是将Spark RDD代码部署到集群上运行。后续过程与Spark内核调度中Job的调度流程完全一致。

专业的术语:

1- Spark SQL底层解析是由RBO(基于规则的优化器)和CBO(基于代价的优化器)优化完成的2- RBO是基于规则优化,对于SQL或DSL的语句通过执行引擎得到未执行逻辑计划,在根据元数据得到逻辑计划,之后加入列值裁剪或谓词下推等优化手段形成优化的逻辑计划3- CBO是基于优化的逻辑计划得到多个物理执行计划,根据 代价函数(成本模型) 选择出最优的物理执行计划4- 通过code genaration代码生成器完成RDD的代码构建5- 底层依赖于DAGScheduler和TaskScheduler完成任务计算执行后续过程与Spark内核调度中Job的调度流程完全一致。
  1. 简单来说:SparkSQL的执行流程就像是“从SQL语句到结果的流水线”,通过解析、优化和执行,将SQL查询转化为分布式计算任务,最终返回结果。

  2. 具体而言

    • SQL解析
      • 将SQL语句解析为抽象语法树(AST)。
      • 使用ANTLR工具将AST转换为逻辑计划(Logical Plan)。
    • 逻辑优化
      • 对逻辑计划进行优化,如谓词下推、列剪裁等。
      • 生成优化后的逻辑计划。
    • 物理计划生成
      • 将逻辑计划转换为物理计划(Physical Plan),选择最优的执行策略。
      • 物理计划包括RDD转换、数据源读取等具体操作。
    • 任务调度与执行
      • 将物理计划分解为多个Stage和Task。
      • 通过DAGScheduler和TaskScheduler将Task分配到集群节点上执行。
    • 结果返回
      • 将计算结果返回给客户端,如DataFrame或直接输出。
  3. 实际生产场景

    • 在数据仓库中,使用SparkSQL查询海量数据,生成报表和洞察。
    • 在实时分析中,结合Structured Streaming,使用SparkSQL处理实时数据流。
  4. 总之:SparkSQL的执行流程通过解析、优化和执行,将SQL查询高效地转化为分布式计算任务,为大规模数据处理提供了强大的支持。

为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”?

  1. 流水线:分阶段处理

    • 流水线:将复杂任务分解为多个阶段,每个阶段专注于特定任务。
    • SparkSQL:将SQL查询分解为SQL解析逻辑优化物理计划生成任务调度与执行结果返回等多个阶段,每个阶段完成特定任务。
  2. 高效流转:逐步优化和执行

    • 流水线:每个阶段完成后,数据会流转到下一个阶段,逐步完成最终目标。
    • SparkSQL:SQL语句经过解析、优化、物理计划生成等步骤,逐步转化为分布式计算任务,最终高效执行并返回结果。
  3. 自动化:无需手动干预

    • 流水线:自动化完成每个阶段的任务,无需人工干预。
    • SparkSQL:通过Catalyst优化器和Tungsten引擎,自动优化查询计划并执行,开发者只需关注SQL语句和结果。
  4. 结果导向:最终输出

    • 流水线:最终输出成品。
    • SparkSQL:最终输出查询结果(如DataFrame或报表),为业务决策提供支持。

实际意义

SparkSQL的执行流程就像“从SQL语句到结果的流水线”,通过分阶段、高效流转和自动化的方式,将SQL查询转化为分布式计算任务,最终返回结果,为大规模数据处理提供了强大的支持。

5.2 SparkSQL的执行流程总结:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

01_spark原生自定义UDF函数_返回字符串.py

# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, "张三", '广州'),(2, "李四", '深圳'),(3, "王五", '上海')], schema=["id", "name", "address"])# 测试是否有数据df.show()# 需求: 自定义函数,功能是给df的所有地址都添加一个后缀'_itheima'# 一.自定义能添加后缀'_itheima'功能的python函数def add_suffix(address):return address + '_itheima'# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格dsl_add_suffix = spark.udf.register('sql_add_suffix', add_suffix, StringType())# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("stu_tb")spark.sql("""select *,sql_add_suffix(address) as address_newfrom stu_tb""").show()# 方式2: DSL风格# df.select(#     "*",#     dsl_add_suffix("address").alias("address_new")# ).show()# 注意: 最后一定释放资源spark.stop()

在这里插入图片描述

结果

在这里插入图片描述

02_spark原生自定义UDF函数_返回列表.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, "张三_广州"),(2, "李四_深圳"),(3, "王五_上海")], schema=["id", "name_address"])# 测试是否有数据df.show()# 需求: 自定义函数# 一.自定义能返回列表的功能的python函数def my_split(name_address):return name_address.split('_')# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格dsl_my_split = spark.udf.register('sql_my_split', my_split, ArrayType(StringType()))# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("stu_tb")spark.sql("""select *,sql_my_split(name_address)[0] as name,sql_my_split(name_address)[1] as addressfrom stu_tb""").show()# 方式2: DSL风格df.select("*",dsl_my_split("name_address")[0].alias("name"),dsl_my_split("name_address")[1].alias("address")).show()# 注意: 最后一定释放资源spark.stop()

结果

在这里插入图片描述

03_spark原生自定义UDF函数_返回字典.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType, StructType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, "张三_广州"),(2, "李四_深圳"),(3, "王五_上海")], schema=["id", "name_address"])# 测试是否有数据df.show()# 需求: 自定义函数# 一.自定义能返回字典的功能的python函数def my_split(name_address):list1 = name_address.split('_')dict1 = {"name": list1[0], "address": list1[1]}return dict1# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格# 注意: 如果原始函数返回的是字典,就必须用StructType()且字段名必须和原生字典的key值一样,否则null补充t = StructType().add("name", StringType()).add("address", StringType())dsl_my_split = spark.udf.register('sql_my_split', my_split, t)# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("stu_tb")spark.sql("""select *,sql_my_split(name_address)['name'] as name,sql_my_split(name_address)['address'] as addressfrom stu_tb""").show()# 方式2: DSL风格df.select("*",dsl_my_split("name_address")['name'].alias("name"),dsl_my_split("name_address")['address'].alias("address")).show()# 注意: 最后一定释放资源spark.stop()

结果

在这里插入图片描述

04_sparkSQL和pandas中df对象互转操作.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果想优化createDataFrame()效率可以手动开启arrow设置# TODO: 手动开启arrow设置spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)# 1.先创建sparkSQL的df对象spark_df = spark.createDataFrame([(1, "张三"),(2, "李四"),(3, "王五")], schema=["id", "name"])# 查看数据类型print(type(spark_df))  # <class 'pyspark.sql.dataframe.DataFrame'>spark_df.show()# 2.把saprk_df转换为pandas的df对象pd_df = spark_df.toPandas()# 查看数据类型print(type(pd_df))  # <class 'pandas.core.frame.DataFrame'>print(pd_df)# 3.把pandas的df对象转换为sparkSQL的df对象spark_df2 = spark.createDataFrame(pd_df)# 查看数据类型print(type(spark_df2))  # <class 'pyspark.sql.dataframe.DataFrame'>spark_df2.show()# 注意: 最后一定释放资源spark.stop()

05_spark基于pandas定义udf函数_s到s.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd
from pyspark.sql.types import DoubleType, IntegerType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 手动开启arrow设置spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)# 创建DF对象df = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], schema=["n1", "n2"])df.show()# 一.自定义python函数# 功能:输入两列,输出对应乘积1列def mul(n1: pd.Series, n2: pd.Series) -> pd.Series:return n1 * n2# 二.把python函数包装成spark的UDF函数(sql和dsl风格)# 注册方式1: 适用于sql和dsl风格dsl_mul = spark.udf.register("sql_mul", mul)# 注册方式2: 仅适用于dsl风格dsl2_mul = F.pandas_udf(mul, IntegerType())# 注册方式3: 仅适用于dsl风格@F.pandas_udf(IntegerType())def candy_mul(n1: pd.Series, n2: pd.Series) -> pd.Series:return n1 * n2# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("nums_tb")spark.sql("""select n1,n2,sql_mul(n1, n2) as n3 from nums_tb""").show()# 方式2: DSL风格df.select("n1", "n2",dsl_mul("n1", "n2").alias("n3"),dsl2_mul("n1", "n2").alias("n4"),candy_mul("n1", "n2").alias("n5")).show()# 注意: 最后一定释放资源spark.stop()

06_spark基于pandas定义udaf函数_s到标量.py

# 导包
import osimport pandas as pd
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],schema="id int, value float")df.show()# 二.使用语法糖方式注册原始函数为udaf函数@F.pandas_udf("float")# 一.定义原始python函数def candy_my_avg(values: pd.Series) -> float:return values.mean()# 三.使用自定义的udaf函数# dsl方式df.groupby("id").agg(candy_my_avg("value").alias("avg_value")).show()# 如果想用sql方式怎么办?把添加了语法糖的函数,再注册为udaf函数dsl_my_avg = spark.udf.register("sql_my_avg", candy_my_avg)df.createTempView('nums_tb')spark.sql("""select id,sql_my_avg(value) as avg_valuefrom nums_tbgroup by id""").show()# 注意: 最后一定释放资源spark.stop()

07_spark_sql操作数据库.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = (SparkSession.builder.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse").config("hive.metastore.uris", "thrift://node1.itcast.cn:9083").appName("spark_demo").master("local[1]").enableHiveSupport().getOrCreate())spark.sql("""create database if not exists spark_demo2""")spark.sql("""create table if not exists spark_demo2.stu(id int,name string,age int);""")spark.sql("""insert into spark_demo2.stu values(1,'张三',18),(2,'李四',28)""")spark.sql("""select * from  spark_demo2.stu""").show()# 注意: 最后一定释放资源spark.stop()

相关文章:

day07_Spark SQL

文章目录 day07_Spark SQL课程笔记一、今日课程内容二、Spark SQL函数定义&#xff08;掌握&#xff09;1、窗口函数2、自定义函数背景2.1 回顾函数分类标准:SQL最开始是_内置函数&自定义函数_两种 2.2 自定义函数背景 3、Spark原生自定义UDF函数3.1 自定义函数流程&#x…...

高性能现代PHP全栈框架 Spiral

概述 Spiral Framework 诞生于现实世界的软件开发项目是一个现代 PHP 框架&#xff0c;旨在为更快、更清洁、更卓越的软件开发提供动力。 特性 高性能 由于其设计以及复杂精密的应用服务器&#xff0c;Spiral Framework框架在不影响代码质量以及与常用库的兼容性的情况下&a…...

LeetCode - #182 Swift 实现找出重复的电子邮件

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…...

《解锁鸿蒙Next系统人工智能语音助手开发的关键步骤》

在当今数字化时代&#xff0c;鸿蒙Next系统与人工智能的融合为开发者带来了前所未有的机遇&#xff0c;开发一款人工智能语音助手应用更是备受关注。以下是在鸿蒙Next系统上开发人工智能语音助手应用的关键步骤&#xff1a; 环境搭建与权限申请 安装开发工具&#xff1a;首先需…...

【Linux网络编程】数据链路层 | MAC帧 | ARP协议

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站 &#x1f308;个人主页&#xff1a; 南桥几晴秋 &#x1f308;C专栏&#xff1a; 南桥谈C &#x1f308;C语言专栏&#xff1a; C语言学习系…...

《自动驾驶与机器人中的SLAM技术》ch7:基于 ESKF 的松耦合 LIO 系统

目录 基于 ESKF 的松耦合 LIO 系统 1 坐标系说明 2 松耦合 LIO 系统的运动和观测方程 3 松耦合 LIO 系统的数据准备 3.1 CloudConvert 类 3.2 MessageSync 类 4 松耦合 LIO 系统的主要流程 4.1 IMU 静止初始化 4.2 ESKF 之 运动过程——使用 IMU 预测 4.3 使用 IMU 预测位姿进…...

基于spingbott+html+Thymeleaf的24小时智能服务器监控平台设计与实现

博主介绍&#xff1a;硕士研究生&#xff0c;专注于信息化技术领域开发与管理&#xff0c;会使用java、标准c/c等开发语言&#xff0c;以及毕业项目实战✌ 从事基于java BS架构、CS架构、c/c 编程工作近16年&#xff0c;拥有近12年的管理工作经验&#xff0c;拥有较丰富的技术架…...

全栈面试(一)Basic/微服务

文章目录 项目地址一、Basic InterviewQuestions1. tell me about yourself?2. tell me about a time when you had to solve a complex code problem?3. tell me a situation that you persuade someone at work?4. tell me a about a confict with a teammate and how you…...

python安装完成后可以进行的后续步骤和注意事项

安装Python3完成后&#xff0c;你可以开始使用它进行编程和开发。以下是一些安装完成后可以进行的后续步骤和注意事项&#xff1a; 验证安装 检查Python版本&#xff1a; 打开“终端”应用程序。输入python3 --version&#xff0c;应该显示安装的Python3版本号。 检查pip版本…...

[Qt] 窗口 | 菜单栏MenuBar

目录 QMainWindow 概述 一、菜单栏 1、创建菜单栏 2、在菜单栏中添加菜单 3、创建菜单项 4、在菜单项之间添加分割线 5、添加快捷键 6、添加子菜单 7、添加图标 综合示例 QMainWindow 概述 Qt 窗口是通过 QMainWindow 类来实现的。 QMainWindow 是一个为用户 提供主…...

[读书日志]从零开始学习Chisel 第十三篇:Scala的隐式参数与隐式转换(敏捷硬件开发语言Chisel与数字系统设计)

10. 隐式转换与隐式参数 假设编写了一个向量类MyVector&#xff0c;并且包含一些向量的基本操作。因为向量可以与标量做数乘运算&#xff0c;所以需要一个计算数乘的方法“*”&#xff0c;它应该接收一个类型为基本值类的参数&#xff0c;在向量对象myVec调用该方法时&#xf…...

CMake学习笔记(1)

1. CMake概述 CMake 是一个项目构建工具&#xff0c;并且是跨平台的。关于项目构建我们所熟知的还有Makefile&#xff08;通过 make 命令进行项目的构建&#xff09;&#xff0c;大多是IDE软件都集成了make&#xff0c;比如&#xff1a;VS 的 nmake、linux 下的 GNU make、Qt …...

cursor+deepseek构建自己的AI编程助手

文章目录 准备工作在Cursor中添加deepseek 准备工作 下载安装Cursor &#xff08;默认安装在C盘&#xff09; 注册deepseek获取API key 在Cursor中添加deepseek 1、打开cursor&#xff0c;选择设置 选择Model&#xff0c;添加deepseek-chat 注意这里去掉其他的勾选项&…...

Kotlin实现DataBinding结合ViewModel的时候,提示找不到Unresolved reference: BR解决方案

在用Kotlin语言实现DataBinding结合ViewModel的代码的时候&#xff0c;如下所示&#xff1a; class UserModel(private val userName: String, private val userAge: Int) : BaseObservable() {get:Bindablevar name: String userNameset (value) {field valuenotifyPropert…...

java项目启动时,执行某方法

1. J2EE项目 在Servlet类中重写init()方法&#xff0c;这个方法会在Servlet实例化时调用&#xff0c;即项目启动时调用。 import javax.servlet.ServletException; import javax.servlet.http.HttpServlet;public class MyServlet extends HttpServlet {Overridepublic void …...

详解如何自定义 Android Dex VMP 保护壳

版权归作者所有&#xff0c;如有转发&#xff0c;请注明文章出处&#xff1a;https://cyrus-studio.github.io/blog/ 前言 Android Dex VMP&#xff08;Virtual Machine Protection&#xff0c;虚拟机保护&#xff09;壳是一种常见的应用保护技术&#xff0c;主要用于保护 And…...

Grails应用http.server.requests指标数据采集问题排查及解决

问题 遇到的问题&#xff1a;同一个应用&#xff0c;Spring Boot(Java)和Grails(Groovy)混合编程&#xff0c;常规的Spring Controller&#xff0c;可通过Micromete Pushgateway&#xff0c; 采集到http.server.requests指标数据&#xff0c;注意下面的指标名称是点号&#…...

开源临床试验软件OpenClinica的安装

本文是为帮网友 A萤火虫 解决安装问题做的记录&#xff1b; 简介 什么是 OpenClinica &#xff1f; OpenClinica 是世界上第一个商业开源临床试验软件&#xff0c;主要用于电子数据捕获&#xff08;EDC&#xff09;和临床数据管理&#xff08;CDM&#xff09;。它的设计旨在优…...

网络安全 | 网络安全法规:GDPR、CCPA与中国网络安全法

网络安全 | 网络安全法规&#xff1a;GDPR、CCPA与中国网络安全法 一、前言二、欧盟《通用数据保护条例》&#xff08;GDPR&#xff09;2.1 背景2.2 主要内容2.3 特点2.4 实施效果与影响 三、美国《加利福尼亚州消费者隐私法案》&#xff08;CCPA&#xff09;3.1 背景3.2 主要内…...

深入学习 Python 爬虫:从基础到实战

深入学习 Python 爬虫&#xff1a;从基础到实战 前言 Python 爬虫是一个强大的工具&#xff0c;可以帮助你从互联网上抓取各种数据。无论你是数据分析师、机器学习工程师&#xff0c;还是对网络数据感兴趣的开发者&#xff0c;爬虫都是一个非常实用的技能。在本文中&#xff…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

Psychopy音频的使用

Psychopy音频的使用 本文主要解决以下问题&#xff1a; 指定音频引擎与设备&#xff1b;播放音频文件 本文所使用的环境&#xff1a; Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)

文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机&#xff0c;因为在使用过程中发现 Airsim 对外部监控相机的描述模糊&#xff0c;而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置&#xff0c;最后在源码示例中找到了&#xff0c;所以感…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...

医疗AI模型可解释性编程研究:基于SHAP、LIME与Anchor

1 医疗树模型与可解释人工智能基础 医疗领域的人工智能应用正迅速从理论研究转向临床实践,在这一过程中,模型可解释性已成为确保AI系统被医疗专业人员接受和信任的关键因素。基于树模型的集成算法(如RandomForest、XGBoost、LightGBM)因其卓越的预测性能和相对良好的解释性…...

echarts使用graphic强行给图增加一个边框(边框根据自己的图形大小设置)- 适用于无法使用dom的样式

pdf-lib https://blog.csdn.net/Shi_haoliu/article/details/148157624?spm1001.2014.3001.5501 为了完成在pdf中导出echarts图&#xff0c;如果边框加在dom上面&#xff0c;pdf-lib导出svg的时候并不会导出边框&#xff0c;所以只能在echarts图上面加边框 grid的边框是在图里…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...