当前位置: 首页 > news >正文

StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

相关文章:

StructuredStreaming (一)

一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…...

由播客转向个人定制的音频频道(1)平台搭建

项目的背景 最近开始听喜马拉雅播客的内容,但是发现许多不方便的地方。 休息的时候收听喜马拉雅,但是还需要不断地选择喜马拉雅的内容,比较麻烦,而且黑灯操作反而伤眼睛。 喜马拉雅为代表的播客平台都是VOD 形式的&#xff0…...

[自然语言处理] [AI]深入理解语言与情感分类:从基础到深度学习的进展

语言是人类智能的核心组成部分,具有极高的复杂性和多样性。理解语言,尤其是语言中的隐含部分,向来是人工智能研究的一个巨大挑战。图灵测试本身便是一场关于语言生成与理解的比赛,旨在检验机器是否能够模拟人类的语言能力。随着深度学习的飞速发展,语音识别、情感分析等自…...

【GPTs】Gif-PT:DALL·E制作创意动图与精灵动画

博客主页: [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 💯GPTs指令💯前言💯Gif-PT主要功能适用场景优点缺点 💯小结 💯GPTs指令 中文翻译: 使用Dalle生成用户请求的精灵图动画&#…...

云原生周刊:Istio 1.24.0 正式发布

云原生周刊:Istio 1.24.0 正式发布 开源项目推荐 Kopf Kopf 是一个简洁高效的 Python 框架,只需几行代码即可编写 Kubernetes Operator。Kubernetes(K8s)作为强大的容器编排系统,虽自带命令行工具(kubec…...

Linux设置jar包开机启动

操作系统环境:CentOS 7 【需要 root 权限,使用 root 用户进行操作 或 普通用户使用 sudo 进行操作】 一、系统服务的方式 原理:利用系统服务管理应用程序的生命周期, systemctl 为系统服务管理工具 systemctl start applicati…...

计算机视觉和机器人技术中的下一个标记预测与视频扩散相结合

一种新方法可以训练神经网络对损坏的数据进行分类,同时预测下一步操作。 它可以为机器人制定灵活的计划,生成高质量的视频,并帮助人工智能代理导航数字环境。 Diffusion Forcing 方法可以对嘈杂的数据进行分类,并可靠地预测任务的…...

C语言之简单的获取命令行参数和环境变量

C语言之简单的获取命令行参数和环境变量 本人的开发环境为WIN10操作系统用VMWARE虚拟的UBUNTU LINUX 18.04LTS!!! 所有代码的编辑、编译、运行都在虚拟机上操作,初学的朋友要注意这一点!!! 详细…...

STL之vecor的使用(超详解)

目录 1. C/C中的数组 1.1. C语言中的数组 1.2. C中的数组 2. vector的接口 2.1. vector的迭代器 2.2. vector的初始化与销毁 2.3. vector的容量操作 2.4. vector的访问操作 2.5. vector的修改操作 💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏…...

SystemVerilog学习笔记(一):数据类型

在systemverilog中,主要包含以下数据类型: 4值类型2值类型数组字符串结构体和联合体枚举自定义类型 无符号数:无符号数的符号不使用任何标志,即无符号数只能存储正数。无符号二进制数的范围从 0 到 ((2^n) - 1),n 表…...

Linux软件包管理与Vim编辑器使用指南

目录 一、Linux软件包管理器yum 1.什么是软件包? 2.什么是软件包管理器? 3.查看软件包 4.安装软件 ​编辑 5.卸载软件 Linux开发工具: 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式(Normal mode&#xff0…...

每日一练 | 包过滤防火墙的工作原理

01 真题题目 包过滤防火墙对哪一层的数据报文进行检查? A. 应用层 B. 物理层 C. 网络层 D. 链路层 02 真题答案 C 03 答案解析 包过滤防火墙是一种基本的安全设备,它通过检查进出网络的数据包来决定是否允许该数据包通过。 这种类型的防火墙主要关注…...

AR眼镜方案_AR智能眼镜阵列/衍射光波导显示方案

在当今AR智能眼镜的发展中,显示和光学组件成为了技术攻坚的主要领域。由于这些组件的高制造难度和成本,其光学显示模块在整个设备的成本中约占40%。 采用光波导技术的AR眼镜显示方案,核心结构通常由光机、波导和耦合器组成。光机内的微型显示…...

SpringBoot(十九)创建多模块Springboot项目(完整版)

之前我有记录过一次SpringBoot多模块项目的搭建,但是那一次只是做了一个小小的测试。只是把各模块联通之后就结束了。 最近要增加业务开发,要将目前的单模块项目改成多模块项目,我就参照了一下我上次搭建的流程,发现总是有报错。上次搭建的比较顺利,很多细枝末节也没有仔细…...

Navicat 17 功能简介 | 单元格编辑器

Navicat 17 功能简介 | 单元格编辑器 本期,我们一起了解 Navicat 17 出色的数据操作功能的单元格编辑器。单元格编辑器支持文本、十六进制、图像和网页四种格式的数据编辑,位于底部的编辑器窗格,为你编辑更大容量的数据信息提供足够的显示和操…...

MySQL【四】

插入数据 向数据表中插入一行数据 INSERT|REPLACE INTO 表名[(字段列表)] VALUES(值列表); ########## 在s表中插入一条记录:学号为s011,姓名为李思,性别为默认值,计算机专业 ########## insert into s(sno,sname,dept)values(s011,李思,计…...

简单叙述 Spring Boot 启动过程

文章目录 1. 准备阶段:应用启动的入口2. 创建 SpringApplication 对象:开始启动工作3. 配置环境(Environment):识别开发环境与生产环境4. 启动监听器和初始化器:感知启动的关键事件5. 创建 ApplicationCont…...

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式 原本使用本身的tabBar就已经很舒服了,很合适了的,但是总有一些脑洞大开的产品和客户,给你搞点多样式,没办法牛马就得去做咯,现在就给大…...

力扣113:路径总和II

给你二叉树的根节点 root 和一个整数目标和 targetSum ,找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1: 输入:root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…...

JavaScript字符串常用方法

在JavaScript中,字符串是用来表示文本数据的基本数据类型。字符串可以用单引号()、双引号(")、或反引号()包裹。JavaScript中的字符串是不可变的,也就是说,字符串的值一旦创建就无法更改,但可以创建新字符串来替换原有字符串…...

xtu oj 加一

样例输入# 2 4 1 2 3 4 4 3 2 4 1样例输出# 3 5 解题思路:最小操作次数一定是把所有数变成数组中最大值max。 1、找最大值,一开始我把max初始值设为0,如果a[i]>max,maxa[i],WA了。又看了一遍题目,发现所有整数的绝对值小于…...

QTcpSocket 服务端和客户端

前提&#xff1a; pro文件中添加 QT network 服务端主要采用信号槽机制&#xff0c;代码如如下 核心代码头文件#ifndef TCPSERVER_H #define TCPSERVER_H#include <QObject>#include <QTcpServer> #include <QTcpSocket> #include <QDebug> #inclu…...

Isaac Sim+SKRL机器人并行强化学习

目录 Isaac Sim介绍 OmniIssacGymEnvs安装 SKRL安装与测试 基于UR5的机械臂Reach强化学习测评 机器人控制 OMNI GYM环境编写 SKRL运行文件 训练结果与速度对比 结果分析 运行体验与建议 Isaac Sim介绍 Isaac Sim是英伟达出的一款机器人仿真平台&#xff0c;适用于做机…...

项目中用户数据获取遇到bug

项目跟练的时候 Uncaught (in promise) TypeError: Cannot read properties of undefined (reading ‘code’) at Proxy.userInfo (user.ts:57:17) 因此我想要用result接受信息的时候会出错&#xff0c;报错显示为result.code没有该值 导致我无法获取到相应的数据 解决如下 给…...

SpringSecurity+jwt+captcha登录认证授权总结

SpringSecurityjwtcaptcha登录认证授权总结 版本信息&#xff1a; springboot 3.2.0、springSecurity 6.2.0、mybatis-plus 3.5.5 认证授权思路和流程&#xff1a; 未携带token&#xff0c;访问登录接口&#xff1a; 1、用户登录携带账号密码 2、请求到达自定义Filter&am…...

项目技术栈-解决方案-web3去中心化

web3去中心化 Web3 DApp区块链:钱包:智能合约:UI:ETH系开发技能树DeFi应用 去中心化金融P2P 去中心化网络参考Web3 DApp 区块链: 以以太坊(Ethereum)为主流,也包括Solana、Aptos等其他非EVM链。 区块链本身是软件,需要运行在一系列节点上,这些节点组成P2P网络或者半…...

【AI声音克隆整合包及教程】第二代GPT-SoVITS V2:创新与应用

一、引言 随着科技的迅猛发展&#xff0c;声音克隆技术已经成为一个炙手可热的研究领域。SoVITS&#xff08;Sound Voice Intelligent Transfer System&#xff09;&#xff0c;作为该领域的先锋&#xff0c;凭借其卓越的性能和广泛的适用性&#xff0c;正在为多个行业带来前所…...

分清数据链路层、网络层、传输层的区别,以及这些层面的代表协议

目录 数据链路层 网络层 传输层 数据链路层 OSI模型的第二层&#xff0c;负责在相邻节点之间传输帧&#xff0c;处理帧的封装、地址、差错控制和流量控制等。确保数据在物理介质上可靠地传输&#xff0c;并为上层协议提供服务。 以太网&#xff08;Ethernet&#xff09;&…...

git没有识别出大写字母改成小写重命名的文件目录

Git 默认不会跟踪大写字母和小写字母的区别&#xff0c;因为在大多数文件系统中&#xff0c;大写字母和小写字母被认为是相同的文件&#xff0c;只有在区分大小写的文件系统中&#xff08;如 macOS 的 HFS 或 Windows 的 NTFS&#xff09;&#xff0c;这才是一个问题。 如果重命…...

自己动手写Qt Creator插件

文章目录 前言一、环境准备1.先看自己的Qt Creator IDE的版本2.下载源码 二、使用步骤1.参考原本的插件2.编写自定义插件1.cmakelist增加一个模块2.同理&#xff0c;qbs文件也增加一个3.插件源码 三、效果总结 前言 就目前而言&#xff0c;Qt Creator这个IDE&#xff0c;插件比…...