当前位置: 首页 > news >正文

StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

相关文章:

StructuredStreaming (一)

一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…...

由播客转向个人定制的音频频道(1)平台搭建

项目的背景 最近开始听喜马拉雅播客的内容,但是发现许多不方便的地方。 休息的时候收听喜马拉雅,但是还需要不断地选择喜马拉雅的内容,比较麻烦,而且黑灯操作反而伤眼睛。 喜马拉雅为代表的播客平台都是VOD 形式的&#xff0…...

[自然语言处理] [AI]深入理解语言与情感分类:从基础到深度学习的进展

语言是人类智能的核心组成部分,具有极高的复杂性和多样性。理解语言,尤其是语言中的隐含部分,向来是人工智能研究的一个巨大挑战。图灵测试本身便是一场关于语言生成与理解的比赛,旨在检验机器是否能够模拟人类的语言能力。随着深度学习的飞速发展,语音识别、情感分析等自…...

【GPTs】Gif-PT:DALL·E制作创意动图与精灵动画

博客主页: [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 💯GPTs指令💯前言💯Gif-PT主要功能适用场景优点缺点 💯小结 💯GPTs指令 中文翻译: 使用Dalle生成用户请求的精灵图动画&#…...

云原生周刊:Istio 1.24.0 正式发布

云原生周刊:Istio 1.24.0 正式发布 开源项目推荐 Kopf Kopf 是一个简洁高效的 Python 框架,只需几行代码即可编写 Kubernetes Operator。Kubernetes(K8s)作为强大的容器编排系统,虽自带命令行工具(kubec…...

Linux设置jar包开机启动

操作系统环境:CentOS 7 【需要 root 权限,使用 root 用户进行操作 或 普通用户使用 sudo 进行操作】 一、系统服务的方式 原理:利用系统服务管理应用程序的生命周期, systemctl 为系统服务管理工具 systemctl start applicati…...

计算机视觉和机器人技术中的下一个标记预测与视频扩散相结合

一种新方法可以训练神经网络对损坏的数据进行分类,同时预测下一步操作。 它可以为机器人制定灵活的计划,生成高质量的视频,并帮助人工智能代理导航数字环境。 Diffusion Forcing 方法可以对嘈杂的数据进行分类,并可靠地预测任务的…...

C语言之简单的获取命令行参数和环境变量

C语言之简单的获取命令行参数和环境变量 本人的开发环境为WIN10操作系统用VMWARE虚拟的UBUNTU LINUX 18.04LTS!!! 所有代码的编辑、编译、运行都在虚拟机上操作,初学的朋友要注意这一点!!! 详细…...

STL之vecor的使用(超详解)

目录 1. C/C中的数组 1.1. C语言中的数组 1.2. C中的数组 2. vector的接口 2.1. vector的迭代器 2.2. vector的初始化与销毁 2.3. vector的容量操作 2.4. vector的访问操作 2.5. vector的修改操作 💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏…...

SystemVerilog学习笔记(一):数据类型

在systemverilog中,主要包含以下数据类型: 4值类型2值类型数组字符串结构体和联合体枚举自定义类型 无符号数:无符号数的符号不使用任何标志,即无符号数只能存储正数。无符号二进制数的范围从 0 到 ((2^n) - 1),n 表…...

Linux软件包管理与Vim编辑器使用指南

目录 一、Linux软件包管理器yum 1.什么是软件包? 2.什么是软件包管理器? 3.查看软件包 4.安装软件 ​编辑 5.卸载软件 Linux开发工具: 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式(Normal mode&#xff0…...

每日一练 | 包过滤防火墙的工作原理

01 真题题目 包过滤防火墙对哪一层的数据报文进行检查? A. 应用层 B. 物理层 C. 网络层 D. 链路层 02 真题答案 C 03 答案解析 包过滤防火墙是一种基本的安全设备,它通过检查进出网络的数据包来决定是否允许该数据包通过。 这种类型的防火墙主要关注…...

AR眼镜方案_AR智能眼镜阵列/衍射光波导显示方案

在当今AR智能眼镜的发展中,显示和光学组件成为了技术攻坚的主要领域。由于这些组件的高制造难度和成本,其光学显示模块在整个设备的成本中约占40%。 采用光波导技术的AR眼镜显示方案,核心结构通常由光机、波导和耦合器组成。光机内的微型显示…...

SpringBoot(十九)创建多模块Springboot项目(完整版)

之前我有记录过一次SpringBoot多模块项目的搭建,但是那一次只是做了一个小小的测试。只是把各模块联通之后就结束了。 最近要增加业务开发,要将目前的单模块项目改成多模块项目,我就参照了一下我上次搭建的流程,发现总是有报错。上次搭建的比较顺利,很多细枝末节也没有仔细…...

Navicat 17 功能简介 | 单元格编辑器

Navicat 17 功能简介 | 单元格编辑器 本期,我们一起了解 Navicat 17 出色的数据操作功能的单元格编辑器。单元格编辑器支持文本、十六进制、图像和网页四种格式的数据编辑,位于底部的编辑器窗格,为你编辑更大容量的数据信息提供足够的显示和操…...

MySQL【四】

插入数据 向数据表中插入一行数据 INSERT|REPLACE INTO 表名[(字段列表)] VALUES(值列表); ########## 在s表中插入一条记录:学号为s011,姓名为李思,性别为默认值,计算机专业 ########## insert into s(sno,sname,dept)values(s011,李思,计…...

简单叙述 Spring Boot 启动过程

文章目录 1. 准备阶段:应用启动的入口2. 创建 SpringApplication 对象:开始启动工作3. 配置环境(Environment):识别开发环境与生产环境4. 启动监听器和初始化器:感知启动的关键事件5. 创建 ApplicationCont…...

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式 原本使用本身的tabBar就已经很舒服了,很合适了的,但是总有一些脑洞大开的产品和客户,给你搞点多样式,没办法牛马就得去做咯,现在就给大…...

力扣113:路径总和II

给你二叉树的根节点 root 和一个整数目标和 targetSum ,找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1: 输入:root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…...

JavaScript字符串常用方法

在JavaScript中,字符串是用来表示文本数据的基本数据类型。字符串可以用单引号()、双引号(")、或反引号()包裹。JavaScript中的字符串是不可变的,也就是说,字符串的值一旦创建就无法更改,但可以创建新字符串来替换原有字符串…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...

Docker 运行 Kafka 带 SASL 认证教程

Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...

生成 Git SSH 证书

🔑 1. ​​生成 SSH 密钥对​​ 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​: -t rsa&#x…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)

cd /home 进入home盘 安装虚拟环境: 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境: virtualenv myenv 3、激活虚拟环境(激活环境可以在当前环境下安装包) source myenv/bin/activate 此时,终端…...

Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践

前言:本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中,跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南,你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案,并结合内网…...

【51单片机】4. 模块化编程与LCD1602Debug

1. 什么是模块化编程 传统编程会将所有函数放在main.c中,如果使用的模块多,一个文件内会有很多代码,不利于组织和管理 模块化编程则是将各个模块的代码放在不同的.c文件里,在.h文件里提供外部可调用函数声明,其他.c文…...