当前位置: 首页 > news >正文

StructuredStreaming (一)

一、sparkStreaming的不足

1.基于微批,延迟高不能做到真正的实时

2.DStream基于RDD,不直接支持SQL

3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件,一个是事件处理的时间)

5.数据的Exactly-Once(恰好一次语义)需要手动实现

二、StructuredStreaming 的介绍 

1、2016年Spark2.0版本中发布

2、基于SparkSQL引擎的可扩展、容错的全新的流处理引擎。

3、并不是对Spark Streaming的简单改进,而是重新开发的全新流式引擎

准实时技术:来一批处理一批 实时:来一条处理一条 离线:一般都是处理一些静止的数据

三、socket+console

1、在虚拟机中下载nc
yum install -y nc2、启动 nc -lk 9999

案例:wordcount

import osfrom pyspark.sql import SparkSession
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()socketDf = spark.readStream.format("socket") \.option("host", "bigdata01") \.option("port", 9999) \.load()# 处理# 方式一:使用dsl语法splitDf = socketDf.select(explode(F.split(socketDf.value, " ")).alias("word"))resultDf1 = splitDf.groupBy("word").count()# 方式二:使用sqlsocketDf.createOrReplaceTempView("wordcount")resultDf2 = spark.sql("""with t1 as( select num from wordcount lateral view explode(split(value," ")) c as num)select num,count(*) counts from t1 group by num;""")# 下面的就是sink的写法 后续会写query1 = resultDf1.writeStream \.outputMode("complete") \.format("console") \.start()query2 = resultDf2.writeStream \.outputMode("complete") \.format("console") \.start() \.awaitTermination()spark.stop()

四、file+console

文件中的数据:
1;yuwen;43
1;shuxue;55
2;yuwen;77
2;shuxue;88
3;yuwen;98
3;shuxue;65
3;yingyu;88
import osfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructField, StringType, DoubleType, LongType, IntegerType, StructTypeif __name__ == '__main__':os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'# 创建一个sparkSession对象spark = SparkSession.builder.appName("socketDemo").getOrCreate()# score_schema = StructType([#     StructField(name="stu_id", dataType=IntegerType(), nullable=False),#     StructField(name="subject_name", dataType=StringType(), nullable=True),#     StructField(name="score", dataType=DoubleType(), nullable=True)# ])score_schema = StructType().add("stu_id", IntegerType()).add("subject_name", StringType()).add("score",DoubleType())socketDf = spark.readStream.format("csv") \.option("sep", ";") \.schema(score_schema) \.load("../../resources/input1")socketDf.writeStream \.outputMode("append") \.format("console") \.option("truncate", False) \.start() \.awaitTermination()spark.stop()

相关文章:

StructuredStreaming (一)

