当前位置: 首页 > news >正文

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入

1.1 数据读取

  • 读文件
    • read.json
    • read.csv
      • csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。
    • read.orc
  • 读数据库
    • read.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’})
      数据库创建测试数据:
create database itcast charset=utf8;create table itcast.tb_user(id int,name varchar(20),age int,gender varchar(20)
);insert into  itcast.tb_user values (1,'张三',20,'男');

表查看:
在这里插入图片描述
读取数据库数据:

# 读取数据源,将数据转为DF
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# read读取数据库数据
# 使用jdbc方法通过jdbc读取数据库数据,在读取数据库之前,需要现将数据库连接驱动放入spark的jars目录下
#
df = ss.read.jdbc('jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
df.show()

运行结果:
在这里插入图片描述

1.2 数据写入

因为数据是在df中存储,所以使用DataFrame进行数据写入

使用DataFrame的的write方法


写入文件有个模式,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append

  • 写入文件
    • write.json
    • write.csv
    • write.orc
  • 写入数据库
    • write.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’},mode=‘写入方式’)
# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([Row(id=1,name='张三',age=20),Row(id=2,name='李四',age=20),Row(id=3,name='王五',age=20)],schema='id int,name string,age int'
)# 将df数据写入hdfs文件中  mode='overwrite' 覆盖写入   append 追加写入
df.write.json('hdfs://node1:8020/data_json',mode='overwrite')# 写入数据库
# create table itcast.tb_stu(
#     id int,
#     name varchar(20),
#     age int
# );
# 在jdbc连接中指定编码字符集为utf-8
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

运行结果:
在这里插入图片描述

2. 自定义函数

在这里插入图片描述

2.1 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

2.2 UDF函数

对每一行数据依次进行计算,返回每一行的结果。

#UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()#自定义字符串长度计算函数
def len_func(field):if field is None:return 0else:data = len(field)return data
#将自定义的函数注册到spark中使用
len_func = ss.udf.register('len_func', len_func,returnType=IntegerType())#在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()#sql语句中使用
df.createTempView('stu')
df3= ss.sql('select *,len_func(name) from stu')
df3.show()

2.3 UDAF函数

多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的series中进行数据的聚合。

#UDAF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import pandas as pdss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',',schema = 'id int,name string,age int,gender string,cls string')df.show()#对某个字段的整列数据进行计算,自定义udaf函数
# 第一步,装饰器注册
@F.pandas_udf(returnType=IntegerType())
def sub(field:pd.Series) -> int:n=field[0] #取出第一个值作为初始值for i in field[1::]:n-=ireturn n
#第二步,register方法注册
sub = ss.udf.register('sub', sub)df2 = df.select(sub('age'))
df2.show()

