当前位置: 首页 > news >正文

Spark Streaming DStream的操作

一、DStream的定义

DStream是离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window。
DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即不可变的,分布式的数据集。
对DStream应用的算子,其实在底层会被翻译为对DStream中每个RDD的操作,比如对一个DStream执行一个map操作,会产生一个新的DStream,其底层原理为,对输入DStream中的每个时间段的RDD,都应用一遍map操作,然后生成的RDD,即作为新的DStream中的那个时间段的一个RDD。

二、DStream的操作

1,普通的转换操作

转换描述
map(func)源 DStream的每个元素通过函数func返回一个新的DStream。
flatMap(func)类似与map操作,不同的是每个输入元素可以被映射出0或者更多的输出元素。
filter(func)在源DSTREAM上选择Func函数返回仅为true的元素,最终返回一个新的DSTREAM 。
repartition(numPartitions)通过输入的参数numPartitions的值来改变DStream的分区大小。
union(otherStream)返回一个包含源DStream与其他 DStream的元素合并后的新DSTREAM。
count()对源DStream内部的所含有的RDD的元素数量进行计数,返回一个内部的RDD只包含一个元素的DStreaam。
reduce(func)使用函数func(有两个参数并返回一个结果)将源DStream 中每个RDD的元素进行聚 合操作,返回一个内部所包含的RDD只有一个元素的新DStream。
countByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次。
reduceByKey(func, [numTasks])当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新 DStream,其中每个键的值V都是使用聚合函数func汇总。注意:默认情况下,使用 Spark的默认并行度提交任务(本地模式下并行度为2,集群模式下位8),可以通过配置numTasks设置不同的并行任务数。
join(otherStream, [numTasks])当被调用类型分别为(K,V)和(K,W)键值对的2个DStream 时,返回类型为(K,(V,W))键值对的一个新 DSTREAM。
cogroup(otherStream, [numTasks])当被调用的两个DStream分别含有(K, V) 和(K, W)键值对时,返回一个(K, Seq[V], Seq[W])类型的新的DStream。
transform(func)通过对源DStream的每RDD应用RDD-to-RDD函数返回一个新的DStream,这可以用来在DStream做任意RDD操作。
updateStateByKey(func)返回一个新状态的DStream,其中每个键的状态是根据键的前一个状态和键的新值应用给定函数func后的更新。这个方法可以被用来维持每个键的任何状态数据。

2,窗口转换函数

转换描述
window(windowLength, slideInterval)返回一个基于源DStream的窗口批次计算后得到新的DStream。
countByWindow(windowLength,slideInterval)返回基于滑动窗口的DStream中的元素的数量。
reduceByWindow(func, windowLength,slideInterval)基于滑动窗口对源DStream中的元素进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操作,得到一个新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最早的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么我们可以将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks])基于滑动窗口计算源DStream中每个RDD内每个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue一样,reduce任务的数量可以通过一个可选参数进行配置。

在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的。因此在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

对于窗口操作而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操作中,只有窗口的长度满足了才会触发批数据的处理。除了窗口的长度,窗口操作还有另一个重要的参数就是滑动间隔(slide duration),它指的是经过多长时间窗口滑动一次形成新的窗口,滑动窗口默认情况下和批次间隔的相同,而窗口间隔一般设置的要比它们两个大。

3,输出操作

转换描述
print()在Driver中打印出DStream中数据的前10个元素。
saveAsTextFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

三、常用操作详解

1,transform(func)

该transform操作(转换操作)及其类似的transformWith操作,允许在DStream上应用任意的RDD-to-RDD函数。它可以实现DStream API中未提供的操作,比如两个数据流的连接操作。
示例代码:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informationval cleanedDStream = wordCounts.transform { rdd =>rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
}

2,updateStateByKey操作

使用的一般操作都是不记录历史数据的,也就说只记录当前定义时间段内的数据,跟前后时间段无关。如果要统计历史时间内的总共数据并且实时更新,如何解决呢?该updateStateByKey操作可以让你保持任意状态,同时不断有新的信息进行更新。

要使用updateStateByKey操作,必须进行下面两个步骤 :
(1)定义状态: 状态可以是任意的数据类型。
(2)定义状态更新函数:用一个函数指定如何使用先前的状态和从输入流中获取的新值更新状态。
对DStream通过updateStateByKey(updateFunction)来实现实时更新。

