20210127_spark学习笔记

03
六月
2021

一、部分理论
spark:由Scala语言开发的快速、通用、可扩展的基于内存的大数据分析引擎。在mapreduce上进行了优化,但没mapreduce稳定。

Spark Core是spark平台的基础通用执行引擎,所有其他功能都是基于。它在外部存储系统中提供内存计算和引用数据集。spark最基础的最核心的功能
Spark SQL是Spark Core之上的一个组件,它引入了一个称为SchemaRDD的新数据抽象,它为结构化和半结构化数据提供支持。
Spark Streaming利用Spark Core的快速调度功能来执行流式分析。它以小批量获取数据,并对这些小批量的数据执行RDD(弹性分布式数据集)转换。针对实时数据进行流式计算的组件,提供了丰富的api
Spark MLlib是Spark之上的分布式机器学习框架,因为基于分布式内存的Spark架构。根据基准,它是由MLlib开发人员针对交替最小二乘法(ALS)实现完成的。 Spark MLlib是基于Hadoop磁盘的Apache Mahout版本的9倍(在Mahout获得了Spark接口之前)。
Spark GraphX是Spark上的一个分布式图形处理框架。它提供了一个用于表达图形计算的API,可以通过使用Pregel抽象API为用户定义的图形建模。它还为此抽象提供了一个优化的运行时。
RDD:弹性分布式数据集(Resiliennt Distributed Datasets)是Spark的基本数据结构。它是一个不可变的分布式对象集合。 RDD中的每个数据集划分为逻辑分区,可以在集群的不同节点上计算。 RDD可以包含任何类型的Python,Java或Scala对象,包括用户定义的类。
使用Spark RDD进行数据共享,由于复制,序列化和磁盘IO,MapReduce中的数据共享速度很慢。大多数Hadoop应用程序,他们花费90%以上的时间做HDFS读写操作。
它支持内存中处理计算。它将存储器的状态存储为作业中的对象,并且对象可以在这些作业之间共享。内存中的数据共享比网络和磁盘快10到100倍。

Driver:驱动器节点,用于执行spark中的main方法
executor:执行任务,返回任务结果给driver。工作节点
master & worker:资源的管理和调度 (和yarn的resourcemanager,nodemanager相似)
applicationMaster:解耦合resourcemanager和Driver
客户提交任务 ==> driver ==> 向applicationMaster申请资源 ==>  master执行

RDD、DataFrame、Dataset 全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
RDD一般和spark mlib同时使用,RDD不支持sparksql操作
DataFrame是一个分布式数据集合,它被组织成命名列。从概念上讲,它相当于具有良好优化技术的关系表。以RDD为基础
Dataset优化DataFrame,提供更多api处理模式

RDD >> DataFrame(性能优化,数据结构化,增加列名) >> Dataset(提供结构化数据处理API,增加列名及对应类型)   #三种模式可以相互转换,提供了这三种类的相互转换方法


二、下载与安装
1.官网下载 https://archive.apache.org/dist/ 
         192.168.100.101    192.168.100.102   192.168.100.103
spark     worker(master)   worker(master HA)      worker
             ZOOKEEPER         ZOOKEEPER          ZOOKEEPER

2.解压文件 tar -xzvf spark-2.4.7-bin-hadoop2.7.tgz 并重命名
mv /root/spark-2.4.7-bin-hadoop2.7 /root/spark-2.4.7

mv /root/spark-2.4.7/conf/slaves.template /root/spark-2.4.7/conf/slaves
vim /root/spark-2.4.7/conf/slaves
master
s1
s2

mv /root/spark-2.4.7/conf/spark-env.sh.template /root/spark-2.4.7/conf/spark-env.sh
vim /root/spark-2.4.7/conf/spark-env.sh
export JAVA_HOME=/root/jdk1.8.0_271

#SPARK_MASTER_HOST=master  #不是高可用只配置一台机器名
#SPARK_MASTER_PORT=7077    #不是高可用只配置一个端口

SPARK_MASTER_WEBUI_PORT=8989
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=master,s1,s2
-Dspark.deploy.zookeeper.dir=/spark"

export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://master:9000/directory
-Dspark.history.retainedApplications=30"

YARN_CONF_DIR=/root/hadoop-2.10.1/etc/hadoop  #资源调度给yarn去做

mv /root/spark-2.4.7/conf/spark-defaults.conf.template /root/spark-2.4.7/conf/spark-defaults.conf
vim /root/spark-2.4.7/conf/spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://master:9000/directory
spark.yarn.historyServer.address=master:18080   #让yarn和18080的运行情况关联
spark.history.ui.port=18080

启动hadoop集群
start-dfs.sh
hadoop中创建spark目录
hadoop fs -mkdir /directory
3.分发文件
scp -r /root/spark-2.4.7 root@192.168.100.102:/root/
scp -r /root/spark-2.4.7 root@192.168.100.103:/root/
4.启动spark
/root/spark-2.4.7/sbin/start-all.sh  可以通过网址访问 http://192.168.100.101:8080/
/root/spark-2.4.7/sbin/start-history-server.sh 可以通过网址访问 http://192.168.100.101:18080/
5.测试任务
/root/spark-2.4.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 /root/spark-2.4.7/examples/jars/spark-examples_2.11-2.4.7.jar 10

6.启动高可用另外一台机器的master,在s1机器启动master
/root/spark-2.4.7/sbin/start-master.sh   #可以通过网址访问 http://192.168.100.102:8989/

