当前位置: 首页 > news >正文

spark的使用

spark的使用

spark是一款分布式的计算框架,用于调度成百上千的服务器集群。

安装pyspark

# os.environ['PYSPARK_PYTHON']='解析器路径' pyspark_python配置解析器路径
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
pip install pyspark # 原始国外安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark  #网址安装

java安装

前置安装软件java包
java官网下载地址
一键下一步安装,配置环境变量
首先创建一个JAVA_HOME的全局变量然后在path中通过%%引入执行下面的bin 路径%JAVA_HOME%\bin

在这里插入图片描述
在这里插入图片描述
执行成功

from pyspark import SparkConf,SparkContext# 创建sparkConf 类对象
conf= SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc =SparkContext(conf=conf)
# 打印pySpark的运行脚本
print(sc.version)
# 停止sparkContext对象的运行(停止pySpark程序)
sc.stop()

PySpark的数据计算,都是基于RDD对象来进行的,RDD对象内置丰富的:成员方法(算子)

map算子

功能:map算子,是将RDD的数据一条条处理,处理的逻辑基于map算子中接收的处理函数,返回新的RDD语法:
在这里插入图片描述

# 简单执行map将数据乘以10返回,如果不引入python解析器的路径引入就会报错,
from pyspark import SparkConf, SparkContext
# 指定spark的python解析器路径
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])def func(data):return data * 10# map传入一个参数有返回值,是函数或者是值
rdd2 = rdd.map(func)
print(rdd2.collect())

在这里插入图片描述

flatMap

flatMapmap差不多就是在最后做了一个解除嵌套的功能

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['中石科技 时间还复活甲 如今房价','慰问金 咖啡机 姐夫哥','格很高 客服管家二恶烷 可归结为'])rdd2 = rdd.flatMap(lambda x:x.split(' '))print(rdd2.collect())

在这里插入图片描述
map的结果
在这里插入图片描述

reduceByKey

reduceByKey对数据进行分组可以两两计算

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('男', 11), ('男', 22), ('女', 21), ('男', 31), ('女', 99)])
# 把男女进行分组value值进行计算
rdd2 = rdd.reduceByKey(lambda a, b:a+b)print(rdd2.collect()) # [('女', 120), ('男', 64)]

reduce

与reduce的区别就是没有进行分组

take

取出前几个数据

...
rdd = sc.parallelize([1,2,3,4,5]).take(3)  # [1,2,3]

count

计算rdd中的数据个数

filter

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)rdd=sc.parallelize([1,2,3,4,5])rdd2=rdd.filter(lambda a:a%2==0) 
print(rdd2.collect()) # [2,4]

distinct

进行数据去重

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)add= sc.parallelize([1,2,3,4,5,6,73,3,2,4,56,3,5])add2=add.distinct()
print(add2.collect()) # [56, 1, 73, 2, 3, 4, 5, 6]

sortBy排序

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python3.11.4/python.exe'conf = SparkConf().setMaster('local[*]').setAppName('test_spark')
sc = SparkContext(conf=conf)add = sc.textFile('D:/wordText.txt')word_rdd = add.flatMap(lambda x: x.split(' '))
word_with_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd =word_with_rdd.reduceByKey(lambda a,b:a+b)
result_num=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1) # 1.根据什么排序,2.True 升序 False降序 3.分布式分区
print(result_num.collect())

collect

将rdd内容变成list,从而就可以打印出来

spark写入文件

首先安装

  • 下载Hadoop安装包Hadoop安装包
  • 然后把hadoop.dll放入指定文件夹内

在这里插入图片描述

os.environ['HADOOP_HOME']='D:/dev/hadoop/hadoopjob3.0'
conf = SparkConf().setMaster('local[*]').setAppName('test_spark')
sc = SparkContext(conf=conf)rdd2=sc.parallelize([[1,3,5],[6,7,9]])
rdd2.saveAsTextFile('D:/output1')

在这里插入图片描述

这样创建出来的文件就有16个分区,因为我的是16内核
如果想要在一个分区就要设置参数