一、sparkStreaming的不足 1.基于微批,延迟高不能做到真正的实时 2.DStream基于RDD,不直接支持SQL 3.流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD) 4.不支持EventTime事件时间(一般流处理都会有两个时间:事件发生的事件&am…...

由播客转向个人定制的音频频道(1)平台搭建

项目的背景 最近开始听喜马拉雅播客的内容,但是发现许多不方便的地方。 休息的时候收听喜马拉雅,但是还需要不断地选择喜马拉雅的内容,比较麻烦,而且黑灯操作反而伤眼睛。 喜马拉雅为代表的播客平台都是VOD 形式的&#xff0…...

[自然语言处理] [AI]深入理解语言与情感分类:从基础到深度学习的进展

语言是人类智能的核心组成部分,具有极高的复杂性和多样性。理解语言,尤其是语言中的隐含部分,向来是人工智能研究的一个巨大挑战。图灵测试本身便是一场关于语言生成与理解的比赛,旨在检验机器是否能够模拟人类的语言能力。随着深度学习的飞速发展,语音识别、情感分析等自…...

【GPTs】Gif-PT:DALL·E制作创意动图与精灵动画

博客主页: [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 💯GPTs指令💯前言💯Gif-PT主要功能适用场景优点缺点 💯小结 💯GPTs指令 中文翻译: 使用Dalle生成用户请求的精灵图动画&#…...

云原生周刊:Istio 1.24.0 正式发布

云原生周刊:Istio 1.24.0 正式发布 开源项目推荐 Kopf Kopf 是一个简洁高效的 Python 框架,只需几行代码即可编写 Kubernetes Operator。Kubernetes(K8s)作为强大的容器编排系统,虽自带命令行工具(kubec…...

Linux设置jar包开机启动

操作系统环境:CentOS 7 【需要 root 权限,使用 root 用户进行操作 或 普通用户使用 sudo 进行操作】 一、系统服务的方式 原理:利用系统服务管理应用程序的生命周期, systemctl 为系统服务管理工具 systemctl start applicati…...

计算机视觉和机器人技术中的下一个标记预测与视频扩散相结合

一种新方法可以训练神经网络对损坏的数据进行分类,同时预测下一步操作。 它可以为机器人制定灵活的计划,生成高质量的视频,并帮助人工智能代理导航数字环境。 Diffusion Forcing 方法可以对嘈杂的数据进行分类,并可靠地预测任务的…...

C语言之简单的获取命令行参数和环境变量

C语言之简单的获取命令行参数和环境变量 本人的开发环境为WIN10操作系统用VMWARE虚拟的UBUNTU LINUX 18.04LTS!!! 所有代码的编辑、编译、运行都在虚拟机上操作,初学的朋友要注意这一点!!! 详细…...

STL之vecor的使用(超详解)

目录 1. C/C中的数组 1.1. C语言中的数组 1.2. C中的数组 2. vector的接口 2.1. vector的迭代器 2.2. vector的初始化与销毁 2.3. vector的容量操作 2.4. vector的访问操作 2.5. vector的修改操作 💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏…...

SystemVerilog学习笔记(一):数据类型

在systemverilog中,主要包含以下数据类型: 4值类型2值类型数组字符串结构体和联合体枚举自定义类型 无符号数:无符号数的符号不使用任何标志,即无符号数只能存储正数。无符号二进制数的范围从 0 到 ((2^n) - 1),n 表…...

Linux软件包管理与Vim编辑器使用指南

目录 一、Linux软件包管理器yum 1.什么是软件包? 2.什么是软件包管理器? 3.查看软件包 4.安装软件 ​编辑 5.卸载软件 Linux开发工具: 二、Linux编辑器---vim 1.vim的基本概念 (1) 正常/普通模式(Normal mode&#xff0…...

每日一练 | 包过滤防火墙的工作原理

01 真题题目 包过滤防火墙对哪一层的数据报文进行检查? A. 应用层 B. 物理层 C. 网络层 D. 链路层 02 真题答案 C 03 答案解析 包过滤防火墙是一种基本的安全设备,它通过检查进出网络的数据包来决定是否允许该数据包通过。 这种类型的防火墙主要关注…...

AR眼镜方案_AR智能眼镜阵列/衍射光波导显示方案

在当今AR智能眼镜的发展中,显示和光学组件成为了技术攻坚的主要领域。由于这些组件的高制造难度和成本,其光学显示模块在整个设备的成本中约占40%。 采用光波导技术的AR眼镜显示方案,核心结构通常由光机、波导和耦合器组成。光机内的微型显示…...

SpringBoot(十九)创建多模块Springboot项目(完整版)

之前我有记录过一次SpringBoot多模块项目的搭建,但是那一次只是做了一个小小的测试。只是把各模块联通之后就结束了。 最近要增加业务开发,要将目前的单模块项目改成多模块项目,我就参照了一下我上次搭建的流程,发现总是有报错。上次搭建的比较顺利,很多细枝末节也没有仔细…...

Navicat 17 功能简介 | 单元格编辑器

Navicat 17 功能简介 | 单元格编辑器 本期,我们一起了解 Navicat 17 出色的数据操作功能的单元格编辑器。单元格编辑器支持文本、十六进制、图像和网页四种格式的数据编辑,位于底部的编辑器窗格,为你编辑更大容量的数据信息提供足够的显示和操…...

MySQL【四】

插入数据 向数据表中插入一行数据 INSERT|REPLACE INTO 表名[(字段列表)] VALUES(值列表); ########## 在s表中插入一条记录:学号为s011,姓名为李思,性别为默认值,计算机专业 ########## insert into s(sno,sname,dept)values(s011,李思,计…...

简单叙述 Spring Boot 启动过程

文章目录 1. 准备阶段:应用启动的入口2. 创建 SpringApplication 对象:开始启动工作3. 配置环境(Environment):识别开发环境与生产环境4. 启动监听器和初始化器:感知启动的关键事件5. 创建 ApplicationCont…...

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式

微信小程序自定义tabbar;禁用某个tab;修改某个tab的样式 原本使用本身的tabBar就已经很舒服了,很合适了的,但是总有一些脑洞大开的产品和客户,给你搞点多样式,没办法牛马就得去做咯,现在就给大…...

力扣113:路径总和II

给你二叉树的根节点 root 和一个整数目标和 targetSum ,找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 叶子节点 是指没有子节点的节点。 示例 1: 输入:root [5,4,8,11,null,13,4,7,2,null,null,5,1], targetSum 22 输出&a…...

JavaScript字符串常用方法

在JavaScript中,字符串是用来表示文本数据的基本数据类型。字符串可以用单引号()、双引号(")、或反引号()包裹。JavaScript中的字符串是不可变的,也就是说,字符串的值一旦创建就无法更改,但可以创建新字符串来替换原有字符串…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表

##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息&#xff0…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

通过MicroSip配置自己的freeswitch服务器进行调试记录

之前用docker安装的freeswitch的,启动是正常的, 但用下面的Microsip连接不上 主要原因有可能一下几个 1、通过下面命令可以看 [rootlocalhost default]# docker exec -it freeswitch fs_cli -x "sofia status profile internal"Name …...

上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式

简介 在我的 QT/C 开发工作中,合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式:工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...

消防一体化安全管控平台:构建消防“一张图”和APP统一管理

在城市的某个角落,一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延,滚滚浓烟弥漫开来,周围群众的生命财产安全受到严重威胁。就在这千钧一发之际,消防救援队伍迅速行动,而豪越科技消防一体化安全管控平台构建的消防“…...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...