7.测试任务
/root/spark-2.4.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077,s1:7077 /root/spark-2.4.7/examples/jars/spark-examples_2.11-2.4.7.jar 2

8.yarn调取测试   任务在hadoop内部执行,是cluster模式
/root/spark-2.4.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster /root/spark-2.4.7/examples/jars/spark-examples_2.11-2.4.7.jar 2

8.yarn调取客户端模式测试  任务在客户端执行,是client模式  生产不用,测试用
/root/spark-2.4.7/bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client /root/spark-2.4.7/examples/jars/spark-examples_2.11-2.4.7.jar 4


任务参数说明
--class:spark程序主函数的类
--master:运行的spark环境
--executor-memory 1G 指定可用内存
--total-executor-cores 2 指定所有执行者可用cpu核数
--executor-cores 指定每个执行者使用cpu核数
--num-executors:配置executor数量
application-jar 写出依赖的jar包
application-argment 命令行参数

local:本地模式,解压软件后就可以使用
standalone:独立部署模式,实现了master-slave模式
HA:高可用模式,上面安装模式就是高可用

三、Spark命令学习
执行 spark-shell 进入Scale命令模式。 Scale区分大小写
scala> spark
res8: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@27cef157
scala> val df=spark.read.json("F:/BaiduNetdiskDownload/spark-2.4.7-bin-hadoop2.7/input/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> df.show
+---+--------+
|age|username|
+---+--------+
| 30|zhangsan|
| 20|    lisi|
| 40|  wangwu|
+---+--------+

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- username: string (nullable = true)
 
df.createTempView("user")   #df.createOrReplaceTempView("user")


df.createOrReplaceGlobalTempView("emp") #跨session访问
spark.newSession.sql("select * from global_tmp.emp").show  #新session访问全局临时视图

df.select("age").show
scala> df.select($"age" + 1).show
+---------+
|(age + 1)|
+---------+
|       31|
|       21|
|       41|
+---------+

scala> val df=spark.read.load("F:/BaiduNetdiskDownload/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/users.parquet")  #默认加载的是 parquet格式
df.show
df.write.format("json").mode("overwrite").save("/root/study")
df.read.format("csv").option("set",";").option("header","true").load("/root/study/aaa.csv")
scala> val df = spark.read.format(source = "jdbc").option("url","jdbc:mysql://10.246.9.3:3306/test?characterEncoding=UTF-8&&serverTimezone=GMT").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","yehaver").option("dbtable","my_test").load()
spark.read.format(source = "jdbc").option("url","jdbc:mysql://10.246.9.3:3306/test?characterEncoding=UTF-8&&serverTimezone=GMT").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","yehaver").option("dbtable","my_test").load().show
df.write.format("json").mode("overwrite").save("F:/BaiduNetdiskDownload/test.sjon")  #将数据库取的数据保存到文件

df.write.format(source = "jdbc").option("url","jdbc:mysql://10.246.9.3:3306/test?characterEncoding=UTF-8&&serverTimezone=GMT").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","yehaver").option("dbtable","my_test_new").save()

spark.sql("select * from json.`data/user/json`").show
spark.sql("create table my_table(id int)")
spark.sql("load data local inpath '/root/study/id.txt' into table my_table")
spark.sql("show tables").show
spark.sql("select * from user").show

涉及问题
java.io.IOException: (null) entry in command string: null chmod 0644
下载hadoop.dll文,并拷贝到c:\windows\system32目录中
error: unclosed character literal

加载和读取数据两种主要模式,默认读取和写的文件格式是 parquet。
spark.read.load  #中间可以加 format,option.mode,SaveMode等选项
spark.write.save  #中间可以加 format,option.mode,SaveMode等选项

四、spark配置外部HIVE
配置前需要安装好 hadoop并启动,zookeeper并启动,hive配置好  
1.拷贝hive的 hive-site.xml 和 mysql-connector-java-8.0.22.jar 文件到 spark目录
cp /root/hive-3.1.2/conf/hive-site.xml /root/spark-2.4.7/conf/hive-site.xml
cp /root/hive-3.1.2/lib/mysql-connector-java-8.0.22.jar /root/spark-2.4.7/jars/mysql-connector-java-8.0.22.jar
2.重新进入 spark-shell  #spark前面服务可以不用启动,但节点处理
3.推送spark
:quit
4.直接使用spark-sql进入命令行,以后可直接写sql
show tables;
select * from my_table;  

涉及问题
ERROR metastore.ObjectStore: Version information found in metastore differs 3.1.0 from expected schema version 1.2.0. 
在 /root/hive-3.1.2/conf/hive-site.xml 中增加如下配置
        </property>
        <property>
        <name>hive.metastore.uris</name>
        <value></value>
        </property>
或者 关闭版本验证
<property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
</property>

五、通过客户端访问spark
1.启动服务
/root/spark-2.4.7/sbin/start-thriftserver.sh   

2.进入beeline客户端模式,也可以在windows机器使用命令链接
./bin/beeline -u jdbc:hive2://master:10000 -n root -p root
0: jdbc:hive2://master:10000> show tables;

3.关闭服务 
/root/spark-2.4.7/sbin/stop-thriftserver.sh   


六、spark Streaming;准实时(秒或者分钟),微批量数据处理框架
流式数据处理:来一条数据处理一条,流水试
批量数据处理: 等来一批数据后才处理

实时数据处理:毫秒级别
离线数据处理:小时或者天

DStream:随时间推移(一段周期)采集数据

TAG

网友评论

共有访客发表了评论
请登录后再发布评论,和谐社会,请文明发言,谢谢合作! 立即登录 注册会员