当前位置: 首页 > news >正文

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入

1.1 数据读取

  • 读文件
    • read.json
    • read.csv
      • csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。
    • read.orc
  • 读数据库
    • read.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’})
      数据库创建测试数据:
create database itcast charset=utf8;create table itcast.tb_user(id int,name varchar(20),age int,gender varchar(20)
);insert into  itcast.tb_user values (1,'张三',20,'男');

表查看:
在这里插入图片描述
读取数据库数据:

# 读取数据源,将数据转为DF
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# read读取数据库数据
# 使用jdbc方法通过jdbc读取数据库数据,在读取数据库之前,需要现将数据库连接驱动放入spark的jars目录下
#
df = ss.read.jdbc('jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
df.show()

运行结果:
在这里插入图片描述

1.2 数据写入

因为数据是在df中存储,所以使用DataFrame进行数据写入

使用DataFrame的的write方法


写入文件有个模式,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append

  • 写入文件
    • write.json
    • write.csv
    • write.orc
  • 写入数据库
    • write.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’},mode=‘写入方式’)
# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([Row(id=1,name='张三',age=20),Row(id=2,name='李四',age=20),Row(id=3,name='王五',age=20)],schema='id int,name string,age int'
)# 将df数据写入hdfs文件中  mode='overwrite' 覆盖写入   append 追加写入
df.write.json('hdfs://node1:8020/data_json',mode='overwrite')# 写入数据库
# create table itcast.tb_stu(
#     id int,
#     name varchar(20),
#     age int
# );
# 在jdbc连接中指定编码字符集为utf-8
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

运行结果:
在这里插入图片描述

2. 自定义函数

在这里插入图片描述

2.1 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

2.2 UDF函数

对每一行数据依次进行计算,返回每一行的结果。

#UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()#自定义字符串长度计算函数
def len_func(field):if field is None:return 0else:data = len(field)return data
#将自定义的函数注册到spark中使用
len_func = ss.udf.register('len_func', len_func,returnType=IntegerType())#在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()#sql语句中使用
df.createTempView('stu')
df3= ss.sql('select *,len_func(name) from stu')
df3.show()

2.3 UDAF函数

多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的series中进行数据的聚合。

#UDAF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import pandas as pdss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',',schema = 'id int,name string,age int,gender string,cls string')df.show()#对某个字段的整列数据进行计算,自定义udaf函数
# 第一步,装饰器注册
@F.pandas_udf(returnType=IntegerType())
def sub(field:pd.Series) -> int:n=field[0] #取出第一个值作为初始值for i in field[1::]:n-=ireturn n
#第二步,register方法注册
sub = ss.udf.register('sub', sub)df2 = df.select(sub('age'))
df2.show()

