当前位置: 首页 > news >正文

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入

1.1 数据读取

  • 读文件
    • read.json
    • read.csv
      • csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。
    • read.orc
  • 读数据库
    • read.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’})
      数据库创建测试数据:
create database itcast charset=utf8;create table itcast.tb_user(id int,name varchar(20),age int,gender varchar(20)
);insert into  itcast.tb_user values (1,'张三',20,'男');

表查看:
在这里插入图片描述
读取数据库数据:

# 读取数据源,将数据转为DF
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# read读取数据库数据
# 使用jdbc方法通过jdbc读取数据库数据,在读取数据库之前,需要现将数据库连接驱动放入spark的jars目录下
#
df = ss.read.jdbc('jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
df.show()

运行结果:
在这里插入图片描述

1.2 数据写入

因为数据是在df中存储,所以使用DataFrame进行数据写入

使用DataFrame的的write方法


写入文件有个模式,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append

  • 写入文件
    • write.json
    • write.csv
    • write.orc
  • 写入数据库
    • write.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’},mode=‘写入方式’)
# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([Row(id=1,name='张三',age=20),Row(id=2,name='李四',age=20),Row(id=3,name='王五',age=20)],schema='id int,name string,age int'
)# 将df数据写入hdfs文件中  mode='overwrite' 覆盖写入   append 追加写入
df.write.json('hdfs://node1:8020/data_json',mode='overwrite')# 写入数据库
# create table itcast.tb_stu(
#     id int,
#     name varchar(20),
#     age int
# );
# 在jdbc连接中指定编码字符集为utf-8
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

运行结果:
在这里插入图片描述

2. 自定义函数

在这里插入图片描述

2.1 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

2.2 UDF函数

对每一行数据依次进行计算,返回每一行的结果。

#UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()#自定义字符串长度计算函数
def len_func(field):if field is None:return 0else:data = len(field)return data
#将自定义的函数注册到spark中使用
len_func = ss.udf.register('len_func', len_func,returnType=IntegerType())#在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()#sql语句中使用
df.createTempView('stu')
df3= ss.sql('select *,len_func(name) from stu')
df3.show()

2.3 UDAF函数

多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的series中进行数据的聚合。

#UDAF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import pandas as pdss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',',schema = 'id int,name string,age int,gender string,cls string')df.show()#对某个字段的整列数据进行计算,自定义udaf函数
# 第一步,装饰器注册
@F.pandas_udf(returnType=IntegerType())
def sub(field:pd.Series) -> int:n=field[0] #取出第一个值作为初始值for i in field[1::]:n-=ireturn n
#第二步,register方法注册
sub = ss.udf.register('sub', sub)df2 = df.select(sub('age'))
df2.show()

