当前位置: 首页 > news >正文

Spark Streaming DStream转换

        DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相关的算子。

无状态转化操作

        无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中

         需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。

        例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。

Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。


object Spark06_Nostate_Transform {def main(args: Array[String]): Unit = {//创建SparkConfval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//转换为RDD操作val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})//打印wordAndCountDStream.print//启动ssc.start()ssc.awaitTermination()}
}

有状态转化操作

UpdateStateByKey

        UpdateStateByKey算子用于将历史结果应用到当前批次,该操作允许在使用新信息不断更新状态的同时能够保留他的状态。

        有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

UpdateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。

        为使用这个功能,需要做下面两步:

        1. 定义状态,状态可以是一个任意的数据类型。

        2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

        使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

        更新版的wordcount

 例如:每隔一段时间景点人新增流量(从程序启动开始,在原有递增)

  1. 编写代码

object Spark07_State_updateStateByKey {def main(args: Array[String]): Unit = {//创建SparkConfval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//设置检查点路径  用于保存状态ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//扁平映射val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))//结构转换val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))//聚合// 注意:DStreasm中reduceByKey只能对当前采集周期(窗口)进行聚合操作,没有状态//val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey((seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))})
//打印输出stateDS.print()//启动ssc.start()ssc.awaitTermination()}
}
  1. 启动程序并向9999端口发送数据

[atguigu@hadoop202 ~]$ nc -lk 9999

  1. 查看结果为累加

Window Operations(窗口操作)

Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据。默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上。一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长

  • 窗口时长:计算内容的时间范围;
  • 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期的整数倍。

如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。

 例如:一小时人流量的变化,窗口(6秒)和间隔(3秒)不一致,不一定从程序启动开始

需求:WordCount统计 3秒一个批次,窗口6秒,滑步3秒。

object Spark08_State_window {def main(args: Array[String]): Unit = {//创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//设置检查点路径  用于保存状态ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//扁平映射val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))//设置窗口大小,滑动的步长val windowDS: DStream[String] = flatMapDS.window(Seconds(6),Seconds(3))//结构转换val mapDS: DStream[(String, Int)] = windowDS.map((_,1))//聚合val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)reduceDS.print()//启动ssc.start()ssc.awaitTermination()}
}

关于Window的操作还有如下方法:

1) window(windowLength, slideInterval)

        基于对源DStream窗化的批次进行计算返回一个新的Dstream

2) countByWindow(windowLength, slideInterval)

         返回一个滑动窗口计数流中的元素个数

3) countByValueAndWindow()

        返回的DStream则包含窗口中每个值的个数

4) reduceByWindow(func, windowLength, slideInterval)

         通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流

5) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

        当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值

6) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

        这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。如果把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每游出去一条鱼,就将该鱼的总数减去一。

相关文章:

Spark Streaming DStream转换

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相…...

水果商城,可运行

文章目录项目介绍一、技术栈二、本项目分为前后台,有管理员与用户两种角色;1、管理员角色包含以下功能:2、用户角色包含以下功能:三、用户功能页面展示四、管理员功能页面展示五、部分代码展示六、获取整套项目源码项目介绍 一、…...

LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像

LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像1、报警信息1.1、报警查询1.2、配置开启报警订阅1.2.1、国标设备编辑1.2.2、选择开启报警订阅1.3、配置摄像头报警1.3.1、配置摄像头报警通道ID1.3.2、配置摄像头开启侦测1.3.3、尝试触发摄像头…...

软件测试---测试分类

一 : 按测试对象划分 1.1 可靠性测试 可靠性(Availability)即可用性,是指系统正常运行的能力或者程度,一般用正常向用户提供软件服务的时间占总时间的百分比表示。 1.2 容错性测试 行李箱 , 四个轮子 , 坏了一个 , 说明这个容错…...

剑指 Offer II 015. 字符串中的所有变位词

