pyspark之Structured Streaming file文件案例1
# generate_file.py
# 生成数据 生成500个文件,每个文件1000条数据
# 生成数据格式:eventtime name province action ()时间 用户名 省份 动作)
import os
import time
import shutil
import time
FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']
PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"
# 初始化环境
def test_Setup():
if os.path.exists(DATA_PATH):
shutil.rmtree(DATA_PATH)
os.mkdir(DATA_PATH)
# 清理数据,恢复测试环境
def test_TearDown():
shutile.rmtree(DATA_PATH)
# 数据保存文件
def writeAndMove(filename,content):
with open(PATH+filename,'wt',encoding='utf-8') as f:
f.write(content)
shutil.move(PATH+filename,DATA_PATH+filename)
if __name__ == '__main__':
test_Setup()
for i in range(500):
filename = "user_action_{}.log".format(i)
"""
验证spark输出模式,complete和update,增加代码,第一个文件i=0时,设置PROVINCE = "TAIWAN"
"""
if i == 0:
province= ['TaiWan']
else:
province = PROVINCE
content = ""
for _ in range(1000):
content += "{} {} {} {}\n".format(str(int(time.time())),random.choice(FIRST_NAME)+random.choice(SECOND_NAME),random.choice(province),random.choice(ACTION))
writeAndMove(filename,content)
time.sleep(10)
# spark_file_test.py
# 读取DATA文件夹下面文件,按照省份统计数据,主要考虑window情况,按照window情况测试,同时针对 outputMode和输出console和mysql进行考虑,其中保存到mysql时添加batch字段
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import split,lit,from_unixtime
DATA_PATH = "/opt/software/tmp/data/"
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
lines = spark.readStream.format("text").option("seq","\n").load(DATA_PATH)
# 分隔符为空格
userinfo = lines.select(split(lines.value," ").alias("info"))
# 第一个为eventtime 第二个为name 第三个为province 第四个为action
# userinfo['info'][0]等同于userinfo['info'].getIterm(0)
user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
userinfo['info'][1].alias('name'),userinfo['info'][2].alias('province'),
userinfo['info'][3].alias('action'))
"""
测试1:数据直接输出到控制台,由于没有采用聚合,输出模式选择update
user.writeStream.outputMode("update").format("console").trigger(processingTime="8 seconds").start().awaitTermination()
"""
"""
测试2:数据存储到数据库,新建数据库表,可以通过printSchema()查看数据类型情况
def insert_into_mysql_batch(df:DataFrame,batch):
if df.count()>0:
# 此处将batch添加到df中,采用lit函数
data = df.withColumn("batch",lit(batch))
data.write.format("jdbc"). \
option("driver","com.mysql.jdbc.Driver"). \
option("url","jdbc:mysql://localhost:3306/spark").option("user","root").\
option("password","root").option("dbtable","user_log").\
option("batchsize",1000).mode("append").save()
else:
pass
user.writeStream.outputMode("update").foreachBatch((insert_into_mysql_batch)).trigger(processingTime="20 seconds").start().awaitTermination()
"""
"""
测试3:数据按照省份统计后,输出到控制台,分析complete和update输出模式区别,针对该问题,调整输入,province="TaiWan"只会输入1次,即如果输出方式complete,则每batch都会输出,update的话,只会出现在一个batch
userProvinceCounts = user.groupBy("province").count()
userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],userProvinceCounts["count"].alias('sl'))
# 测试输出模式complete:complete将总计算结果都进行输出
"""
batch 0
TaiWan 1000
batch 1
TaiWan 1000
其他省份 sl
batch 2
TaiWan 1000
其他省份 sl
""" userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination()
# 测试输出模式update:update只输出相比上个批次变动的内容(新增或修改)
batch 0
TaiWan 1000
batch 1 中没有TaiWan输出
userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination()
"""
相关文章:
pyspark之Structured Streaming file文件案例1
# generate_file.py # 生成数据 生成500个文件,每个文件1000条数据 # 生成数据格式:eventtime name province action ()时间 用户名 省份 动作) import os import time import shutil import time FIRST_NAME [Zhao, Qian, Sun, Li, Zhou, Wu, Zheng, Wang] SEC…...
虚幻UE 特效-Niagara特效实战-雨天
回顾Niagara特效基础知识:虚幻UE 特效-Niagara特效初识 其他两篇实战:虚幻UE 特效-Niagara特效实战-火焰、烛火、虚幻UE 特效-Niagara特效实战-烟雾、喷泉 本篇笔记我们再来实战雨天,雨天主要用到了特效中的事件。 文章目录 一、雨天1、创建雨…...
k8s 集群搭建的一些坑
k8s集群部署的时候会遇到很多的坑,即使看网上的文档也可能遇到各种的坑。 安装准备 1、虚拟机两台(ip按自己的网络环境相应配置)(master/node) 192.168.100.215 k8s-master 192.168.100.216 k8s-node1 2、关闭防火墙(master/node) system…...
SpringMVC传递数据给前台
SpringMVC有三种方式将数据提供给前台 第一种 使用Request域 第二种 使用Model(数据默认是存放在Request域中) 与第一种方式其实是一致的 第三种 使用Map集合(数据默认是存放在Request域中)...
国标GB28181安防视频监控EasyCVR级联后上级平台视频加载慢的原因排查
国标GB28181协议安防视频监控系统EasyCVR视频综合管理平台,采用了开放式的网络结构,可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力,同时还…...
React16源码: React中的HostComponent HostText的源码实现
HostComponent & HostText 1 )概述 HostComponent 就是我们dom原生的这些节点, 如: div, span, p 标签这种 使用的是小写字母开头的这些节点一般都认为它是一个 HostComponent HostText,它是单纯的文本节点主要关注它们的一个更新过程 2 …...
Unity3D代码混淆方案详解
背景 Unity引擎使用Mono运行时,而C#语言易受反编译影响,存在代码泄露风险。本文通过《QQ乐团》项目实践,提出一种适用于Unity引擎的代码混淆方案,以保护代码逻辑。 引言 在Unity引擎下,为了防止代码被轻易反编译&a…...
安科瑞应急疏散照明系统在歌舞娱乐等场所的应用
首先必须明确疏散照明并不包含疏散指示标志,疏散照明是为了提供人员疏散时的必要照明,必须达到规定照度,以便逃生时看清逃生的路径,避免出现恐慌及踩踏事故,而疏散指示标志则是提供疏散路径方向引导的,所以…...
Go语言协程使用
主协程执行打印,子协程不打印 package main import ("fmt" )func do(i int) {fmt.Println("执行中") } func main() {fmt.Println("main协程")go do(1)fmt.Println("执行完了") }//main协程 //执行完了子协程没有打印输出…...
JAVA如何创建对象
在 Java 中创建对象的步骤如下: 定义一个类:在 Java 中,所有的对象都是通过类来创建的。因此,首先需要定义一个类,即描述对象的属性和行为。 声明变量:要创建一个对象,需要先声明一个变量来保存…...
《WebKit 技术内幕》之五(2): HTML解释器和DOM 模型
2.HTML 解释器 2.1 解释过程 HTML 解释器的工作就是将网络或者本地磁盘获取的 HTML 网页和资源从字节流解释成 DOM 树结构。 这一过程中,WebKit 内部对网页内容在各个阶段的结构表示。 WebKit 中这一过程如下:首先是字节流,经过解码之…...
Spring Boot多环境配置
Spring Boot的针对不同的环境创建不同的配置文件, 语法结构:application-{profile}.properties profile:代表的就是一套环境 需求 application-dev.yml 开发环境 端口8090 application-test.yml 测试环境 端口8091 application-prod.yml 生产环境 端口80…...
常用的目标跟踪有哪些
目标跟踪是计算机视觉领域的一个重要研究方向,主要用于实现视频监控、人机交互、智能交通等领域。下面介绍几种常用的目标跟踪方法: 特征匹配法 特征匹配法是目标跟踪中最基本的方法之一,其基本原理是通过提取目标的特征,然后在…...
python222网站实战(SpringBoot+SpringSecurity+MybatisPlus+thymeleaf+layui)-帖子详情页实现
锋哥原创的SpringbootLayui python222网站实战: python222网站实战课程视频教程(SpringBootPython爬虫实战) ( 火爆连载更新中... )_哔哩哔哩_bilibilipython222网站实战课程视频教程(SpringBootPython爬虫实战) ( 火…...
11、Kafka ------ Kafka 核心API 及 生产者API 讲解
目录 Kafka核心API 及 生产者API讲解★ Kafka的核心APIKafka包含如下5类核心API: ★ 生产者APIKafka 的API 文档 ★ 使用生产者API发送消息 Kafka核心API 及 生产者API讲解 官方文档 ★ Kafka的核心API Kafka包含如下5类核心API: Producer API&#x…...
MySQL 8.3 发布, 它带来哪些新变化?
1月16号 MySQL 官方发布 8.3 创新版 和 8.0.36 长期支持版本 (该版本 没有新增功能,更多是修复bug ),本文基于 官方文档 说一下 8.3 版本带来的变化。 一 增加的特性 1.1 GTID_NEXT 支持增加 TAG 选项。 之前的版本中 GTID_NEXTUUID:number ÿ…...
【数据结构】详谈队列的顺序存储及C语言实现
循环队列及其基本操作的C语言实现 前言一、队列的顺序存储1.1 队尾指针与队头指针1.2 基本操作实现的底层逻辑1.2.1 队列的创建与销毁1.2.2 队列的增加与删除1.2.3 队列的判空与判满1.2.4 逻辑的局限性 二、循环队列2.1 循环队列的实现逻辑一2.2 循环队列的实现逻辑二2.3 循环队…...
为什么 HTTPS 协议能保障数据传输的安全性?
HTTP 协议 在谈论 HTTPS 协议之前,先来回顾一下 HTTP 协议的概念。 HTTP 协议介绍 HTTP 协议是一种基于文本的传输协议,它位于 OSI 网络模型中的应用层。 HTTP 协议是通过客户端和服务器的请求应答来进行通讯,目前协议由之前的 RFC 2616 拆…...
使用 Node 创建 Web 服务器
Node.js 提供了 http 模块,http 模块主要用于搭建 HTTP 服务端和客户端,使用 HTTP 服务器或客户端功能必须调用 http 模块,代码如下: var http require(http); 以下是演示一个最基本的 HTTP 服务器架构(使用 8080 端口)&#x…...
leetcode 151反转字符串如何原地去除多余空格
题目:https://leetcode.cn/problems/reverse-words-in-a-string/description/ 完整题解:https://leetcode.cn/problems/reverse-words-in-a-string/solutions/2611893/chu-li-kong-ge-ku-han-shu-reversefan-zhu-bioo 思路来自代码随想录,对其中的除去多…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)
目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关࿰…...
