spark代码
RDD
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
该系总共有多少学生;
val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val par = lines.map(row=>row.split(",")(0))
val distinct_par = par.distinct() //去重操作
distinct_par.count //取得总数
该系共开设来多少门课程;
val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val par = lines.map(row=>row.split(",")(1))
val distinct_par = par.distinct()
distinct_par.count
Tom 同学的总成绩平均分是多少;
val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.filter(row=>row.split(",")(0)=="Tom")
pare.foreach(println)
Tom,DataBase,26
Tom,Algorithm,12
Tom,OperatingSystem,16
Tom,Python,40
Tom,Software,60
pare.map(row=>(row.split(",")(0),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y0) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
//res9: Array[(String, Int)] = Array((Tom,30))
求每名同学的选修的课程门数
val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.map(row=>(row.split(",")(0),row.split(",")(1)))
pare.mapValues(x => (x,1)).reduceByKey((x,y) => (" ",x._2 + y._2)).mapValues(x => x._2).foreach(println)
各门课程的平均分是多少;
val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
pare.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
res0: Array[(String, Int)] = Array((Python,57), (OperatingSystem,54), (CLanguage,50),
使用累加器计算共有多少人选了 DataBase 这门课
val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
val pare = lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1))
val accum = sc.longAccumulator("My Accumulator")
pare.values.foreach(x => accum.add(x))
accum.value
res19: Long = 126
DateFrame
源文件内容如下(包含 id,name,age),将数据复制保存到 ubuntu 系统/usr/local/spark 下, 命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按 id:1,name:Ella,age:36 的格式 打印出 DataFrame 的所有数据。请写出程序代码。
1,Ella,36
2,Bob,29
3,Jack,29
方法一:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ object RDDtoDF { def main(args: Array[String]) { case class Employee(id:Long,name: String, age: Long) val employeeDF = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt").map(_.split(",")).map(attributes => Employee(attributes(0).trim.toInt,attributes(1), attributes(2).trim.toInt)).toDF() employeeDF.createOrReplaceTempView("employee") val employeeRDD = spark.sql("select id,name,age from employee") employeeRDD.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() }
}
方法二:
import org.apache.spark.sql.types._import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Row object RDDtoDF { def main(args: Array[String]) { val employeeRDD =
spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
val schemaString = "id name age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = employeeRDD.map(_.split(",")).map(attributes =>
Row(attributes(0).trim, attributes(1), attributes(2).trim))
val employeeDF = spark.createDataFrame(rowRDD, schema)
employeeDF.createOrReplaceTempView("employee")
val results = spark.sql("SELECT id,name,age FROM employee")
results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() }
}
{ “id”:1 ,“name”:" Ella",“age”:36 }
{ “id”:2,“name”:“Bob”,“age”:29 }
{ “id”:3 ,“name”:“Jack”,“age”:29 }
{ “id”:4 ,“name”:“Jim”,“age”:28 }
{ “id”:5 ,“name”:“Damon” }
{ “id”:5 ,“name”:“Damon” }
创建 DataFrame
scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
scala> import spark.implicits._
scala> val df = spark.read.json("file:///usr/local/spark/employee.json")
(1) 查询 DataFrame 的所有数据
答案:
scala> df.show()
(2) 查询所有数据,并去除重复的数据
答案:
scala> df.distinct().show()
(3) 查询所有数据,打印时去除 id 字段
答案:
scala> df.drop("id").show()
(4) 筛选 age>20 的记录
答案:
scala> df.filter(df("age") > 30 ).show()
(5) 将数据按 name 分组
答案:
scala> df.groupBy("name").count().show()
(6) 将数据按 name 升序排列
答案:
scala> df.sort(df("name").asc).show()
(7) 取出前 3 行数据
答案:
scala> df.take(3) 或 scala> df.head(3)
(8) 查询所有记录的 name 列,并为其取别名为 username
答案:
scala> df.select(df("name").as("username")).show()
(9) 查询年龄 age 的平均值
答案:
scala> df.agg("age"->"avg")
(10) 查询年龄 age 的最小值
答案:
scala> df.agg("age"->"min")
编程实现利用 DataFrame 读写 MySQL 的数据
(1) 在 MySQL 数据库中新建数据库 sparktest,再建表 employee,包含下列两行数据;
表 1 employee 表原有数据
id name gender age
1 Alice F 22 2 John M 25
假设当前目录为/usr/local/spark/mycode/testmysql,在当前目录下新建一个目录 mkdir -p src/main/scala , 然 后 在 目 录 /usr/local/spark/mycode/testmysql/src/main/scala 下 新 建 一 个testmysql.scala
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row object TestMySQL { def main(args: Array[String]) { val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim, p(2).trim,p(3).toInt))
val employeeDF = spark.createDataFrame(rowRDD, schema)
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "hadoop")
prop.put("driver","com.mysql.jdbc.Driver")
employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest",
sparktest.employee", prop)
val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "hadoop").load()
jdbcDF.agg("age" -> "max", "age" -> "sum") }
}
启动spark-shell
cd /usr/local/spark
./bin/spark-shell
退出spark-shell
:quit
集群里安装Spark
sudo tar -zxf ~/下载/spark-2.1.0-bin-without-hadoop.tgz -C /usr/local
cd /usr/local
sudo mv ./spark-2.1.0-bin-without-hadoop ./spark
sudo chow -R hadoop:hadoop ./spark
RDD转换操作API
| 操作 | 含义 |
|---|---|
| filter(func) | 帅选出满足函数func的元素,并返回一个新的数据集 |
| map(func) | 将每个元素传入到函数func中,并将结构返回一个新的数据集 |
| flatMap(func) | 与上类似,但每个元素都可以映射到0个或多个输出结果 |
| groupByKey() | 应用于(K,V)键值对数据集时,返回一个(K,Iterable)形式的数据集 |
| reduceByKey(func) | 应用于(K,V)键值对数据集时,返回一个新的(K,V)形式的数据集,V是每个Key传递到函数func中的聚合后的结果。 |
RDD行动操作API
| 操作 | 含义 |
|---|---|
| count() | 返回数据集中的元素个数 |
| collect() | 以数组的形式返回数据集中的所有元素 |
| first() | 返回数据集中的第一个元素 |
| take(n) | 以数据的形式返回数据集中的前n个元素 |
| reduce(func) | 通过函数func聚合数据集中的元素 |
| foreache(func) | 将数据集中的每个元素传递到函数func中运行 |
相关文章:
spark代码
RDD Tom,DataBase,80 Tom,Algorithm,50 Tom,DataStructure,60 Jim,DataBase,90 Jim,Algorithm,60 Jim,DataStructure,80 该系总共有多少学生; val lines sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") val par lines.map(ro…...
利用OpenCV的函数equalizeHist()对图像作直方图均衡化处理
如果一幅图像的灰度值集中在某个比较窄的区域,则图像的对比度会显得比较小,不便于对图像的分析和处理。 图像的直方图均衡化可以实现将原图像的灰度值范围扩大,这样图像的对比度就得到了提高,从而方便对图像进行后续的分析和处理…...
星河智联Android开发
背景:朋友内推,过了一周约面。本人 2019年毕业 20230208一面 1.自我介绍 2.为啥换工作 3.项目经历(中控面板、智能音箱、语音问的比较细) 4.问题 Handler机制原理?了解同步和异步消息吗?View事件分发…...
【C++】关联式容器——map和set的使用
文章目录一、关联式容器二、键值对三、树形结构的关联式容器1.set2.multiset3.map4.multimap四、题目练习一、关联式容器 序列式容器📕:已经接触过STL中的部分容器,比如:vector、list、deque、forward_list(C11)等,这些容器统称为…...
Promise的实现原理
作用:异步问题同步化解决方案,解决回调地狱、链式操作原理: 状态:pending、fufilled reject构造函数传入一个函数,resolve进入then,reject进入catch静态方法:resolve reject all any react ne…...
【MFC】数据库操作——ODBC(20)
ODBC:开放式数据库连接,是为解决异构数据库(不同数据库采用的数据存储方法不同)共享而产生的。ODBC API相对来说非常复杂,这里介绍MFC的ODBC类。 添加ODBC用户DSN 首先,在计算机中添加用户DSN:(WIN10下&a…...
旺店通与金蝶云星空对接集成采购入库单接口
旺店通旗舰奇门与金蝶云星空对接集成采购入库单查询连通销售退货新增V1(12-采购入库单集成方案-P)数据源系统:旺店通旗舰奇门旺店通是北京掌上先机网络科技有限公司旗下品牌,国内的零售云服务提供商,基于云计算SaaS服务模式,以体系化解决方案…...
Linux基础-学会使用命令帮助
概述使用 whatis使用 man查看命令程序路径 which总结参考资料概述Linux 命令及其参数繁多,大多数人都是无法记住全部功能和具体参数意思的。在 linux 终端,面对命令不知道怎么用,或不记得命令的拼写及参数时,我们需要求助于系统的…...
MyBatis 之四(动态SQL之 if、trim、where、set、foreach 标签)
文章目录动态 SQL1. if 标签2. trim 标签3. where 标签4. set 标签5. foreach 标签回顾一下,在上一篇 MyBatis 之三(查询操作 占位符#{} 与 ${}、like查询、resultMap、association、collection)中,学习了针对查询操作的相关知识点…...
PAT (Advanced Level) Practice 1006 Sign In and Sign Out
1006 Sign In and Sign Out题目翻译代码分数 25作者 CHEN, Yue单位 浙江大学At the beginning of every day, the first person who signs in the computer room will unlock the door, and the last one who signs out will lock the door. Given the records of signing in’…...
Android入门第64天-MVVM下瀑布流界面的完美实现-使用RecyclerView
前言 网上充满着不完善的基于RecyclerView的瀑布流实现,要么根本是错的、要么就是只知其一不知其二、要么就是一充诉了一堆无用代码、要么用的是古老的MVC设计模式。 一个真正的、用户体验类似于淘宝、抖音的瀑布流怎么实现目前基本为无解。因为本人正好自己空闲时也…...
Windows PowerShell中成功进入conda虚拟环境
本人操作系统是Windows10(输入命令cmd或在运运行中输入winver查看)在cmd命令行中大家都很熟悉,很方便进入到指定创建了的虚拟环境中,那么在PowerShell中怎么进入呢?比如在VSCode中的TERMINAL使用的是PowerShell&#x…...
【C++】类与对象理解和学习(中)
专栏放在【C知识总结】,会持续更新,期待支持🌹六大默认成员函数前言每个类中都含有六大默认成员函数,也就是说,即使这个类是个空类,里面什么都没有写,但是编译器依然会自动生成六个默认成员函数…...
每日英语学习(11)大英复习单词和翻译
2023.2.20 单词 1.contemplate 思考、沉思 2.spark 激起 3.venture 冒险 4.stunning 极好的 5.dictate 影响 6.diplomatic 外交的 7.vicious 恶性的 8.premier 首要的 9.endeavor 努力 10.bypass 绕过 11.handicaps 不利因素 12.vulnerable 脆弱的 13.temperament 气质、性格…...
x79主板M.2无法识别固态硬盘
问题描述: 这几天在装电脑,买了块M.2接口固态硬盘。装上去始终无法读取到硬盘,一开始以为是寨板Bios问题不支持M.2的设备。更新了最新的BIOS然后还是没有识别出来,然而将日常用的电脑PM510硬盘装上发现可以识别,而且日常用电脑也…...
配置Tomcat性能优化
配置Tomcat性能优化 📒博客主页: 微笑的段嘉许博客主页 💻微信公众号:微笑的段嘉许 🎉欢迎关注🔎点赞👍收藏⭐留言📝 📌本文由微笑的段嘉许原创! Ǵ…...
Hive3 安装方式详解,datagrid自定义驱动连接hive
1 Hive的安装方式 hive的安装一共有三种方式:内嵌模式、本地模式、远程模式。 元数据服务(metastore)作用是:客户端连接metastore服务,metastore再去连接MySQL数据库来存取元数据。有了metastore服务,就可以有多个客户端同时连接…...
约束优化:约束优化的三种序列无约束优化方法(罚函数法)
文章目录约束优化:约束优化的三种序列无约束优化方法(罚函数法)外点罚函数法L2-罚函数法:非精确算法对于等式约束对于不等式约束L1-罚函数法:精确算法内点罚函数法:障碍函数法参考文献约束优化:…...
你真的会做APP UI自动化测试吗?我敢打赌百分之九十的人都不知道这个思路
目录 前言 一,开发语言选择 二,UI测试框架选择 1,Appium 2,Airtest 3,选择框架 三,单元测试框架选择 四,测试环境搭建 1,测试电脑选择 2,测试手机选择 3&#…...
GIT:【基础三】Git工作核心原理
目录 一、Git本地四个工作区域 二、Git提交文件流程 一、Git本地四个工作区域 工作目录(Working Directory):电脑上存放开发代码的地方。暂存区(Stage/Index):用于l临时存放改动的文件,本质上只是一个文件,保存即将提交到文件列…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...