更新函数有两个参数 :
(1)newValues是当前新进入的数据。
(2)runningCount 是历史数据,被封装到了Option中。

示例:
首先我们需要了解数据的类型
编写处理方法
封装结果
代码:

//定义更新函数
//我们这里使用的Int类型的数据,因为要做统计个数
def updateFunc(newValues : Seq[Int],state :Option[Int])Some[Int] = {//传入的newVaules将当前的时间段的数据全部保存到Seq中//调用foldLeft(0)(_+_) 从0位置开始累加到结束   val currentCount = newValues.foldLeft(0)(_+_) //获取历史值,没有历史数据时为None,有数据的时候为Some//getOrElse(x)方法,如果获取值为None则用x代替val  previousCount = state.getOrElse(0)//计算结果,封装成Some返回Some(currentCount+previousCount) 
}
//使用
val stateDStream = DStream.updateStateByKey[Int](updateFunc)

相关文章:

Spark Streaming DStream的操作

一、DStream的定义 DStream是离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流。DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、redu…...

蓝桥杯冲刺 - week1

文章目录💬前言🌲day192. 递归实现指数型枚举843. n-皇后问题🌲day2日志统计1209. 带分数🌲day3844. 走迷宫1101. 献给阿尔吉侬的花束🌲day41113. 红与黑🌲day51236. 递增三元组🌲day63491. 完全…...

Leetcode27. 移除元素

目录一、题目描述:二、解决思路和代码1. 解决思路2. 代码一、题目描述: 给你一个数组 nums 和一个值 val,你需要 原地 移除所有数值等于 val 的元素,并返回移除后数组的新长度。 不要使用额外的数组空间,你必须仅使用…...

ViewService——一种保证客户端与服务端同步的方法

简介在分布式系统中,最常见的场景就是主备架构。但是如果主机不幸宕机,如何正确的通知客户端当前后端服务器的状况成为一个值得研究的问题。本文描述了一种简单的模型用于解决此问题。背景以一个分布式的Key-Value数据库为背景。数据库对外提供3个接口Ge…...

使用STM32F103ZE开发贪吃蛇游戏

目录 前言 一、设置FreeROTS用户任务 (1)事件event任务 (2)按键输入方向控制任务 (3)果实食物任务 (4)显示任务函数 (3)开始任务 二、主函数 三、ADC采样…...

如何利用Web3D技术打造在线虚拟展览馆

随着Web3D技术的不断发展,越来越多的企业和组织开始将其应用于虚拟展览馆的建设中。虚拟展览馆可以为观众提供高度沉浸式的展览体验,让观众可以随时随地参观各种展览,同时也为展览组织者提供了更多的展示方式和机会。下面将介绍如何利用Web3D…...

第二十三章 opengl之高级OpenGL(实例化)

OpenGL实例化实例化数组绘制小行星带实例化 综合应用。 如果绘制了很多的模型,但是大部分的模型包含同一组顶点数据,只是不同的世界空间变换。 举例:一个全是草的场景,每根草都是一个包含了几个小三角形的模型。需要绘制很多根草…...

C++ String类总结

头文件 #include <string>构造函数 default (1) basic_string();explicit basic_string (const allocator_type& alloc); copy (2) basic_string (const basic_string& str);basic_string (const basic_string& str, const allocator_type& alloc); su…...

内网升级“高效安全”利器!统信软件发布私有化更新管理平台

随着数字化的深度推进&#xff0c;信息安全重要性进一步凸显。建设自主可控的国产操作系统&#xff0c;提升信息安全自主能力&#xff0c;已成为国家重要战略之一。 操作系统安全对计算机系统的整体安全发挥着关键作用&#xff0c;各类客户往往需要在第一时间获取更新与安全补…...

JAVA开发(自研项目的开发与推广)

https://live.csdn.net/v/284629 案例背景&#xff1a; 作为JAVA开发人员&#xff0c;我们可以开发无数多的web项目&#xff0c;电商系统&#xff0c;小程序&#xff0c;H5商城。有时候作为技术研发负责人&#xff0c;项目做成了有时候也需要对内进行内测&#xff0c;对外进行…...

Mysql用户权限分配详解