相关文章:

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入 1.1 数据读取 读文件 read.jsonread.csv csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。 read.orc 读数据库 read.jdbc(jdbc连接地址,table‘表名’,properties{‘user’用户名,‘password’密码,‘driv…...

LeetCode 每日一题 2024/10/14-2024/10/20

记录了初步解题思路 以及本地实现代码;并不一定为最优 也希望大家能一起探讨 一起进步 目录 10/14 887. 鸡蛋掉落10/15 3200. 三角形的最大高度10/16 3194. 最小元素和最大元素的最小平均值10/17 3193. 统计逆序对的数目10/18 3191. 使二进制数组全部等于 1 的最少操…...

接口测试(六)jmeter——参数化(配置元件 --> 用户定义的变量)

一、jmeter——参数化(配置元件 --> 用户定义的变量) 注:示例仅供参考 1. 参数化格式:${变量名} 2. 配置元件:用户定义的变量 3. 添加【用户定义的变量】,【线程组】–>【添加】–>【配置元件】–…...

【学习笔记】网络流

背景 马上ICPC了&#xff0c;很惊奇的发现自己没整理网络流的板子。 最大流 dinic 这里选用的是二分图最大匹配的板子&#xff1a;飞行员配对方案问题 #include<bits/stdc.h> #define int long long using namespace std; const int N1e67,inf1e18; struct E {int to…...

【鸡翅Club】项目启动

一、项目背景 这是一个 C端的社区项目&#xff0c;有博客、交流&#xff0c;面试学习&#xff0c;练题等模块。 项目的背景主要是我们想要通过面试题的分类&#xff0c;难度&#xff0c;打标&#xff0c;来评估员工的技术能力。同时在我们公司招聘季的时候&#xff0c;极大的…...

python+大数据+基于热门视频的数据分析研究【内含源码+文档+部署教程】

博主介绍&#xff1a;✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业毕业设计项目实战6年之久&#xff0c;选择我们就是选择放心、选择安心毕业✌ &#x1f345;由于篇幅限制&#xff0c;想要获取完整文章或者源码&#xff0c;或者代做&am…...

【电子电力】基于PMU相量测量单元的电力系统状态评估

摘要 相量测量单元&#xff08;PMU&#xff09;作为一种精确且快速的实时监控设备&#xff0c;在电力系统状态评估中发挥了重要作用。本文研究了在没有PMU和部署PMU情况下&#xff0c;电力系统的电压角度和电压幅值估计误差的差异。通过比较实验结果&#xff0c;发现PMU的应用…...

ubuntu修改默认开机模式(图形/终端)

将 Ubuntu 16 系统设置为开机进入终端模式&#xff1a; 打开终端。编辑 Grub 配置文件&#xff1a;sudo nano /etc/default/grub。找到 GRUB_CMDLINE_LINUX_DEFAULT 行&#xff0c;将其修改为 GRUB_CMDLINE_LINUX_DEFAULT"text"。保存并退出编辑器&#xff08;Ctrl …...

LaMI-DETR:基于GPT丰富优化的开放词汇目标检测 | ECCV‘24

现有的方法通过利用视觉-语言模型&#xff08;VLMs&#xff09;&#xff08;如CLIP&#xff09;强大的开放词汇识别能力来增强开放词汇目标检测&#xff0c;然而出现了两个主要挑战&#xff1a;&#xff08;1&#xff09;概念表示不足&#xff0c;CLIP文本空间中的类别名称缺乏…...

AI大模型是否有助于攻克重大疾病?

AI大模型在攻克重大疾病方面展现出了巨大的潜力&#xff0c;特别是在疾病预测、药物研发、个性化医疗等领域有着广泛应用。具体来说&#xff0c;AI大模型能够帮助以下几方面&#xff1a; 1、疾病预测与诊断&#xff1a;AI大模型通过分析海量的医学数据&#xff0c;可以提高重大…...

【渗透测试】-红日靶场-获取web服务器权限

拓扑图&#xff1a; 前置环境配置&#xff1a; Win 7 默认密码&#xff1a;hongrisec201 内网ip:192.168.52.143 打开虚拟网络编辑器 添加网络->VMent1->仅主机模式->子网ip:192.168.145.0 添加网卡&#xff1a; 虚拟机->设置-> 添加->网络适配器 保存&a…...

python 深度学习 项目调试 图像分割 segment-anything

起因&#xff0c; 目的: 项目来源: https://github.com/facebookresearch/segment-anything项目目的: 图像分割。 提前图片中的某个目标。facebook 出品&#xff0c; 居然有 47.3k star! 思考一些问题 我可以用这个项目来做什么?给一个图片&#xff0c; 进行分割&#xff0…...

【GO实战课】第六讲:电子商务网站(6):支付和订单处理

1. 简介 本课程将探讨电子商务网站的支付和订单处理功能,以及使用GO语言实现。在本课程中,我们将介绍如何设计一个可扩展、可靠和高性能的支付和订单处理系统,并演示如何使用GO语言编写相关代码。 本课程的目标是帮助学生理解电子商务网站的支付和订单处理功能,并提供一个…...

专题十三_记忆化搜索_算法专题详细总结

目录 1. 斐波那契数&#xff08;easy&#xff09; 那么这里就画出它的决策树 &#xff1a; 解法一&#xff1a;递归暴搜 解法二&#xff1a;记忆化搜索 解法三&#xff1a;动态规划 1.暴力解法&#xff08;暴搜&#xff09; 2.对优化解法的优化&#xff1a;把已经计算过的…...

已发布金融国家标准目录(截止2024年3月)

已发布金融国家标准目录2024年3月序号标准编号标准名称...

【论文#快速算法】Fast Intermode Decision in H.264/AVC Video Coding

目录 摘要1.前言2.帧间模式决策概览2.1 H.264/AVC中的帧间模式决策2.2 发现和动机 3.同质性和平稳性的确定3.1 同质性区域的确定3.2 稳定性区域的决定3.3 整体算法 4.实验结果4.1 IPPP序列的测试4.2 IBBP序列测试 5.结论 《Fast Intermode Decision in H.264/AVC Video Coding》…...

Git核心概念图例与最常用内容操作(reset、diff、restore、stash、reflog、cherry-pick)

文章目录 简介前置概念.git目录objects目录refs目录HEAD文件 resetreflog 与 reset --hardrevert(撤销指定提交)stashdiff工作区与暂存区差异暂存区与HEAD差异工作区与HEAD差异其他比较 restore、checkout(代码撤回)merge、rebase、cherry-pick 简介 本文将介绍Git几个核心概念…...

【人工智能在医疗企业个人中的应用】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

IPv4头部和IPv6头部

IPv4和IPv6是互联网协议&#xff08;IP&#xff09;中的两个主要版本&#xff0c;它们在数据包头部&#xff08;Header&#xff09;结构上存在显著差异。以下是IPv4头部和IPv6头部的主要结构和区别&#xff1a; IPv4头部结构 IPv4&#xff08;Internet Protocol Version 4&…...

从零开始手把手带你训练LLM保姆级教程,草履虫都能学会!零基础看完这篇就足够了~

导读 ChatGPT面世以来&#xff0c;各种大模型相继出现。那么大模型到底是如何训练的呢&#xff0c;在这篇文章中&#xff0c;我们将尽可能详细地梳理一个完整的 LLM 训练流程&#xff0c;包括模型预训练&#xff08;Pretrain&#xff09;、Tokenizer 训练、指令微调&#xff0…...

strcat函数追加字符串

char * strcat ( char * destination, const char * source ); dest&#xff1a;目标字符串&#xff0c;即要将源字符串追加到其末尾的字符串。src&#xff1a;源字符串&#xff0c;即要追加到目标字符串末尾的字符串 使用strcat函数给目标字符串追加字符时&#xff0c;首先要…...

每月洞察:App Store 和 Google Play 的主要更新

Google Play 和 App Store 的算法不断发展&#xff0c;定期更新和变化会显着影响其功能。对于开发人员和营销人员来说&#xff0c;跟上这些变化至关重要&#xff0c;因为它们会直接影响应用发现和排名。 本文将深入探讨 Google Play 和 App Store 的最新更新&#xff0c;解释它…...

【python openai function2json小工具】

两种方法 一种openai-swarm中提供的、一种langchain实现的 一、openai工具函数调用 import inspectdef merge_chunk(final_response: dict, delta: dict) -> None:delta.pop("role", None)merge_fields(final_response, delta)tool_calls delta.get("tool_…...

super()和super().__init__()的解释

一、super 1.基本概念 在python继承当中&#xff0c;super()函数主要用在子类中调用父类的方法。它返回一个特殊对象&#xff0c;这个对象会帮我们调用父类方法 class Parent:def __init__(self, name):self.name namedef say_hello(self):print(f"Hello, Im {self.nam…...

【C++】—— 多态(下)

【C】—— 多态&#xff08;下&#xff09; 4 多态的原理 4.1 虚函数表指针4.2 多态的原理4.3 动态绑定和静态绑定 4.4 虚函数表 4 多态的原理 4.1 虚函数表指针 我们以一道题来引入多态的原理 下面编译为 32 位程序的运行结果是什么&#xff08;&#xff09; A、编译报错  B…...

idea 2023 配置 web service

前言 能在网上查到的资料,都是比较老的,搞了一上午才配置好了环境 因此记录一下,服务你我他 我的环境: java 1.8,Idea2023.1 配置web service 服务端 直接新建一个java新项目 下载插件 添加框架支持 启动项目 配置web service 客户端 新建项目,下载三个插件的步骤和上面服务…...

MYSQL数据库SQL+DQL

关于MySQL数据库中的SQL和DQL&#xff0c;以下是一些关键信息&#xff1a; SQL概述 SQL&#xff08;Structured Query Language&#xff0c;结构化查询语言&#xff09;是用于操作关系型数据库的编程语言。它定义了一套操作关系型数据库的统一标准。SQL语句可以单行或多行书写…...

Java中的异常Throwable

原文链接https://javaguide.cn/java/basis/java-basic-questions-03.html#%E5%BC%82%E5%B8%B8 Java 异常类层次结构图 Exception 和 Error 的区别 在 Java 中&#xff0c;所有的异常都有一个共同的祖先 java.lang 包中的 Throwable 类。Throwable 类有两个重要的子类: Excep…...

Day4顺序表c++代码实现

代码中实现&#xff0c;顺序表的初始化&#xff0c;插入&#xff0c;查找&#xff0c;删除 废话不多说&#xff0c;直接上 #include<iostream> using namespace std; #define eleType int struct SequentialList {eleType* elements;//指针int size;int capacity;//容量…...

将图片转换成base64格式

1.先创建一个canvas对象&#xff0c;然后给canvas对象设置图片对象的宽高&#xff0c;再调用canvas的getContext获取2d上下文对象&#xff0c;再调用上下文对象的drawImage方法&#xff0c;再通过canvase对象的toDataURL方法&#xff0c;将图片转换成base64格式的字符串 2.将b…...