Spark数据源的读取与写入、自定义函数
1. 数据源的读取与写入
1.1 数据读取
- 读文件
read.jsonread.csv- csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。
read.orc
- 读数据库
read.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’})
数据库创建测试数据:
create database itcast charset=utf8;create table itcast.tb_user(id int,name varchar(20),age int,gender varchar(20)
);insert into itcast.tb_user values (1,'张三',20,'男');
表查看:

读取数据库数据:
# 读取数据源,将数据转为DF
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# read读取数据库数据
# 使用jdbc方法通过jdbc读取数据库数据,在读取数据库之前,需要现将数据库连接驱动放入spark的jars目录下
#
df = ss.read.jdbc('jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
df.show()
运行结果:

1.2 数据写入
因为数据是在df中存储,所以使用DataFrame进行数据写入
使用DataFrame的的write方法
写入文件有个模式,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append
- 写入文件
- write.json
- write.csv
- write.orc
- 写入数据库
- write.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’},mode=‘写入方式’)
# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([Row(id=1,name='张三',age=20),Row(id=2,name='李四',age=20),Row(id=3,name='王五',age=20)],schema='id int,name string,age int'
)# 将df数据写入hdfs文件中 mode='overwrite' 覆盖写入 append 追加写入
df.write.json('hdfs://node1:8020/data_json',mode='overwrite')# 写入数据库
# create table itcast.tb_stu(
# id int,
# name varchar(20),
# age int
# );
# 在jdbc连接中指定编码字符集为utf-8
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
运行结果:

2. 自定义函数