题目链接 剑指 Offer II 015. 字符串中的所有变位词 mid 题目描述 给定两个字符串 s和 p,找到 s中所有 p的 变位词 的子串,返回这些子串的起始索引。不考虑答案输出的顺序。 变位词 指字母相同,但排列不同的字符串。 示例 1: 输…...

【SpringCloud】SpringCloud详细教程之微服务比较

目录前言一.什么是微服务?为什么要使用微服务二.微服务对比三.企业开发场景前言 我会通过实际代码来给展示每个组件的用法 一.什么是微服务?为什么要使用微服务 分布式,把一个项目拆分成多个模块,每一个模块相当于一个服务。 微…...

二.项目使用vue-router,引入ant-design-vue的UI框架,引入less

根据前文《使用Vue脚手架工具搭建vue项目》搭建好脚手架后使用 1.vue-router 2.引入UI框架ant design vue 3.引入less 1.vue-router vue-router分为两种模式(默认为hash模式): hash history hash: 特征: 1.hash会在浏览器路径里带#号&#…...

网络安全怎么学?20年白帽子老江湖告诉你

很多人都知道龙叔是个老程序员,但却不知道其实我也是个H客,20年前我就开始痴迷于H客技术,可以说是网络安全方面的老江湖了。 到现在,我还依然会去研究这一块,偶尔会和一些网安的朋友交流技术,比如说红盟的…...

药房管理系统;药库管理系统

第一,主要功能:  本系统集日常销售、药品进销存、会员积分、GSP管理等药店所需的所有功能于一体,实现店铺管理的全部自动化。第二、新功能:  增加了“按功能查询药品”的功能,使软件用户可以根据客户的症状推荐合适…...

深眸科技|机器视觉提升制造性能,焕发传统企业智造新活力!

随着机器视觉技术的成熟与发展,其在工业制造中得到越来越广泛的应用。机器视觉在工业制造领域的应用朝着智能识别、智能检测、智能测量以及智能互联的完整智能体系方向发展。此外,快速变化的市场需求,不断涌入行业的竞争对手,让传…...

ubuntu安装SSH的方法

Ubuntu安装SSH的方法。14版的ubuntu经过测试,默认没有开启SSH,所以需要安装。 1、虚拟机设置网卡为桥接模式,即NAT。12版虚拟机默认的。 2、查看ubuntu使用的ip。 ifconfig即可查看,14版的ubuntu自带这个命令。 3、查看是否pi…...

哪种蓝牙耳机通话效果好?通话清晰的蓝牙耳机推荐

出门的时候,如果戴耳机和别人通话,就不必把耳机摘下来,接电话变得前所未有的简单。现在的蓝牙耳机,已经不是单纯的用来听音乐了,而是一种更好的功能。下面这四款蓝牙耳机不仅适合听歌,通话还清晰&#xff0…...

IT运维如何完成一场高质量复盘

复盘的终极目标是:还原事实,找到薄弱点加以改进。 提到复盘,很多人的第一反应是线上故障,有人要背锅了。 复盘真正的价值是还原事实,在薄弱处加以改进。如何做一次高质量的复盘,我们给出3点建议。 1、坦…...

JVM调优面试题——基础知识

文章目录1、JDK,JRE以及JVM的关系2、编译器到底干了什么事?3、类加载机制是什么?3.1、装载(Load)3.2、链接(Link)3.3、初始化(Initialize)4、类加载器有哪些?5、什么是双亲委派机制?6、介绍一下JVM内存划分&#xff08…...

三、mongdb 查询

一、 MongoDB文档检索 MongoDB中有多种方式可以检索文档: 1.1 查询过滤器 使用查询过滤器从集合中检索文档。查询过滤器是一组键值对,可按字段值查询文档。 例如: db.col.find({"status":"A"})这个示例查询status等于“A”的文档。 1.2 范围查询操作符…...

python的 ping 网络状态监测方法(含多IP)

