pyspark之Structured Streaming file文件案例1
# generate_file.py
# 生成数据 生成500个文件,每个文件1000条数据
# 生成数据格式:eventtime name province action ()时间 用户名 省份 动作)
import os
import time
import shutil
import time
FIRST_NAME = ['Zhao', 'Qian', 'Sun', 'Li', 'Zhou', 'Wu', 'Zheng', 'Wang']
SECOND_NAME = ['San', 'Si', 'Wu', 'Chen', 'Yang', 'Min', 'Jie', 'Qi']
PROVINCE = ['BeiJing', 'ShanDong', 'ShangHai', 'HeNan', 'HaErBin']
ACTION = ['login', 'logout', 'purchase']
PATH = "/opt/software/tmp/"
DATA_PATH = "/opt/software/tmp/data/"
# 初始化环境
def test_Setup():
if os.path.exists(DATA_PATH):
shutil.rmtree(DATA_PATH)
os.mkdir(DATA_PATH)
# 清理数据,恢复测试环境
def test_TearDown():
shutile.rmtree(DATA_PATH)
# 数据保存文件
def writeAndMove(filename,content):
with open(PATH+filename,'wt',encoding='utf-8') as f:
f.write(content)
shutil.move(PATH+filename,DATA_PATH+filename)
if __name__ == '__main__':
test_Setup()
for i in range(500):
filename = "user_action_{}.log".format(i)
"""
验证spark输出模式,complete和update,增加代码,第一个文件i=0时,设置PROVINCE = "TAIWAN"
"""
if i == 0:
province= ['TaiWan']
else:
province = PROVINCE
content = ""
for _ in range(1000):
content += "{} {} {} {}\n".format(str(int(time.time())),random.choice(FIRST_NAME)+random.choice(SECOND_NAME),random.choice(province),random.choice(ACTION))
writeAndMove(filename,content)
time.sleep(10)
# spark_file_test.py
# 读取DATA文件夹下面文件,按照省份统计数据,主要考虑window情况,按照window情况测试,同时针对 outputMode和输出console和mysql进行考虑,其中保存到mysql时添加batch字段
from pyspark.sql import SparkSession,DataFrame
from pyspark.sql.functions import split,lit,from_unixtime
DATA_PATH = "/opt/software/tmp/data/"
if __name__ == '__main__':
spark = SparkSession.builder.getOrCreate()
lines = spark.readStream.format("text").option("seq","\n").load(DATA_PATH)
# 分隔符为空格
userinfo = lines.select(split(lines.value," ").alias("info"))
# 第一个为eventtime 第二个为name 第三个为province 第四个为action
# userinfo['info'][0]等同于userinfo['info'].getIterm(0)
user = userinfo.select(from_unixtime(userinfo['info'][0]).alias('eventtime'),
userinfo['info'][1].alias('name'),userinfo['info'][2].alias('province'),
userinfo['info'][3].alias('action'))
"""
测试1:数据直接输出到控制台,由于没有采用聚合,输出模式选择update
user.writeStream.outputMode("update").format("console").trigger(processingTime="8 seconds").start().awaitTermination()
"""
"""
测试2:数据存储到数据库,新建数据库表,可以通过printSchema()查看数据类型情况
def insert_into_mysql_batch(df:DataFrame,batch):
if df.count()>0:
# 此处将batch添加到df中,采用lit函数
data = df.withColumn("batch",lit(batch))
data.write.format("jdbc"). \
option("driver","com.mysql.jdbc.Driver"). \
option("url","jdbc:mysql://localhost:3306/spark").option("user","root").\
option("password","root").option("dbtable","user_log").\
option("batchsize",1000).mode("append").save()
else:
pass
user.writeStream.outputMode("update").foreachBatch((insert_into_mysql_batch)).trigger(processingTime="20 seconds").start().awaitTermination()
"""
"""
测试3:数据按照省份统计后,输出到控制台,分析complete和update输出模式区别,针对该问题,调整输入,province="TaiWan"只会输入1次,即如果输出方式complete,则每batch都会输出,update的话,只会出现在一个batch
userProvinceCounts = user.groupBy("province").count()
userProvinceCounts = userProvinceCounts.select(userProvinceCounts['province'],userProvinceCounts["count"].alias('sl'))
# 测试输出模式complete:complete将总计算结果都进行输出
"""
batch 0
TaiWan 1000
batch 1
TaiWan 1000
其他省份 sl
batch 2
TaiWan 1000
其他省份 sl
""" userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination()
# 测试输出模式update:update只输出相比上个批次变动的内容(新增或修改)
batch 0
TaiWan 1000
batch 1 中没有TaiWan输出
userProvinceCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="20 seconds").start().awaitTermination()
"""
相关文章:
pyspark之Structured Streaming file文件案例1
# generate_file.py # 生成数据 生成500个文件,每个文件1000条数据 # 生成数据格式:eventtime name province action ()时间 用户名 省份 动作) import os import time import shutil import time FIRST_NAME [Zhao, Qian, Sun, Li, Zhou, Wu, Zheng, Wang] SEC…...
虚幻UE 特效-Niagara特效实战-雨天
回顾Niagara特效基础知识:虚幻UE 特效-Niagara特效初识 其他两篇实战:虚幻UE 特效-Niagara特效实战-火焰、烛火、虚幻UE 特效-Niagara特效实战-烟雾、喷泉 本篇笔记我们再来实战雨天,雨天主要用到了特效中的事件。 文章目录 一、雨天1、创建雨…...
k8s 集群搭建的一些坑
k8s集群部署的时候会遇到很多的坑,即使看网上的文档也可能遇到各种的坑。 安装准备 1、虚拟机两台(ip按自己的网络环境相应配置)(master/node) 192.168.100.215 k8s-master 192.168.100.216 k8s-node1 2、关闭防火墙(master/node) system…...
SpringMVC传递数据给前台
SpringMVC有三种方式将数据提供给前台 第一种 使用Request域 第二种 使用Model(数据默认是存放在Request域中) 与第一种方式其实是一致的 第三种 使用Map集合(数据默认是存放在Request域中)...
国标GB28181安防视频监控EasyCVR级联后上级平台视频加载慢的原因排查
国标GB28181协议安防视频监控系统EasyCVR视频综合管理平台,采用了开放式的网络结构,可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力,同时还…...
React16源码: React中的HostComponent HostText的源码实现
HostComponent & HostText 1 )概述 HostComponent 就是我们dom原生的这些节点, 如: div, span, p 标签这种 使用的是小写字母开头的这些节点一般都认为它是一个 HostComponent HostText,它是单纯的文本节点主要关注它们的一个更新过程 2 …...
Unity3D代码混淆方案详解
背景 Unity引擎使用Mono运行时,而C#语言易受反编译影响,存在代码泄露风险。本文通过《QQ乐团》项目实践,提出一种适用于Unity引擎的代码混淆方案,以保护代码逻辑。 引言 在Unity引擎下,为了防止代码被轻易反编译&a…...
安科瑞应急疏散照明系统在歌舞娱乐等场所的应用
首先必须明确疏散照明并不包含疏散指示标志,疏散照明是为了提供人员疏散时的必要照明,必须达到规定照度,以便逃生时看清逃生的路径,避免出现恐慌及踩踏事故,而疏散指示标志则是提供疏散路径方向引导的,所以…...
Go语言协程使用
主协程执行打印,子协程不打印 package main import ("fmt" )func do(i int) {fmt.Println("执行中") } func main() {fmt.Println("main协程")go do(1)fmt.Println("执行完了") }//main协程 //执行完了子协程没有打印输出…...
JAVA如何创建对象
在 Java 中创建对象的步骤如下: 定义一个类:在 Java 中,所有的对象都是通过类来创建的。因此,首先需要定义一个类,即描述对象的属性和行为。 声明变量:要创建一个对象,需要先声明一个变量来保存…...
《WebKit 技术内幕》之五(2): HTML解释器和DOM 模型
2.HTML 解释器 2.1 解释过程 HTML 解释器的工作就是将网络或者本地磁盘获取的 HTML 网页和资源从字节流解释成 DOM 树结构。 这一过程中,WebKit 内部对网页内容在各个阶段的结构表示。 WebKit 中这一过程如下:首先是字节流,经过解码之…...
Spring Boot多环境配置
Spring Boot的针对不同的环境创建不同的配置文件, 语法结构:application-{profile}.properties profile:代表的就是一套环境 需求 application-dev.yml 开发环境 端口8090 application-test.yml 测试环境 端口8091 application-prod.yml 生产环境 端口80…...
常用的目标跟踪有哪些
目标跟踪是计算机视觉领域的一个重要研究方向,主要用于实现视频监控、人机交互、智能交通等领域。下面介绍几种常用的目标跟踪方法: 特征匹配法 特征匹配法是目标跟踪中最基本的方法之一,其基本原理是通过提取目标的特征,然后在…...
python222网站实战(SpringBoot+SpringSecurity+MybatisPlus+thymeleaf+layui)-帖子详情页实现
锋哥原创的SpringbootLayui python222网站实战: python222网站实战课程视频教程(SpringBootPython爬虫实战) ( 火爆连载更新中... )_哔哩哔哩_bilibilipython222网站实战课程视频教程(SpringBootPython爬虫实战) ( 火…...
11、Kafka ------ Kafka 核心API 及 生产者API 讲解
目录 Kafka核心API 及 生产者API讲解★ Kafka的核心APIKafka包含如下5类核心API: ★ 生产者APIKafka 的API 文档 ★ 使用生产者API发送消息 Kafka核心API 及 生产者API讲解 官方文档 ★ Kafka的核心API Kafka包含如下5类核心API: Producer API&#x…...
MySQL 8.3 发布, 它带来哪些新变化?
1月16号 MySQL 官方发布 8.3 创新版 和 8.0.36 长期支持版本 (该版本 没有新增功能,更多是修复bug ),本文基于 官方文档 说一下 8.3 版本带来的变化。 一 增加的特性 1.1 GTID_NEXT 支持增加 TAG 选项。 之前的版本中 GTID_NEXTUUID:number ÿ…...
【数据结构】详谈队列的顺序存储及C语言实现
循环队列及其基本操作的C语言实现 前言一、队列的顺序存储1.1 队尾指针与队头指针1.2 基本操作实现的底层逻辑1.2.1 队列的创建与销毁1.2.2 队列的增加与删除1.2.3 队列的判空与判满1.2.4 逻辑的局限性 二、循环队列2.1 循环队列的实现逻辑一2.2 循环队列的实现逻辑二2.3 循环队…...
为什么 HTTPS 协议能保障数据传输的安全性?
HTTP 协议 在谈论 HTTPS 协议之前,先来回顾一下 HTTP 协议的概念。 HTTP 协议介绍 HTTP 协议是一种基于文本的传输协议,它位于 OSI 网络模型中的应用层。 HTTP 协议是通过客户端和服务器的请求应答来进行通讯,目前协议由之前的 RFC 2616 拆…...
使用 Node 创建 Web 服务器
Node.js 提供了 http 模块,http 模块主要用于搭建 HTTP 服务端和客户端,使用 HTTP 服务器或客户端功能必须调用 http 模块,代码如下: var http require(http); 以下是演示一个最基本的 HTTP 服务器架构(使用 8080 端口)&#x…...
leetcode 151反转字符串如何原地去除多余空格
题目:https://leetcode.cn/problems/reverse-words-in-a-string/description/ 完整题解:https://leetcode.cn/problems/reverse-words-in-a-string/solutions/2611893/chu-li-kong-ge-ku-han-shu-reversefan-zhu-bioo 思路来自代码随想录,对其中的除去多…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
