当前位置: 首页 > news >正文

StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

相关文章:

StructuredStreaming (一)

一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…...

由播客转向个人定制的音频频道(1)平台搭建

项目的背景 最近开始听喜马拉雅播客的内容,但是发现许多不方便的地方。 休息的时候收听喜马拉雅,但是还需要不断地选择喜马拉雅的内容,比较麻烦,而且黑灯操作反而伤眼睛。 喜马拉雅为代表的播客平台都是VOD 形式的&#xff0…...

[自然语言处理] [AI]深入理解语言与情感分类:从基础到深度学习的进展

语言是人类智能的核心组成部分,具有极高的复杂性和多样性。理解语言,尤其是语言中的隐含部分,向来是人工智能研究的一个巨大挑战。图灵测试本身便是一场关于语言生成与理解的比赛,旨在检验机器是否能够模拟人类的语言能力。随着深度学习的飞速发展,语音识别、情感分析等自…...

【GPTs】Gif-PT:DALL·E制作创意动图与精灵动画

博客主页: [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 💯GPTs指令💯前言💯Gif-PT主要功能适用场景优点缺点 💯小结 💯GPTs指令 中文翻译: 使用Dalle生成用户请求的精灵图动画&#…...

云原生周刊:Istio 1.24.0 正式发布

云原生周刊:Istio 1.24.0 正式发布 开源项目推荐 Kopf Kopf 是一个简洁高效的 Python 框架,只需几行代码即可编写 Kubernetes Operator。Kubernetes(K8s)作为强大的容器编排系统,虽自带命令行工具(kubec…...

Linux设置jar包开机启动

操作系统环境:CentOS 7 【需要 root 权限,使用 root 用户进行操作 或 普通用户使用 sudo 进行操作】 一、系统服务的方式 原理:利用系统服务管理应用程序的生命周期, systemctl 为系统服务管理工具 systemctl start applicati…...

计算机视觉和机器人技术中的下一个标记预测与视频扩散相结合

一种新方法可以训练神经网络对损坏的数据进行分类,同时预测下一步操作。 它可以为机器人制定灵活的计划,生成高质量的视频,并帮助人工智能代理导航数字环境。 Diffusion Forcing 方法可以对嘈杂的数据进行分类,并可靠地预测任务的…...

C语言之简单的获取命令行参数和环境变量

C语言之简单的获取命令行参数和环境变量 本人的开发环境为WIN10操作系统用VMWARE虚拟的UBUNTU LINUX 18.04LTS!!! 所有代码的编辑、编译、运行都在虚拟机上操作,初学的朋友要注意这一点!!! 详细…...

STL之vecor的使用(超详解)

目录 1. C/C中的数组 1.1. C语言中的数组 1.2. C中的数组 2. vector的接口 2.1. vector的迭代器 2.2. vector的初始化与销毁 2.3. vector的容量操作 2.4. vector的访问操作 2.5. vector的修改操作 💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏…...

SystemVerilog学习笔记(一):数据类型

在systemverilog中,主要包含以下数据类型: 4值类型2值类型数组字符串结构体和联合体枚举自定义类型 无符号数:无符号数的符号不使用任何标志,即无符号数只能存储正数。无符号二进制数的范围从 0 到 ((2^n) - 1),n 表…...

Linux软件包管理与Vim编辑器使用指南

目录 一、Linux软件包管理器yum 1.什么是软件包? 2.什么是软件包管理器? 3.查看软件包 4.安装软件 ​编辑 5.卸载软件 Linux开发工具: 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式(Normal mode&#xff0…...

每日一练 | 包过滤防火墙的工作原理

01 真题题目 包过滤防火墙对哪一层的数据报文进行检查? A. 应用层 B. 物理层 C. 网络层 D. 链路层 02 真题答案 C 03 答案解析 包过滤防火墙是一种基本的安全设备,它通过检查进出网络的数据包来决定是否允许该数据包通过。 这种类型的防火墙主要关注…...

AR眼镜方案_AR智能眼镜阵列/衍射光波导显示方案

在当今AR智能眼镜的发展中,显示和光学组件成为了技术攻坚的主要领域。由于这些组件的高制造难度和成本,其光学显示模块在整个设备的成本中约占40%。 采用光波导技术的AR眼镜显示方案,核心结构通常由光机、波导和耦合器组成。光机内的微型显示…...

SpringBoot(十九)创建多模块Springboot项目(完整版)

之前我有记录过一次SpringBoot多模块项目的搭建,但是那一次只是做了一个小小的测试。只是把各模块联通之后就结束了。 最近要增加业务开发,要将目前的单模块项目改成多模块项目,我就参照了一下我上次搭建的流程,发现总是有报错。上次搭建的比较顺利,很多细枝末节也没有仔细…...

Navicat 17 功能简介 | 单元格编辑器

Navicat 17 功能简介 | 单元格编辑器 本期,我们一起了解 Navicat 17 出色的数据操作功能的单元格编辑器。单元格编辑器支持文本、十六进制、图像和网页四种格式的数据编辑,位于底部的编辑器窗格,为你编辑更大容量的数据信息提供足够的显示和操…...

MySQL【四】

插入数据 向数据表中插入一行数据 INSERT|REPLACE INTO 表名[(字段列表)] VALUES(值列表); ########## 在s表中插入一条记录:学号为s011,姓名为李思,性别为默认值,计算机专业 ########## insert into s(sno,sname,dept)values(s011,李思,计…...

简单叙述 Spring Boot 启动过程

文章目录 1. 准备阶段:应用启动的入口2. 创建 SpringApplication 对象:开始启动工作3. 配置环境(Environment):识别开发环境与生产环境4. 启动监听器和初始化器:感知启动的关键事件5. 创建 ApplicationCont…...

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式 原本使用本身的tabBar就已经很舒服了,很合适了的,但是总有一些脑洞大开的产品和客户,给你搞点多样式,没办法牛马就得去做咯,现在就给大…...

力扣113:路径总和II

给你二叉树的根节点 root 和一个整数目标和 targetSum ,找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1: 输入:root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…...

JavaScript字符串常用方法

在JavaScript中,字符串是用来表示文本数据的基本数据类型。字符串可以用单引号()、双引号(")、或反引号()包裹。JavaScript中的字符串是不可变的,也就是说,字符串的值一旦创建就无法更改,但可以创建新字符串来替换原有字符串…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...

centos 7 部署awstats 网站访问检测

一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...

Rust 开发环境搭建

环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu ​ 2、Hello World fn main() { println…...

LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用

中达瑞和自2005年成立以来,一直在光谱成像领域深度钻研和发展,始终致力于研发高性能、高可靠性的光谱成像相机,为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...

消息队列系统设计与实践全解析

文章目录 🚀 消息队列系统设计与实践全解析🔍 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡💡 权衡决策框架 1.3 运维复杂度评估🔧 运维成本降低策略 🏗️ 二、典型架构设计2.1 分布式事务最终一致…...

Spring AOP代理对象生成原理

代理对象生成的关键类是【AnnotationAwareAspectJAutoProxyCreator】,这个类继承了【BeanPostProcessor】是一个后置处理器 在bean对象生命周期中初始化时执行【org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization】方法时…...