当前位置: 首页 > news >正文

Spark Streaming DStream的操作

一、DStream的定义

DStream是离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。
DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即不可变的,分布式的数据集。
对DStream应用的算子,其实在底层会被翻译为对DStream中每个RDD的操作,比如对一个DStream执行一个map操作,会产生一个新的DStream,其底层原理为,对输入DStream中的每个时间段的RDD,都应用一遍map操作,然后生成的RDD,即作为新的DStream中的那个时间段的一个RDD。

二、DStream的操作

1,普通的转换操作

转换描述
map(func)源 DStream的每个元素通过函数func返回一个新的DStream。
flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。
repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。
union(otherStream)返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。
count()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。
reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。
countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。
reduceByKey(func, [numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。
join(otherStream, [numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。
cogroup(otherStream, [numTasks])当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。
transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。
updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。

2,窗口转换函数

转换描述
window(windowLength, slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的。因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。

3,输出操作

转换描述
print()在Driver中打印出DStream中数据的前10个元素。
saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

三、常用操作详解

1,transform(func)

该transform操作(转换操作)及其类似的transformWith操作,允许在DStream上应用任意的RDD-to-RDD函数。它可以实现DStream API中未提供的操作,比如两个数据流的连接操作。
示例代码:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informationval cleanedDStream = wordCounts.transform { rdd =>rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
}

2,updateStateByKey操作

使用的一般操作都是不记录历史数据的,也就说只记录当前定义时间段内的数据,跟前后时间段无关。如果要统计历史时间内的总共数据并且实时更新,如何解决呢?该updateStateByKey操作可以让你保持任意状态,同时不断有新的信息进行更新。

要使用updateStateByKey操作,必须进行下面两个步骤 :
(1)定义状态: 状态可以是任意的数据类型。
(2)定义状态更新函数:用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。
对DStream通过updateStateByKey(updateFunction)来实现实时更新。

更新函数有两个参数 :
(1)newValues是当前新进入的数据。
(2)runningCount 是历史数据,被封装到了Option中。

示例:
首先我们需要了解数据的类型
编写处理方法
封装结果
代码:

//定义更新函数
//我们这里使用的Int类型的数据,因为要做统计个数
def updateFunc(newValues : Seq[Int],state :Option[Int])Some[Int] = {//传入的newVaules将当前的时间段的数据全部保存到Seq中//调用foldLeft(0)(_+_) 从0位置开始累加到结束   val currentCount = newValues.foldLeft(0)(_+_) //获取历史值,没有历史数据时为None,有数据的时候为Some//getOrElse(x)方法,如果获取值为None则用x代替val  previousCount = state.getOrElse(0)//计算结果,封装成Some返回Some(currentCount+previousCount) 
}
//使用
val stateDStream = DStream.updateStateByKey[Int](updateFunc)

相关文章:

Spark Streaming DStream的操作

一、DStream的定义 DStream是离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、redu…...

蓝桥杯冲刺 - week1

文章目录💬前言🌲day192. 递归实现指数型枚举843. n-皇后问题🌲day2日志统计1209. 带分数🌲day3844. 走迷宫1101. 献给阿尔吉侬的花束🌲day41113. 红与黑🌲day51236. 递增三元组🌲day63491. 完全…...

Leetcode27. 移除元素

目录一、题目描述:二、解决思路和代码1. 解决思路2. 代码一、题目描述: 给你一个数组 nums 和一个值 val,你需要 原地 移除所有数值等于 val 的元素,并返回移除后数组的新长度。 不要使用额外的数组空间,你必须仅使用…...

ViewService——一种保证客户端与服务端同步的方法

简介在分布式系统中,最常见的场景就是主备架构。但是如果主机不幸宕机,如何正确的通知客户端当前后端服务器的状况成为一个值得研究的问题。本文描述了一种简单的模型用于解决此问题。背景以一个分布式的Key-Value数据库为背景。数据库对外提供3个接口Ge…...

使用STM32F103ZE开发贪吃蛇游戏

目录 前言 一、设置FreeROTS用户任务 (1)事件event任务 (2)按键输入方向控制任务 (3)果实食物任务 (4)显示任务函数 (3)开始任务 二、主函数 三、ADC采样…...

如何利用Web3D技术打造在线虚拟展览馆

随着Web3D技术的不断发展,越来越多的企业和组织开始将其应用于虚拟展览馆的建设中。虚拟展览馆可以为观众提供高度沉浸式的展览体验,让观众可以随时随地参观各种展览,同时也为展览组织者提供了更多的展示方式和机会。下面将介绍如何利用Web3D…...

第二十三章 opengl之高级OpenGL(实例化)

OpenGL实例化实例化数组绘制小行星带实例化 综合应用。 如果绘制了很多的模型,但是大部分的模型包含同一组顶点数据,只是不同的世界空间变换。 举例:一个全是草的场景,每根草都是一个包含了几个小三角形的模型。需要绘制很多根草…...

C++ String类总结

头文件 #include <string>构造函数 default (1) basic_string();explicit basic_string (const allocator_type& alloc); copy (2) basic_string (const basic_string& str);basic_string (const basic_string& str, const allocator_type& alloc); su…...

内网升级“高效安全”利器!统信软件发布私有化更新管理平台

随着数字化的深度推进&#xff0c;信息安全重要性进一步凸显。建设自主可控的国产操作系统&#xff0c;提升信息安全自主能力&#xff0c;已成为国家重要战略之一。 操作系统安全对计算机系统的整体安全发挥着关键作用&#xff0c;各类客户往往需要在第一时间获取更新与安全补…...

JAVA开发(自研项目的开发与推广)

https://live.csdn.net/v/284629 案例背景&#xff1a; 作为JAVA开发人员&#xff0c;我们可以开发无数多的web项目&#xff0c;电商系统&#xff0c;小程序&#xff0c;H5商城。有时候作为技术研发负责人&#xff0c;项目做成了有时候也需要对内进行内测&#xff0c;对外进行…...

Mysql用户权限分配详解

文章目录MySQL 权限介绍一、Mysql权限级别分析&#xff08;1&#xff09;全局级别&#xff08;1.1&#xff09; USER表的组成结构&#xff08;1.1.1&#xff09; 用户列&#xff08;1.1.2&#xff09; 权限列&#xff08;1.1.3&#xff09; 安全列&#xff08;1.1.4&#xff09…...

【TypeScript 入门】13.枚举类型

枚举类型 枚举类型:定义包含被命名的常量的集合。比如 TypeScript 支持枚举数字、字符两种常量值类型。 使用方式: enum + 枚举名字 + 花括弧包裹被命名了的常量成员: enum Size {S,M,L } const a = Size.M console.log(Size, Size)...

Python科学计算:偏微分方程1

首先&#xff0c;我们来看初边值问题&#xff1a;伯格斯方程&#xff1a;假设函数是定义在上的函数&#xff0c;且满足&#xff1a;右侧第一项表示自对流&#xff0c;第二项则表示扩散&#xff0c;在许多物理过程中&#xff0c;这两种效应占据着主导地位&#xff0c;为了固定一…...

PLS-DA分类的实现(基于sklearn)

目录 简单介绍 代码实现 数据集划分 选择因子个数 模型训练并分类 调用函数 简单介绍 &#xff08;此处取自各处资料&#xff09; PLS-DA既可以用来分类&#xff0c;也可以用来降维&#xff0c;与PCA不同的是&#xff0c;PCA是无监督的&#xff0c;PLS-DA是有监督的…...

常用hook

Hook 是 React 16.8 的新增特性。它可以让你在不编写 class 的情况下使用 state 以及其他的 React 特性。理解&#xff1a;hook是react提供的函数API官方提供的hook基础hookuseState APIconst [state, setState] useState(initialState); //返回state值 以及更新state的方法 …...

TryHackMe-GoldenEye(boot2root)

GoldenEye 这个房间将是一个有指导的挑战&#xff0c;以破解詹姆斯邦德风格的盒子并获得根。 端口扫描 循例nmap Web枚举 进入80 查看terminal.js 拿去cyberchef解码 拿着这组凭据到/sev-home登录 高清星际大战 POP3枚举 使用刚刚的凭据尝试登录pop3 使用hydra尝试爆破 这…...

Elasticsearch基本安全加上安全的 HTTPS 流量

基本安全加上安全的 HTTPS 流量 在生产环境中&#xff0c;除非您在 HTTP 层启用 TLS&#xff0c;否则某些 Elasticsearch 功能&#xff08;例如令牌和 API 密钥&#xff09;将被禁用。这个额外的安全层确保进出集群的所有通信都是安全的。 当您在模式下运行该elasticsearch-ce…...

C语言-程序环境和预处理(2)

文章目录预处理详解1.预定义符号2.#define2.1#define定义的标识符2.2#define定义宏2.3#define替换规则注意事项&#xff1a;2.4#和###的作用##的作用2.5带副作用的宏参数2.6宏和函数的对比宏的优势&#xff1a;宏的劣势&#xff1a;宏和函数的一个对比命名约定3.undef4.条件编译…...

JVM 收集算法 垃圾收集器 元空间 引用

文章目录JVM 收集算法标记-清除算法标记-复制算法标记-整理算法JVM垃圾收集器Serial收集器ParNew收集器Parallel Scavenge /Parallel Old收集器CMS收集器Garbage First(G1)收集器元空间引用强引用软引用弱引用虚引用JVM 收集算法 前面我们了解了整个堆内存实际是以分代收集机制…...

clip精读

开头部分 1. 要点一 从文章题目来看-目的是&#xff1a;使用文本监督得到一个可以迁移的 视觉系统 2.要点二 之前是 fix-ed 的class 有诸多局限性&#xff0c;所以现在用大量不是精细标注的数据来学将更好&#xff0c;利用的语言多样性。——这个方法在 nlp其实广泛的存在&…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

深入理解JavaScript设计模式之单例模式

目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式&#xff08;Singleton Pattern&#…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...

pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决

问题&#xff1a; pgsql数据库通过备份数据库文件进行还原时&#xff0c;如果表中有自增序列&#xff0c;还原后可能会出现重复的序列&#xff0c;此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”&#xff0c;…...

DAY 45 超大力王爱学Python

来自超大力王的友情提示&#xff1a;在用tensordoard的时候一定一定要用绝对位置&#xff0c;例如&#xff1a;tensorboard --logdir"D:\代码\archive (1)\runs\cifar10_mlp_experiment_2" 不然读取不了数据 知识点回顾&#xff1a; tensorboard的发展历史和原理tens…...

RabbitMQ 各类交换机

为什么要用交换机&#xff1f; 交换机用来路由消息。如果直发队列&#xff0c;这个消息就被处理消失了&#xff0c;那别的队列也需要这个消息怎么办&#xff1f;那就要用到交换机 交换机类型 1&#xff0c;fanout&#xff1a;广播 特点 广播所有消息​​&#xff1a;将消息…...

轻量安全的密码管理工具Vaultwarden

一、Vaultwarden概述 Vaultwarden主要作用是提供一个自托管的密码管理器服务。它是Bitwarden密码管理器的第三方轻量版&#xff0c;由国外开发者在Bitwarden的基础上&#xff0c;采用Rust语言重写而成。 &#xff08;一&#xff09;Vaultwarden镜像的作用及特点 轻量级与高性…...