Spark实时(四):Strctured Streaming简单应用

文章目录
Strctured Streaming简单应用
一、Output Modes输出模式
二、Streaming Table API
三、Triggers
1、unspecified(默认模式)
2、Fixed interval micro-batches(固定间隔批次)
3、 One-time micro-batch (仅一次触发)
4、Continuous with fixed checkpoint interval(连续处理)
Strctured Streaming简单应用
一、Output Modes输出模式
Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:
- Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
- Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
- Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。
二、Streaming Table API
在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。
案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。
代码示例如下:
package com.lanson.structuredStreamingimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTableAPI {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("StreamTableAPI").config("spark.sql.shuffle.partitions", 1).config("spark.sql.warehouse.dir", "./my-spark-warehouse").getOrCreate()spark.sparkContext.setLogLevel("Error");import spark.implicits._//2.读取socket数据,注册流表val df: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.对df进行转换val personinfo: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")//4.将以上personinfo 写入到流表中personinfo.writeStream.option("checkpointLocation","./checkpoint/dir1").toTable("mytbl")import org.apache.spark.sql.functions._//5.读取mytbl 流表中的数据val query: StreamingQuery = spark.readStream.table("mytbl").withColumn("new_age", col("age").plus(6)).select("id", "name", "age", "new_age").writeStream.format("console").start()query.awaitTermination()}
}
以上代码编写完成后启动,向控制台输入以下数据:
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。
三、Triggers
Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:
实时处理,以下是支持的Triggers:
| Trigger Type | 描述 |
| Unspecified(默认) |
|
| Fixed interval micro-batches(固定间隔批次) |
|
| One-time micro-batch(仅一次性触发) |
|
| Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段)) |
|
下面以读取Socket数据为例,Scala代码演示各个模式
1、unspecified(默认模式)
代码如下:
//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream.format("console").start()query.awaitTermination()
2、Fixed interval micro-batches(固定间隔批次)
代码如下:
//3.用户指定固定间隔批次触发查询val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds"))
// .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS).start()query.awaitTermination()
注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。
3、 One-time micro-batch (仅一次触发)
代码如下:
//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.Once()).start()
query.awaitTermination()
4、Continuous with fixed checkpoint interval(连续处理)
Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。
代码如下:
//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream.format("console")//每10ms 记录一次状态,而不是执行一次.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)).option("checkpointLocation","./checkpint/dir4").start()
query.awaitTermination()
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:
Spark实时(四):Strctured Streaming简单应用
文章目录 Strctured Streaming简单应用 一、Output Modes输出模式 二、Streaming Table API 三、Triggers 1、unspecified(默认模式) 2、Fixed interval micro-batches&am…...
SpringBoot上传超大文件导致OOM,完美问题解决办法
问题描述 报错: Caused by: java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_381] at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_381] at java.…...
PyTorch 的各个核心模块和它们的功能
1. torch 核心功能 张量操作:PyTorch 的张量是一个多维数组,类似于 NumPy 的 ndarray,但支持 GPU 加速。数学运算:提供了各种数学运算,包括线性代数操作、随机数生成等。自动微分:torch.autograd 模块用于…...
Java开发之LinkedList源码分析
#来自ゾフィー(佐菲) 1 简介 LinkedList 的底层数据结构是双向链表。可以当作链表、栈、队列、双端队列来使用。有以下特点: 在插入或删除数据时,性能好;允许有 null 值;查询效率不高;线程不安…...
外卖霸王餐系统架构怎么选?
在当今日益繁荣的外卖市场中,外卖霸王餐作为一种独特的营销策略,受到了众多商家的青睐。然而,要想成功实施外卖霸王餐活动,一个安全、稳定且高效的架构选择至关重要。本文将深入探讨外卖霸王餐架构的选择,以期为商家提…...
AV1技术学习:Transform Coding
对预测残差进行变换编码,去除潜在的空间相关性。VP9 采用统一的变换块大小设计,编码块中的所有的块共享相同的变换大小。VP9 支持 4 4、8 8、16 16、32 32 四种正方形变换大小。根据预测模式选择由一维离散余弦变换 (DCT) 和非对称离散正弦变换 (ADS…...
Git操作指令
Git操作指令 一、安装git 1、设置配置信息: # global全局配置 git config --global user.name "Your username" git config --global user.email "Your email"2、查看git版本号 git -v # or git --version3、查看配置信息: git…...
CSS 创建:从入门到精通
CSS 创建:从入门到精通 CSS(层叠样式表)是网页设计中不可或缺的一部分,它用于控制网页的布局和样式。本文将详细介绍CSS的创建过程,包括基本概念、语法结构、选择器、样式属性以及如何将CSS应用到HTML中。无论您是初学者还是有经验的开发者,本文都将为您提供宝贵的信息。…...
Windows 11 系统对磁盘进行分区保姆级教程
Windows 11磁盘分区 磁盘分区是将硬盘驱动器划分为多个逻辑部分的过程,每个逻辑部分都可以独立使用和管理。在Windows 11操作系统中进行磁盘分区主要有以下几个作用和意义: 组织和管理数据:分区可以帮助用户更好地组织他们的数据,…...
探索WebKit的CSS盒模型:深入理解Web布局的基石
探索WebKit的CSS盒模型:深入理解Web布局的基石 在Web开发的世界中,CSS盒模型(Box Model)是构建网页布局的核心原理。WebKit,作为Safari浏览器的渲染引擎,对CSS盒模型有着深入而精确的支持。本文将带你深入…...
c++初阶知识——string类详解
目录 前言: 1.标准库中的string类 1.1 auto和范围for auto 范围for 1.2 string类常用接口说明 1.string类对象的常见构造 1.3 string类对象的访问及遍历操作 1.4. string类对象的修改操作 1.5 string类非成员函数 2.string类的模拟实现 2.1 经典的string…...
php接口返回的json字符串,json_decode()失败,原来是多了红点
问题: 调用某个接口返回的json,json_decode()失败,返回数据为null, echo json_last_error();返回错误码 4 经过多次调试发现:多出来一个红点,预览是看不到的。 解决:要去除BOM头部 $resul…...
Python3网络爬虫开发实战(2)爬虫基础库
文章目录 一、urllib1. urlparse 实现 URL 的识别和分段2. urlunparse 用于构造 URL3. urljoin 用于两个链接的拼接4. urlencode 将 params 字典序列化为 params 字符串5. parse_qs 和 parse_qsl 用于将 params 字符串反序列化为 params 字典或列表6. quote 和 unquote 对 URL的…...
el-image预览图片点击遮盖处关闭预览
预览关闭按钮不明显 解决方式: 1.修改按钮样式明显点: //el-image 添加自定义类名,下文【test-image】代指 .test-image .el-icon-circle-close{ color:#fff; font-size:20px; ...改成很明显的样式 }2.使用事件监听,监听当前遮…...
基于Neo4j将知识图谱用于检索增强生成:Knowledge Graphs for RAG
Knowledge Graphs for RAG 本文是学习https://www.deeplearning.ai/short-courses/knowledge-graphs-rag/这门课的学习笔记。 What you’ll learn in this course Knowledge graphs are used in development to structure complex data relationships, drive intelligent sea…...
康康近期的慢SQL(oracle vs 达梦)
近期执行的sql,哪些比较慢? 或者健康检查时搂一眼状态 oracle: --最近3天内的慢sql set lines 200 pages 100 col txt for a65 col sql_id for a13 select a.sql_id,a.cnt,a.pctload,b.sql_text txt from (select * from (select sql_id,co…...
探索 GPT-4o mini:成本效益与创新的双重驱动
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
2.6基本算法之动态规划2989:糖果
描述 由于在维护世界和平的事务中做出巨大贡献,Dzx被赠予糖果公司2010年5月23日当天无限量糖果免费优惠券。在这一天,Dzx可以从糖果公司的N件产品中任意选择若干件带回家享用。糖果公司的N件产品每件都包含数量不同的糖果。Dzx希望他选择的产品包含的糖…...
12.顶部带三角形的边框 CSS 关键字 currentColor
顶部带三角形的边框 创建一个在顶部带有三角形的内容容器。 使用 ::before 和 ::after 伪元素创建两个三角形。两个三角形的颜色应分别与容器的 border-color 和容器的 background-color 相同。一个三角形(::before)的 border-width 应比另一个(::after)宽 1px,以起到边框的作…...
Llama中模块参数大小
LLama2中,流程中数据大小的变换如下 Transformer模块 第一次输入,进行prefill,输入x维度为[1, 8, 4096] 1. 构建wq,wk,wv,wo,尺寸均为[4096,4096], 与x点乘,得到xq, xk, xv 2. 构建KV cache, 尺寸为 [b…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
Web后端基础(基础知识)
BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...
适应性Java用于现代 API:REST、GraphQL 和事件驱动
在快速发展的软件开发领域,REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名,不断适应这些现代范式的需求。随着不断发展的生态系统,Java 在现代 API 方…...
前端中slice和splic的区别
1. slice slice 用于从数组中提取一部分元素,返回一个新的数组。 特点: 不修改原数组:slice 不会改变原数组,而是返回一个新的数组。提取数组的部分:slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...