import ...
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python3.11.4/python.exe'
os.environ['HADOOP_HOME']='D:/dev/hadoop/hadoopjob3.0'
conf = SparkConf().setMaster('local[*]').setAppName('test_spark')
# 第一种
conf.set("spark.default.parallelism",'1') # 设置一个分区
sc = SparkContext(conf=conf)# rdd2=sc.parallelize([[1,3,5],[6,7,9]])
# 第二种设置一个分区
rdd2=sc.parallelize([[1,3,5],[6,7,9]],1) # numSlices=1  参数可以不写直接传1
rdd2.saveAsTextFile('D:/output1')

相关文章:

spark的使用

spark的使用 spark是一款分布式的计算框架,用于调度成百上千的服务器集群。 安装pyspark # os.environ[PYSPARK_PYTHON]解析器路径 pyspark_python配置解析器路径 import os os.environ[PYSPARK_PYTHON]"D:/dev/python/python3.11.4/python.exe"pip inst…...

力扣:66. 加一(Python3)

题目: 给定一个由 整数 组成的 非空 数组所表示的非负整数,在该数的基础上加一。 最高位数字存放在数组的首位, 数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数不会以零开头。 来源:力扣&#xf…...

Go的标准库Context理解

作为一个才入门的菜鸟,还没写过真正的 go 项目,要理解这个 Context 还是有点难,不过还是要尝试一下。在 Go http包的Server中,每一个请求在都有一个对应的 goroutine 去处理。请求处理函数通常会启动额外的 goroutine 用来访问后端…...

Nuxt3_1_路由+页面+组件+资源+样式 使用及实例

1、 简介 1.1 开发必备 node版本 v16.10.0 我使用的是16.14.0编辑器推荐使用Volar Extension 的VS code插件Terminal 运行nuxt指令 1.2 环境搭建 安装项目: npx nuxilatest init [first_nuxt3]进入项目目录: cd [first_nuxt3]安装依赖:n…...

Kafka第三课

Flume 由三部分 Source Channel Sink 可以通过配置拦截器和Channel选择器,来实现对数据的分流, 可以通过对channel的2个存储容量的的设置,来实现对流速的控制 Kafka 同样由三大部分组成 生产者 服务器 消费者 生产者负责发送数据给服务器 服务器存储数据 消费者通过从服务器取…...

elasticsearch修改es集群的索引副本数量

前言 最近es集群进行调整,从2节点变成了单节点。所以需要将集群模式改为单点模式,并需要将es 集群的全部索引副本个数改为0,不然会有很多未分配的分片,导致集群状态为 yellow。 具体实践 1. 先将现有的index的副本数量为0个 此…...

【SpringCloud】Ribbon定制化配置

文章目录 使用Ribbon自带负载均衡算法添加负载均衡算法ConfigurationRestTemplate使用上面负载均衡算法 自定义负载均衡算法负载均衡算法实现RestTemplate在Controller中使用该负载均衡算法ServiceIInstance解释 使用Ribbon自带负载均衡算法 添加负载均衡算法Configuration /…...

Mac terminal 每次打开都要重新配置文件

1. 问题描述 每次打开 Terminal,base_profile文件中配置的内容就不生效,需要重新执行source ~/.bash_profile才可以使用。 2. 原因分析 zsh加载的是~/.zshrc文件,而.zshrc 文件中并没有定义任务环境变量。 3. 解决办法 在~/.zshrc文件末尾添…...

el-button实现按钮,鼠标移入显示,移出隐藏

2023.8.18今天我学习了 如何实现鼠标移入显示按钮,鼠标移出隐藏按钮。 效果如图: 鼠标移入时: 鼠标移出时: mouseover //鼠标移入事件 mouseleave //鼠标移出事件 原本我是想直接在el-button写入这两个方法,但是elem…...

uniapp+uview封装小程序请求