相关文章:

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入 1.1 数据读取 读文件 read.jsonread.csv csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。 read.orc 读数据库 read.jdbc(jdbc连接地址,table‘表名’,properties{‘user’用户名,‘password’密码,‘driv…...

LeetCode 每日一题 2024/10/14-2024/10/20

记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 10/14 887. 鸡蛋掉落10/15 3200. 三角形的最大高度10/16 3194. 最小元素和最大元素的最小平均值10/17 3193. 统计逆序对的数目10/18 3191. 使二进制数组全部等于 1 的最少操…...

接口测试(六)jmeter——参数化(配置元件 --> 用户定义的变量)

一、jmeter——参数化(配置元件 --> 用户定义的变量) 注:示例仅供参考 1. 参数化格式:${变量名} 2. 配置元件:用户定义的变量 3. 添加【用户定义的变量】,【线程组】–>【添加】–>【配置元件】–…...

【学习笔记】网络流

背景 马上ICPC了&#xff0c;很惊奇的发现自己没整理网络流的板子。 最大流 dinic 这里选用的是二分图最大匹配的板子&#xff1a;飞行员配对方案问题 #include<bits/stdc.h> #define int long long using namespace std; const int N1e67,inf1e18; struct E {int to…...

【鸡翅Club】项目启动

一、项目背景 这是一个 C端的社区项目&#xff0c;有博客、交流&#xff0c;面试学习&#xff0c;练题等模块。 项目的背景主要是我们想要通过面试题的分类&#xff0c;难度&#xff0c;打标&#xff0c;来评估员工的技术能力。同时在我们公司招聘季的时候&#xff0c;极大的…...

python+大数据+基于热门视频的数据分析研究【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…...

【电子电力】基于PMU相量测量单元的电力系统状态评估

摘要 相量测量单元&#xff08;PMU&#xff09;作为一种精确且快速的实时监控设备&#xff0c;在电力系统状态评估中发挥了重要作用。本文研究了在没有PMU和部署PMU情况下&#xff0c;电力系统的电压角度和电压幅值估计误差的差异。通过比较实验结果&#xff0c;发现PMU的应用…...

ubuntu修改默认开机模式(图形/终端)

将 Ubuntu 16 系统设置为开机进入终端模式&#xff1a; 打开终端。编辑 Grub 配置文件&#xff1a;sudo nano /etc/default/grub。找到 GRUB_CMDLINE_LINUX_DEFAULT 行&#xff0c;将其修改为 GRUB_CMDLINE_LINUX_DEFAULT"text"。保存并退出编辑器&#xff08;Ctrl …...

LaMI-DETR:基于GPT丰富优化的开放词汇目标检测 | ECCV‘24

现有的方法通过利用视觉-语言模型&#xff08;VLMs&#xff09;&#xff08;如CLIP&#xff09;强大的开放词汇识别能力来增强开放词汇目标检测&#xff0c;然而出现了两个主要挑战&#xff1a;&#xff08;1&#xff09;概念表示不足&#xff0c;CLIP文本空间中的类别名称缺乏…...

AI大模型是否有助于攻克重大疾病?

AI大模型在攻克重大疾病方面展现出了巨大的潜力&#xff0c;特别是在疾病预测、药物研发、个性化医疗等领域有着广泛应用。具体来说&#xff0c;AI大模型能够帮助以下几方面&#xff1a; 1、疾病预测与诊断&#xff1a;AI大模型通过分析海量的医学数据&#xff0c;可以提高重大…...

【渗透测试】-红日靶场-获取web服务器权限

拓扑图&#xff1a; 前置环境配置&#xff1a; Win 7 默认密码&#xff1a;hongrisec201 内网ip:192.168.52.143 打开虚拟网络编辑器 添加网络->VMent1->仅主机模式->子网ip:192.168.145.0 添加网卡&#xff1a; 虚拟机->设置-> 添加->网络适配器 保存&a…...

python 深度学习 项目调试 图像分割 segment-anything

起因&#xff0c; 目的: 项目来源: https://github.com/facebookresearch/segment-anything项目目的: 图像分割。 提前图片中的某个目标。facebook 出品&#xff0c; 居然有 47.3k star! 思考一些问题 我可以用这个项目来做什么?给一个图片&#xff0c; 进行分割&#xff0…...

【GO实战课】第六讲:电子商务网站(6):支付和订单处理

1. 简介 本课程将探讨电子商务网站的支付和订单处理功能,以及使用GO语言实现。在本课程中,我们将介绍如何设计一个可扩展、可靠和高性能的支付和订单处理系统,并演示如何使用GO语言编写相关代码。 本课程的目标是帮助学生理解电子商务网站的支付和订单处理功能,并提供一个…...

专题十三_记忆化搜索_算法专题详细总结

目录 1. 斐波那契数&#xff08;easy&#xff09; 那么这里就画出它的决策树 &#xff1a; 解法一&#xff1a;递归暴搜 解法二&#xff1a;记忆化搜索 解法三&#xff1a;动态规划 1.暴力解法&#xff08;暴搜&#xff09; 2.对优化解法的优化&#xff1a;把已经计算过的…...

已发布金融国家标准目录(截止2024年3月)

已发布金融国家标准目录2024年3月序号标准编号标准名称...

【论文#快速算法】Fast Intermode Decision in H.264/AVC Video Coding

目录 摘要1.前言2.帧间模式决策概览2.1 H.264/AVC中的帧间模式决策2.2 发现和动机 3.同质性和平稳性的确定3.1 同质性区域的确定3.2 稳定性区域的决定3.3 整体算法 4.实验结果4.1 IPPP序列的测试4.2 IBBP序列测试 5.结论 《Fast Intermode Decision in H.264/AVC Video Coding》…...

Git核心概念图例与最常用内容操作(reset、diff、restore、stash、reflog、cherry-pick)

文章目录 简介前置概念.git目录objects目录refs目录HEAD文件 resetreflog 与 reset --hardrevert(撤销指定提交)stashdiff工作区与暂存区差异暂存区与HEAD差异工作区与HEAD差异其他比较 restore、checkout(代码撤回)merge、rebase、cherry-pick 简介 本文将介绍Git几个核心概念…...

【人工智能在医疗企业个人中的应用】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

IPv4头部和IPv6头部

IPv4和IPv6是互联网协议&#xff08;IP&#xff09;中的两个主要版本&#xff0c;它们在数据包头部&#xff08;Header&#xff09;结构上存在显著差异。以下是IPv4头部和IPv6头部的主要结构和区别&#xff1a; IPv4头部结构 IPv4&#xff08;Internet Protocol Version 4&…...

从零开始手把手带你训练LLM保姆级教程,草履虫都能学会!零基础看完这篇就足够了~

导读 ChatGPT面世以来&#xff0c;各种大模型相继出现。那么大模型到底是如何训练的呢&#xff0c;在这篇文章中&#xff0c;我们将尽可能详细地梳理一个完整的 LLM 训练流程&#xff0c;包括模型预训练&#xff08;Pretrain&#xff09;、Tokenizer 训练、指令微调&#xff0…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...