ping 基本概念 ping (Packet Internet Groper)是一种因特网包探索器,用于测试网络连接量的程序。Ping是工作在 TCP/IP网络体系结构中应用层的一个服务命令, 主要是向特定的目的主机发送 ICMP(Internet Control Messag…...

【独家】华为OD机试提供C语言题解 - 单词反转

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明单词…...

Linux docker环境安装,docker-compose安装,jdk17安装

安装docker 删除之前安装的docker yum remove docker \docker-client \docker-client-latest \docker- common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-sqlinux \docker-engine-selinux \docker-engine \docker-ce安装yum工具 yum install -y y…...

界面开发(3)--- PyQt5用户登录界面连接数据库

文章目录数据库账户注册账号登录找回密码为了实现用户登录界面的登录功能,我们必须建立一个数据库,并把账号和对应的密码,存储到数据库中。如果输入的账号和密码与数据库中的一致,那我们就允许用户登录,进入新的界面。…...

以下真的没有任何要写的了,我需要凑字数,请大家原谅

以下真的没有任何要写的了,我需要凑字数,请大家原谅!!!!!!!!!!!!!!!&#…...

synchronized 学习

学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

字符串哈希+KMP

P10468 兔子与兔子 #include<bits/stdc.h> using namespace std; typedef unsigned long long ull; const int N 1000010; ull a[N], pw[N]; int n; ull gethash(int l, int r){return a[r] - a[l - 1] * pw[r - l 1]; } signed main(){ios::sync_with_stdio(false), …...

从实验室到产业:IndexTTS 在六大核心场景的落地实践

一、内容创作&#xff1a;重构数字内容生产范式 在短视频创作领域&#xff0c;IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色&#xff0c;生成的 “各位吴彦祖们大家好” 语音相似度达 97%&#xff0c;单条视频播放量突破百万…...

【大厂机试题解法笔记】矩阵匹配

题目 从一个 N * M&#xff08;N ≤ M&#xff09;的矩阵中选出 N 个数&#xff0c;任意两个数字不能在同一行或同一列&#xff0c;求选出来的 N 个数中第 K 大的数字的最小值是多少。 输入描述 输入矩阵要求&#xff1a;1 ≤ K ≤ N ≤ M ≤ 150 输入格式 N M K N*M矩阵 输…...

若依项目部署--传统架构--未完待续

若依项目介绍 项目源码获取 #Git工具下载 dnf -y install git #若依项目获取 git clone https://gitee.com/y_project/RuoYi-Vue.git项目背景 随着企业信息化需求的增加&#xff0c;传统开发模式存在效率低&#xff0c;重复劳动多等问题。若依项目通过整合主流技术框架&…...

【Vue】scoped+组件通信+props校验

【scoped作用及原理】 【作用】 默认写在组件中style的样式会全局生效, 因此很容易造成多个组件之间的样式冲突问题 故而可以给组件加上scoped 属性&#xff0c; 令样式只作用于当前组件的标签 作用&#xff1a;防止不同vue组件样式污染 【原理】 给组件加上scoped 属性后…...

2025-06-01-Hive 技术及应用介绍

Hive 技术及应用介绍 参考资料 Hive 技术原理Hive 架构及应用介绍Hive - 小海哥哥 de - 博客园https://cwiki.apache.org/confluence/display/Hive/Home(官方文档) Apache Hive 是基于 Hadoop 构建的数据仓库工具&#xff0c;它为海量结构化数据提供类 SQL 的查询能力&#xf…...

开源 vGPU 方案:HAMi,实现细粒度 GPU 切分

本文主要分享一个开源的 GPU 虚拟化方案&#xff1a;HAMi&#xff0c;包括如何安装、配置以及使用。 相比于上一篇分享的 TimeSlicing 方案&#xff0c;HAMi 除了 GPU 共享之外还可以实现 GPU core、memory 得限制&#xff0c;保证共享同一 GPU 的各个 Pod 都能拿到足够的资源。…...