提要: uniapp项目引入uview库 此步骤不再阐述 1.创建环境文件 env.js: let BASE_URL;if (process.env.NODE_ENV development) {// 开发环境BASE_URL 请求地址; } else {// 生产环境BASE_URL 请求地址; }export default BASE_URL; 2.创建请求文件 该…...

idea常见错误大全之:解决全局搜索失效+搜索条件失效(条件为空)+F8失灵

问题一:全局搜索快捷键ctrlshiftf 突然失灵了,键盘敲烂了 都没反应,这是为什么呢? 肯定不是idea本身的原因,那么就是其它外在因素影响到了idea的快捷键,那么其它的快捷键为什么没失效呢,原因只有…...

【论文阅读】基于深度学习的时序预测——LTSF-Linear

系列文章链接 论文一:2020 Informer:长时序数据预测 论文二:2021 Autoformer:长序列数据预测 论文三:2022 FEDformer:长序列数据预测 论文四:2022 Non-Stationary Transformers:非平…...

02.FFMPEG的安装和添加硬件加速自编译

说一个极其郁闷的事情,就在昨天收到3399的一块板子后,往电脑上面一插,然后悲剧的事情就发生了,我的电脑蓝屏重启了,这下好了,我写到一半的帖子也不见了,我的SSH里面的记录全部消失了&#xff0c…...

elementUI 的上传组件<el-upload>,自定义上传按钮样式

方法一&#xff1a; 原理&#xff1a;调用<el-upload>组件的方法唤起选择文件事件 效果&#xff1a; 页面代码&#xff1a; 1、选择图片按钮 <div class"flex_row_spacebetween btn" click"chooseImg"><span class"el-icon-plus ic…...

【卷积神经网络】卷积,池化,全连接

随着计算机硬件的升级与性能的提高&#xff0c;运算量已不再是阻碍深度学习发展的难题。卷积神经网络&#xff08;Convolution Neural Network&#xff0c;CNN&#xff09;是深度学习中一项代表性的工作&#xff0c;CNN 是受人脑对图像的理解过程启发而提出的模型&#xff0c;其…...

【SA8295P 源码分析】76 - Thermal 功耗 之 /dev/thermalmgr 相关调试命令汇总

【SA8295P 源码分析】76 - Thermal 功耗 之 /dev/thermalmgr 相关调试命令汇总 1、配置文件:/mnt/etc/system/config/thermal-engine.conf2、获取当前SOC所有温度传感器的温度:cat /dev/thermalmgr3、查看所有 Thermal 默认配置和自定义配置:echo query config > /dev/th…...

以太网(一):PoE供电

一、定义&#xff1a; PoE系统包括供电端设备&#xff08;PSE&#xff09;和受电端设备&#xff08;PD&#xff09;两部分PoE&#xff08;Power over Ethernet&#xff09;&#xff1a;​是一种可以在以太网中透过双绞线来传输电力与数据到设备上的技术PSE&#xff08;Power S…...

骨传导耳机游泳能戴吗?骨传导游泳耳机哪个牌子好?

溽热的夏日&#xff0c;如果能够跳入水中畅游一番&#xff0c;那真的是再好不过了&#xff0c;既能强身健体&#xff0c;又能降温解暑。公共的游泳场馆人声鼎沸&#xff0c;像我这种“社恐”患者&#xff0c;如果在场馆中要待好几个小时&#xff0c;难免会觉得时间漫长&#xf…...

18万字应急管理局智慧矿山煤矿数字化矿山技术解决方案WORD

导读&#xff1a;原文《18万字应急管理局智慧矿山煤矿数字化矿山技术解决方案WORD》&#xff08;获取来源见文尾&#xff09;&#xff0c;本文精选其中精华及架构部分&#xff0c;逻辑清晰、内容完整&#xff0c;为快速形成售前方案提供参考。 目 录 第一章 项目概述 1.1项目…...

【mysql】MySQL CUP过高如何排查?

文章目录 一. 问题锁定二. QPS激增会导致CPU飘高三. 慢SQL会导致CPU飘高四. 大量空闲连接会导致CPU飘高五. MySQL问题排查常用命令 一. 问题锁定 通过top命令查看服务器CPU资源使用情况&#xff0c;明确CPU占用率较高的是否是mysqld进程&#xff0c;如果是则可以明确CUP飘高的原…...

lua实现http的异步回调

想用lua实现与http服务器的通信&#xff0c;请求一些数据会回来&#xff0c;默认lua.socket.http是同步的&#xff0c;所以想弄一个异步的方式 测试环境 lua 5.1 同步 以下是同步的代码&#xff0c;其中http.request会被阻塞住的 local function send_request()local res,…...

云服务 Ubuntu 20.04 版本 使用 Nginx 配置SSL证书和nginx从HTTP跳转到HTTPS

1.云服务申请免费的SSL证书 2.从云服务SSL证书下载到本地解压上传到服务器 3.配置Nginx下的 nginx.cof 文件 4.开放安全组&#xff0c;内部与外部 5.测试配置与跳转是否成功 1.云服务申请免费的SSL证书 1.1.登录云平台找到SSL证书 注意&#xff1a;博主这里是腾讯云&#x…...

隧道代理技术解析:为批量数据采集提供强大支持

嘿&#xff01;作为一名专业的爬虫程序员&#xff0c;我今天要和大家分享一个强大的技术&#xff0c;它能够为批量数据采集提供强大的支持——隧道代理技术。如果你在进行大规模数据采集任务时遇到了IP封禁和限制的问题&#xff0c;那么这项技术将是你的救星。废话不多说&#…...

小程序制作教程:从零开始搭建企业小程序

在如今的数字化时代&#xff0c;企业介绍小程序成为了企业展示与推广的重要工具。通过企业介绍小程序&#xff0c;企业可以向用户展示自己的品牌形象、产品服务以及企业文化等内容&#xff0c;进而提高用户对企业的认知度和信任度。本文将介绍如何从零开始搭建一个企业介绍小程…...

Redis-秒杀

唉 就记得当时抢冰墩墩的时候的秒杀了 我们要注意什么问题呢? 1.几百万人在这个瞬间抢冰墩墩 这个瞬间会有大量的请求 服务器要能抗的住 2.不能超卖,就那些冰墩墩 卖多了压根没有 好不容易抢到你说没货了怕不是要被冲烂 3.避免少卖 拢共就那些 你再少卖点 没屁了 4.防黄牛…...

2022年下半年信息安全工程师下午真题及答案解析

试题一 (20分) 已知某公司网络环境结构主要由三个部分组成&#xff0c;分别是DMZ区、内网办公区和生产区&#xff0c;其拓扑结构如图1-1所示。信息安全部的王工正在按照等级保护2.0的要求对部分业务系统开展安全配置。图1-1当中&#xff0c;网站服务器的IP地址是192.168.70.14…...

【前端|Javascript第5篇】全网最详细的JS的内置对象文章!

前言 在当今数字时代&#xff0c;前端技术正日益成为塑造用户体验的关键。我们在开发中需要用到很多js的内置对象的一些属性来帮助我们更快速的进行开发。或许你是刚踏入前端领域的小白&#xff0c;或者是希望深入了解内置对象的开发者&#xff0c;不论你的经验如何&#xff0c…...

Python pycparser(c文件解析)模块使用教程

文章目录 安装 pycparser 模块模块开发者网址获取抽象语法树1. 需要导入的模块2. 获取 不关注预处理相关 c语言文件的抽象语法树ast3. 获取 预处理后的c语言文件的抽象语法树ast 语法树组成1. 数据类型定义 Typedef2. 类型声明 TypeDecl3. 标识符类型 IdentifierType4. 变量声明…...

解决IDEA tomcat控制台只有server日志

解决IDEA tomcat控制台只有server日志 确认tomcatxxx/conf/logging.properties文件是否存在&#xff0c;存在就会有。前提是在run configuration配置了打印多个日志...

Java版本+企业电子招投标系统源代码+支持二开+Spring cloud tbms

​ 项目说明 随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大&#xff0c;公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境&#xff0c;最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范&#xff0c;以…...