文章目录MySQL 权限介绍一、Mysql权限级别分析&#xff08;1&#xff09;全局级别&#xff08;1.1&#xff09; USER表的组成结构&#xff08;1.1.1&#xff09; 用户列&#xff08;1.1.2&#xff09; 权限列&#xff08;1.1.3&#xff09; 安全列&#xff08;1.1.4&#xff09…...

【TypeScript 入门】13.枚举类型

枚举类型 枚举类型:定义包含被命名的常量的集合。比如 TypeScript 支持枚举数字、字符两种常量值类型。 使用方式: enum + 枚举名字 + 花括弧包裹被命名了的常量成员: enum Size {S,M,L } const a = Size.M console.log(Size, Size)...

Python科学计算:偏微分方程1

首先&#xff0c;我们来看初边值问题&#xff1a;伯格斯方程&#xff1a;假设函数是定义在上的函数&#xff0c;且满足&#xff1a;右侧第一项表示自对流&#xff0c;第二项则表示扩散&#xff0c;在许多物理过程中&#xff0c;这两种效应占据着主导地位&#xff0c;为了固定一…...

PLS-DA分类的实现(基于sklearn)

目录 简单介绍 代码实现 数据集划分 选择因子个数 模型训练并分类 调用函数 简单介绍 &#xff08;此处取自各处资料&#xff09; PLS-DA既可以用来分类&#xff0c;也可以用来降维&#xff0c;与PCA不同的是&#xff0c;PCA是无监督的&#xff0c;PLS-DA是有监督的…...

常用hook

Hook 是 React 16.8 的新增特性。它可以让你在不编写 class 的情况下使用 state 以及其他的 React 特性。理解&#xff1a;hook是react提供的函数API官方提供的hook基础hookuseState APIconst [state, setState] useState(initialState); //返回state值 以及更新state的方法 …...

TryHackMe-GoldenEye(boot2root)

GoldenEye 这个房间将是一个有指导的挑战&#xff0c;以破解詹姆斯邦德风格的盒子并获得根。 端口扫描 循例nmap Web枚举 进入80 查看terminal.js 拿去cyberchef解码 拿着这组凭据到/sev-home登录 高清星际大战 POP3枚举 使用刚刚的凭据尝试登录pop3 使用hydra尝试爆破 这…...

Elasticsearch基本安全加上安全的 HTTPS 流量

基本安全加上安全的 HTTPS 流量 在生产环境中&#xff0c;除非您在 HTTP 层启用 TLS&#xff0c;否则某些 Elasticsearch 功能&#xff08;例如令牌和 API 密钥&#xff09;将被禁用。这个额外的安全层确保进出集群的所有通信都是安全的。 当您在模式下运行该elasticsearch-ce…...

C语言-程序环境和预处理(2)

文章目录预处理详解1.预定义符号2.#define2.1#define定义的标识符2.2#define定义宏2.3#define替换规则注意事项&#xff1a;2.4#和###的作用##的作用2.5带副作用的宏参数2.6宏和函数的对比宏的优势&#xff1a;宏的劣势&#xff1a;宏和函数的一个对比命名约定3.undef4.条件编译…...

JVM 收集算法 垃圾收集器 元空间 引用

文章目录JVM 收集算法标记-清除算法标记-复制算法标记-整理算法JVM垃圾收集器Serial收集器ParNew收集器Parallel Scavenge /Parallel Old收集器CMS收集器Garbage First(G1)收集器元空间引用强引用软引用弱引用虚引用JVM 收集算法 前面我们了解了整个堆内存实际是以分代收集机制…...

clip精读

开头部分 1. 要点一 从文章题目来看-目的是&#xff1a;使用文本监督得到一个可以迁移的 视觉系统 2.要点二 之前是 fix-ed 的class 有诸多局限性&#xff0c;所以现在用大量不是精细标注的数据来学将更好&#xff0c;利用的语言多样性。——这个方法在 nlp其实广泛的存在&…...

Java 语言特性(面试系列2)

一、SQL 基础 1. 复杂查询 &#xff08;1&#xff09;连接查询&#xff08;JOIN&#xff09; 内连接&#xff08;INNER JOIN&#xff09;&#xff1a;返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

现代密码学 | 椭圆曲线密码学—附py代码

Elliptic Curve Cryptography 椭圆曲线密码学&#xff08;ECC&#xff09;是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础&#xff0c;例如椭圆曲线数字签…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...