当前位置: 首页 > news >正文

Spark实时(四):Strctured Streaming简单应用

文章目录

Strctured Streaming简单应用

一、Output Modes输出模式

二、Streaming Table API

三、​​​​​​​​​​​​​​Triggers

1、​​​​​​​unspecified(默认模式)

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)


Strctured Streaming简单应用

一、Output Modes输出模式

Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:

  • Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
  • Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
  • Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。

二、​​​​​​​​​​​​​​Streaming Table API

在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。

案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。

代码示例如下:

package com.lanson.structuredStreamingimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTableAPI {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("StreamTableAPI").config("spark.sql.shuffle.partitions", 1).config("spark.sql.warehouse.dir", "./my-spark-warehouse").getOrCreate()spark.sparkContext.setLogLevel("Error");import spark.implicits._//2.读取socket数据,注册流表val df: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.对df进行转换val personinfo: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")//4.将以上personinfo 写入到流表中personinfo.writeStream.option("checkpointLocation","./checkpoint/dir1").toTable("mytbl")import org.apache.spark.sql.functions._//5.读取mytbl 流表中的数据val query: StreamingQuery = spark.readStream.table("mytbl").withColumn("new_age", col("age").plus(6)).select("id", "name", "age", "new_age").writeStream.format("console").start()query.awaitTermination()}
}

以上代码编写完成后启动,向控制台输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22

结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。

三、​​​​​​​​​​​​​​Triggers

Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:

实时处理,以下是支持的Triggers:

Trigger Type

描述

Unspecified(默认)

  • 代码使用:Trigger.ProcessingTime(0L)。
  • 代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

  • 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)
  • 查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定
  1. 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理
  2. 如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。
  3. 如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

  • 代码使用:Trigger.Once()
  • 只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

  • 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)
  • 以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

下面以读取Socket数据为例,Scala代码演示各个模式

1、​​​​​​​unspecified(默认模式)

代码如下:

//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream.format("console").start()query.awaitTermination()

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

代码如下:

//3.用户指定固定间隔批次触发查询val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds"))
//      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS).start()query.awaitTermination()

注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

代码如下:

//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.Once()).start()
query.awaitTermination()

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)

Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。

代码如下:

//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream.format("console")//每10ms 记录一次状态,而不是执行一次.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)).option("checkpointLocation","./checkpint/dir4").start()
query.awaitTermination()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

相关文章:

Spark实时(四):Strctured Streaming简单应用

文章目录 Strctured Streaming简单应用 一、Output Modes输出模式 二、Streaming Table API 三、​​​​​​​​​​​​​​Triggers 1、​​​​​​​unspecified(默认模式) 2、​​​​​​​​​​​​​​Fixed interval micro-batches&am…...

SpringBoot上传超大文件导致OOM,完美问题解决办法

问题描述 报错: Caused by: java.lang.OutOfMemoryError at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123) ~[?:1.8.0_381] at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117) ~[?:1.8.0_381] at java.…...

PyTorch 的各个核心模块和它们的功能

1. torch 核心功能 张量操作:PyTorch 的张量是一个多维数组,类似于 NumPy 的 ndarray,但支持 GPU 加速。数学运算:提供了各种数学运算,包括线性代数操作、随机数生成等。自动微分:torch.autograd 模块用于…...

Java开发之LinkedList源码分析

#来自ゾフィー(佐菲) 1 简介 LinkedList 的底层数据结构是双向链表。可以当作链表、栈、队列、双端队列来使用。有以下特点: 在插入或删除数据时,性能好;允许有 null 值;查询效率不高;线程不安…...

外卖霸王餐系统架构怎么选?

在当今日益繁荣的外卖市场中,外卖霸王餐作为一种独特的营销策略,受到了众多商家的青睐。然而,要想成功实施外卖霸王餐活动,一个安全、稳定且高效的架构选择至关重要。本文将深入探讨外卖霸王餐架构的选择,以期为商家提…...

AV1技术学习:Transform Coding