2.1 函数分类
- udf
- 自定义
- 一进一出
- udaf
- 聚合
- 自定义
- 多进一出
- udtf
- 爆炸
- 一进多出
2.2 UDF函数
对每一行数据依次进行计算,返回每一行的结果。
#UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()#自定义字符串长度计算函数
def len_func(field):if field is None:return 0else:data = len(field)return data
#将自定义的函数注册到spark中使用
len_func = ss.udf.register('len_func', len_func,returnType=IntegerType())#在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()#sql语句中使用
df.createTempView('stu')
df3= ss.sql('select *,len_func(name) from stu')
df3.show()
2.3 UDAF函数
多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的series中进行数据的聚合。
#UDAF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import pandas as pdss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',',schema = 'id int,name string,age int,gender string,cls string')df.show()#对某个字段的整列数据进行计算,自定义udaf函数
# 第一步,装饰器注册
@F.pandas_udf(returnType=IntegerType())
def sub(field:pd.Series) -> int:n=field[0] #取出第一个值作为初始值for i in field[1::]:n-=ireturn n
#第二步,register方法注册
sub = ss.udf.register('sub', sub)df2 = df.select(sub('age'))
df2.show()
相关文章:
Spark数据源的读取与写入、自定义函数
1. 数据源的读取与写入 1.1 数据读取 读文件 read.jsonread.csv csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。 read.orc 读数据库 read.jdbc(jdbc连接地址,table‘表名’,properties{‘user’用户名,‘password’密码,‘driv…...
LeetCode 每日一题 2024/10/14-2024/10/20
记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 10/14 887. 鸡蛋掉落10/15 3200. 三角形的最大高度10/16 3194. 最小元素和最大元素的最小平均值10/17 3193. 统计逆序对的数目10/18 3191. 使二进制数组全部等于 1 的最少操…...
接口测试(六)jmeter——参数化(配置元件 --> 用户定义的变量)
一、jmeter——参数化(配置元件 --> 用户定义的变量) 注:示例仅供参考 1. 参数化格式:${变量名} 2. 配置元件:用户定义的变量 3. 添加【用户定义的变量】,【线程组】–>【添加】–>【配置元件】–…...
【学习笔记】网络流
背景 马上ICPC了,很惊奇的发现自己没整理网络流的板子。 最大流 dinic 这里选用的是二分图最大匹配的板子:飞行员配对方案问题 #include<bits/stdc.h> #define int long long using namespace std; const int N1e67,inf1e18; struct E {int to…...
【鸡翅Club】项目启动
一、项目背景 这是一个 C端的社区项目,有博客、交流,面试学习,练题等模块。 项目的背景主要是我们想要通过面试题的分类,难度,打标,来评估员工的技术能力。同时在我们公司招聘季的时候,极大的…...
python+大数据+基于热门视频的数据分析研究【内含源码+文档+部署教程】
博主介绍:✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久,选择我们就是选择放心、选择安心毕业✌ 🍅由于篇幅限制,想要获取完整文章或者源码,或者代做&am…...
【电子电力】基于PMU相量测量单元的电力系统状态评估
摘要 相量测量单元(PMU)作为一种精确且快速的实时监控设备,在电力系统状态评估中发挥了重要作用。本文研究了在没有PMU和部署PMU情况下,电力系统的电压角度和电压幅值估计误差的差异。通过比较实验结果,发现PMU的应用…...
ubuntu修改默认开机模式(图形/终端)
将 Ubuntu 16 系统设置为开机进入终端模式: 打开终端。编辑 Grub 配置文件:sudo nano /etc/default/grub。找到 GRUB_CMDLINE_LINUX_DEFAULT 行,将其修改为 GRUB_CMDLINE_LINUX_DEFAULT"text"。保存并退出编辑器(Ctrl …...
LaMI-DETR:基于GPT丰富优化的开放词汇目标检测 | ECCV‘24
现有的方法通过利用视觉-语言模型(VLMs)(如CLIP)强大的开放词汇识别能力来增强开放词汇目标检测,然而出现了两个主要挑战:(1)概念表示不足,CLIP文本空间中的类别名称缺乏…...
AI大模型是否有助于攻克重大疾病?
AI大模型在攻克重大疾病方面展现出了巨大的潜力,特别是在疾病预测、药物研发、个性化医疗等领域有着广泛应用。具体来说,AI大模型能够帮助以下几方面: 1、疾病预测与诊断:AI大模型通过分析海量的医学数据,可以提高重大…...
【渗透测试】-红日靶场-获取web服务器权限
拓扑图: 前置环境配置: Win 7 默认密码:hongrisec201 内网ip:192.168.52.143 打开虚拟网络编辑器 添加网络->VMent1->仅主机模式->子网ip:192.168.145.0 添加网卡: 虚拟机->设置-> 添加->网络适配器 保存&a…...
python 深度学习 项目调试 图像分割 segment-anything
起因, 目的: 项目来源: https://github.com/facebookresearch/segment-anything项目目的: 图像分割。 提前图片中的某个目标。facebook 出品, 居然有 47.3k star! 思考一些问题 我可以用这个项目来做什么?给一个图片, 进行分割࿰…...
【GO实战课】第六讲:电子商务网站(6):支付和订单处理
1. 简介 本课程将探讨电子商务网站的支付和订单处理功能,以及使用GO语言实现。在本课程中,我们将介绍如何设计一个可扩展、可靠和高性能的支付和订单处理系统,并演示如何使用GO语言编写相关代码。 本课程的目标是帮助学生理解电子商务网站的支付和订单处理功能,并提供一个…...
专题十三_记忆化搜索_算法专题详细总结
目录 1. 斐波那契数(easy) 那么这里就画出它的决策树 : 解法一:递归暴搜 解法二:记忆化搜索 解法三:动态规划 1.暴力解法(暴搜) 2.对优化解法的优化:把已经计算过的…...
已发布金融国家标准目录(截止2024年3月)
已发布金融国家标准目录2024年3月序号标准编号标准名称...
【论文#快速算法】Fast Intermode Decision in H.264/AVC Video Coding
目录 摘要1.前言2.帧间模式决策概览2.1 H.264/AVC中的帧间模式决策2.2 发现和动机 3.同质性和平稳性的确定3.1 同质性区域的确定3.2 稳定性区域的决定3.3 整体算法 4.实验结果4.1 IPPP序列的测试4.2 IBBP序列测试 5.结论 《Fast Intermode Decision in H.264/AVC Video Coding》…...
Git核心概念图例与最常用内容操作(reset、diff、restore、stash、reflog、cherry-pick)
文章目录 简介前置概念.git目录objects目录refs目录HEAD文件 resetreflog 与 reset --hardrevert(撤销指定提交)stashdiff工作区与暂存区差异暂存区与HEAD差异工作区与HEAD差异其他比较 restore、checkout(代码撤回)merge、rebase、cherry-pick 简介 本文将介绍Git几个核心概念…...
【人工智能在医疗企业个人中的应用】
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
IPv4头部和IPv6头部
IPv4和IPv6是互联网协议(IP)中的两个主要版本,它们在数据包头部(Header)结构上存在显著差异。以下是IPv4头部和IPv6头部的主要结构和区别: IPv4头部结构 IPv4(Internet Protocol Version 4&…...
从零开始手把手带你训练LLM保姆级教程,草履虫都能学会!零基础看完这篇就足够了~
导读 ChatGPT面世以来,各种大模型相继出现。那么大模型到底是如何训练的呢,在这篇文章中,我们将尽可能详细地梳理一个完整的 LLM 训练流程,包括模型预训练(Pretrain)、Tokenizer 训练、指令微调࿰…...
Virtual-Display-Driver:Windows虚拟显示器的全能解决方案深度解析
Virtual-Display-Driver:Windows虚拟显示器的全能解决方案深度解析 【免费下载链接】Virtual-Display-Driver Add virtual monitors to your windows 10/11 device! Works with VR, OBS, Sunshine, and/or any desktop sharing software. 项目地址: https://gitco…...
数据安全与性能瓶颈困扰企业?湖南天硕SSD固态硬盘带来航天级稳定体验
在数字化转型加速的今天,企业数据量呈指数级增长,随之而来的数据安全风险与存储性能瓶颈已成为众多企业,尤其是对数据可靠性要求极高的B端用户(如企业采购负责人、技术总监)面临的共同挑战。传统存储方案在应对复杂业务…...
United VARs CoE创享会重回上海,全球伙伴共议AI时代云ERP演进
时隔七年,United VARs Cloud ERP CoE 创享会再次回到中国!3月10日至12日,由Acloudear司享承办的United VARs Cloud ERP CoE 创享会在上海举行。来自全球多家United VARs成员机构及SAP的专家与管理者齐聚上海,围绕 Cloud ERP 战略、…...
Phi-4-mini-reasoning快速部署:Conda环境+PyTorch2.8适配避坑指南
Phi-4-mini-reasoning快速部署:Conda环境PyTorch2.8适配避坑指南 1. 项目概述 Phi-4-mini-reasoning是微软推出的3.8B参数轻量级开源模型,专为数学推理、逻辑推导和多步解题等强逻辑任务设计。这个模型主打"小参数、强推理、长上下文、低延迟&quo…...
UI-TARS-desktop场景应用:自动生成销售报告与更新库存实战
UI-TARS-desktop场景应用:自动生成销售报告与更新库存实战 1. 场景痛点与解决方案 1.1 传统销售管理的效率瓶颈 在零售和电商行业中,销售数据分析和库存管理是日常运营的核心工作。传统方式通常需要: 手动从多个系统导出销售数据人工整理…...
通义千问1.5-1.8B-Chat-GPTQ-Int4实战:微信小程序集成AI对话功能开发指南
通义千问1.5-1.8B-Chat-GPTQ-Int4实战:微信小程序集成AI对话功能开发指南 最近在做一个宠物社区的小程序,想加个智能客服功能,让用户能随时问问养宠问题。一开始觉得这事儿挺复杂,得自己搞个大模型服务器,成本高不说&…...
Java微服务在Istio中出现“偶发503 no healthy upstream”?7分钟定位Sidecar健康检查盲区与Liveness Probe冲突真相
第一章:Java微服务在Istio中偶发503问题的现象与影响在基于Istio构建的服务网格环境中,Java微服务(尤其是采用Spring Cloud Kubernetes或原生Spring Boot Istio Sidecar部署模式)频繁出现偶发性HTTP 503 Service Unavailable响应…...
从特效 SDK 到 AI 动效平台:Neon Vibe Motion 的技术演进之路
多媒体中台在 B 站主要负责剪辑、拍摄、直播等业务场景的动效渲染,开发维护的 SDK 在后文统一称为特效 SDK。 传统的视频特效生产一般分三条链路: 三条链路存在一个困境:效果丰富度、实时可交互、生产效率,三者不可兼得。 那么能…...
【C语言】memmove()函数实战:如何安全高效地处理内存重叠拷贝
1. 为什么需要memmove()函数? 在C语言中处理内存拷贝时,我们经常会遇到一个棘手的问题:当源内存块和目标内存块存在重叠区域时,使用memcpy()函数可能会导致数据被意外覆盖。想象一下你在整理书架,想把第三层到第五层的…...
期权到期日别慌!手把手教你搞定上交所股票期权的行权与交割(附避坑清单)
期权到期日实战指南:从行权准备到交割避坑全流程解析 手机屏幕上的红色倒计时提醒着期权合约即将到期,作为刚接触期权交易不久的新手,此刻最需要的不再是复杂的概念解释,而是一份能握在手中的应急操作清单。本文将用最直白的语言拆…...