相关文章:

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入 1.1 数据读取 读文件 read.jsonread.csv csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。 read.orc 读数据库 read.jdbc(jdbc连接地址,table‘表名’,properties{‘user’用户名,‘password’密码,‘driv…...

LeetCode 每日一题 2024/10/14-2024/10/20

记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 10/14 887. 鸡蛋掉落10/15 3200. 三角形的最大高度10/16 3194. 最小元素和最大元素的最小平均值10/17 3193. 统计逆序对的数目10/18 3191. 使二进制数组全部等于 1 的最少操…...

接口测试(六)jmeter——参数化(配置元件 --> 用户定义的变量)

一、jmeter——参数化(配置元件 --> 用户定义的变量) 注:示例仅供参考 1. 参数化格式:${变量名} 2. 配置元件:用户定义的变量 3. 添加【用户定义的变量】,【线程组】–>【添加】–>【配置元件】–…...

【学习笔记】网络流

背景 马上ICPC了&#xff0c;很惊奇的发现自己没整理网络流的板子。 最大流 dinic 这里选用的是二分图最大匹配的板子&#xff1a;飞行员配对方案问题 #include<bits/stdc.h> #define int long long using namespace std; const int N1e67,inf1e18; struct E {int to…...

【鸡翅Club】项目启动

一、项目背景 这是一个 C端的社区项目&#xff0c;有博客、交流&#xff0c;面试学习&#xff0c;练题等模块。 项目的背景主要是我们想要通过面试题的分类&#xff0c;难度&#xff0c;打标&#xff0c;来评估员工的技术能力。同时在我们公司招聘季的时候&#xff0c;极大的…...

python+大数据+基于热门视频的数据分析研究【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…...

【电子电力】基于PMU相量测量单元的电力系统状态评估

摘要 相量测量单元&#xff08;PMU&#xff09;作为一种精确且快速的实时监控设备&#xff0c;在电力系统状态评估中发挥了重要作用。本文研究了在没有PMU和部署PMU情况下&#xff0c;电力系统的电压角度和电压幅值估计误差的差异。通过比较实验结果&#xff0c;发现PMU的应用…...

ubuntu修改默认开机模式(图形/终端)

将 Ubuntu 16 系统设置为开机进入终端模式&#xff1a; 打开终端。编辑 Grub 配置文件&#xff1a;sudo nano /etc/default/grub。找到 GRUB_CMDLINE_LINUX_DEFAULT 行&#xff0c;将其修改为 GRUB_CMDLINE_LINUX_DEFAULT"text"。保存并退出编辑器&#xff08;Ctrl …...

LaMI-DETR:基于GPT丰富优化的开放词汇目标检测 | ECCV‘24

现有的方法通过利用视觉-语言模型&#xff08;VLMs&#xff09;&#xff08;如CLIP&#xff09;强大的开放词汇识别能力来增强开放词汇目标检测&#xff0c;然而出现了两个主要挑战&#xff1a;&#xff08;1&#xff09;概念表示不足&#xff0c;CLIP文本空间中的类别名称缺乏…...

AI大模型是否有助于攻克重大疾病?

AI大模型在攻克重大疾病方面展现出了巨大的潜力&#xff0c;特别是在疾病预测、药物研发、个性化医疗等领域有着广泛应用。具体来说&#xff0c;AI大模型能够帮助以下几方面&#xff1a; 1、疾病预测与诊断&#xff1a;AI大模型通过分析海量的医学数据&#xff0c;可以提高重大…...

【渗透测试】-红日靶场-获取web服务器权限

拓扑图&#xff1a; 前置环境配置&#xff1a; Win 7 默认密码&#xff1a;hongrisec201 内网ip:192.168.52.143 打开虚拟网络编辑器 添加网络->VMent1->仅主机模式->子网ip:192.168.145.0 添加网卡&#xff1a; 虚拟机->设置-> 添加->网络适配器 保存&a…...

python 深度学习 项目调试 图像分割 segment-anything

起因&#xff0c; 目的: 项目来源: https://github.com/facebookresearch/segment-anything项目目的: 图像分割。 提前图片中的某个目标。facebook 出品&#xff0c; 居然有 47.3k star! 思考一些问题 我可以用这个项目来做什么?给一个图片&#xff0c; 进行分割&#xff0…...

【GO实战课】第六讲:电子商务网站(6):支付和订单处理

1. 简介 本课程将探讨电子商务网站的支付和订单处理功能,以及使用GO语言实现。在本课程中,我们将介绍如何设计一个可扩展、可靠和高性能的支付和订单处理系统,并演示如何使用GO语言编写相关代码。 本课程的目标是帮助学生理解电子商务网站的支付和订单处理功能,并提供一个…...

专题十三_记忆化搜索_算法专题详细总结

目录 1. 斐波那契数&#xff08;easy&#xff09; 那么这里就画出它的决策树 &#xff1a; 解法一&#xff1a;递归暴搜 解法二&#xff1a;记忆化搜索 解法三&#xff1a;动态规划 1.暴力解法&#xff08;暴搜&#xff09; 2.对优化解法的优化&#xff1a;把已经计算过的…...

已发布金融国家标准目录(截止2024年3月)

已发布金融国家标准目录2024年3月序号标准编号标准名称...

【论文#快速算法】Fast Intermode Decision in H.264/AVC Video Coding

目录 摘要1.前言2.帧间模式决策概览2.1 H.264/AVC中的帧间模式决策2.2 发现和动机 3.同质性和平稳性的确定3.1 同质性区域的确定3.2 稳定性区域的决定3.3 整体算法 4.实验结果4.1 IPPP序列的测试4.2 IBBP序列测试 5.结论 《Fast Intermode Decision in H.264/AVC Video Coding》…...

Git核心概念图例与最常用内容操作(reset、diff、restore、stash、reflog、cherry-pick)

文章目录 简介前置概念.git目录objects目录refs目录HEAD文件 resetreflog 与 reset --hardrevert(撤销指定提交)stashdiff工作区与暂存区差异暂存区与HEAD差异工作区与HEAD差异其他比较 restore、checkout(代码撤回)merge、rebase、cherry-pick 简介 本文将介绍Git几个核心概念…...

【人工智能在医疗企业个人中的应用】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

IPv4头部和IPv6头部

IPv4和IPv6是互联网协议&#xff08;IP&#xff09;中的两个主要版本&#xff0c;它们在数据包头部&#xff08;Header&#xff09;结构上存在显著差异。以下是IPv4头部和IPv6头部的主要结构和区别&#xff1a; IPv4头部结构 IPv4&#xff08;Internet Protocol Version 4&…...

从零开始手把手带你训练LLM保姆级教程,草履虫都能学会!零基础看完这篇就足够了~

导读 ChatGPT面世以来&#xff0c;各种大模型相继出现。那么大模型到底是如何训练的呢&#xff0c;在这篇文章中&#xff0c;我们将尽可能详细地梳理一个完整的 LLM 训练流程&#xff0c;包括模型预训练&#xff08;Pretrain&#xff09;、Tokenizer 训练、指令微调&#xff0…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

Ubuntu Cursor升级成v1.0

0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开&#xff0c;快捷键也不好用&#xff0c;当看到 Cursor 升级后&#xff0c;还是蛮高兴的 1. 下载 Cursor 下载地址&#xff1a;https://www.cursor.com/cn/downloads 点击下载 Linux (x64) &#xff0c;…...