对预测残差进行变换编码,去除潜在的空间相关性。VP9 采用统一的变换块大小设计,编码块中的所有的块共享相同的变换大小。VP9 支持 4 4、8 8、16 16、32 32 四种正方形变换大小。根据预测模式选择由一维离散余弦变换 (DCT) 和非对称离散正弦变换 (ADS…...

Git操作指令

Git操作指令 一、安装git 1、设置配置信息: # global全局配置 git config --global user.name "Your username" git config --global user.email "Your email"2、查看git版本号 git -v # or git --version3、查看配置信息: git…...

CSS 创建:从入门到精通

CSS 创建:从入门到精通 CSS(层叠样式表)是网页设计中不可或缺的一部分,它用于控制网页的布局和样式。本文将详细介绍CSS的创建过程,包括基本概念、语法结构、选择器、样式属性以及如何将CSS应用到HTML中。无论您是初学者还是有经验的开发者,本文都将为您提供宝贵的信息。…...

Windows 11 系统对磁盘进行分区保姆级教程

Windows 11磁盘分区 磁盘分区是将硬盘驱动器划分为多个逻辑部分的过程,每个逻辑部分都可以独立使用和管理。在Windows 11操作系统中进行磁盘分区主要有以下几个作用和意义: 组织和管理数据:分区可以帮助用户更好地组织他们的数据&#xff0c…...

探索WebKit的CSS盒模型:深入理解Web布局的基石

探索WebKit的CSS盒模型:深入理解Web布局的基石 在Web开发的世界中,CSS盒模型(Box Model)是构建网页布局的核心原理。WebKit,作为Safari浏览器的渲染引擎,对CSS盒模型有着深入而精确的支持。本文将带你深入…...

c++初阶知识——string类详解

目录 前言: 1.标准库中的string类 1.1 auto和范围for auto 范围for 1.2 string类常用接口说明 1.string类对象的常见构造 1.3 string类对象的访问及遍历操作 1.4. string类对象的修改操作 1.5 string类非成员函数 2.string类的模拟实现 2.1 经典的string…...

php接口返回的json字符串,json_decode()失败,原来是多了红点

问题: 调用某个接口返回的json,json_decode()失败,返回数据为null, echo json_last_error();返回错误码 4 经过多次调试发现:多出来一个红点,预览是看不到的。 解决:要去除BOM头部 $resul…...

Python3网络爬虫开发实战(2)爬虫基础库

文章目录 一、urllib1. urlparse 实现 URL 的识别和分段2. urlunparse 用于构造 URL3. urljoin 用于两个链接的拼接4. urlencode 将 params 字典序列化为 params 字符串5. parse_qs 和 parse_qsl 用于将 params 字符串反序列化为 params 字典或列表6. quote 和 unquote 对 URL的…...

el-image预览图片点击遮盖处关闭预览

预览关闭按钮不明显 解决方式: 1.修改按钮样式明显点: //el-image 添加自定义类名,下文【test-image】代指 .test-image .el-icon-circle-close{ color:#fff; font-size:20px; ...改成很明显的样式 }2.使用事件监听,监听当前遮…...

基于Neo4j将知识图谱用于检索增强生成:Knowledge Graphs for RAG

Knowledge Graphs for RAG 本文是学习https://www.deeplearning.ai/short-courses/knowledge-graphs-rag/这门课的学习笔记。 What you’ll learn in this course Knowledge graphs are used in development to structure complex data relationships, drive intelligent sea…...

康康近期的慢SQL(oracle vs 达梦)

近期执行的sql,哪些比较慢? 或者健康检查时搂一眼状态 oracle: --最近3天内的慢sql set lines 200 pages 100 col txt for a65 col sql_id for a13 select a.sql_id,a.cnt,a.pctload,b.sql_text txt from (select * from (select sql_id,co…...

探索 GPT-4o mini:成本效益与创新的双重驱动

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

2.6基本算法之动态规划2989:糖果

描述 由于在维护世界和平的事务中做出巨大贡献,Dzx被赠予糖果公司2010年5月23日当天无限量糖果免费优惠券。在这一天,Dzx可以从糖果公司的N件产品中任意选择若干件带回家享用。糖果公司的N件产品每件都包含数量不同的糖果。Dzx希望他选择的产品包含的糖…...

12.顶部带三角形的边框 CSS 关键字 currentColor

顶部带三角形的边框 创建一个在顶部带有三角形的内容容器。 使用 ::before 和 ::after 伪元素创建两个三角形。两个三角形的颜色应分别与容器的 border-color 和容器的 background-color 相同。一个三角形(::before)的 border-width 应比另一个(::after)宽 1px,以起到边框的作…...

Llama中模块参数大小

LLama2中,流程中数据大小的变换如下 Transformer模块 第一次输入,进行prefill,输入x维度为[1, 8, 4096] 1. 构建wq,wk,wv,wo,尺寸均为[4096,4096], 与x点乘,得到xq, xk, xv 2. 构建KV cache, 尺寸为 [b…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

【Oracle】分区表

个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...