Spark Streaming DStream转换
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相关的算子。
无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。
Transform
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
object Spark06_Nostate_Transform {def main(args: Array[String]): Unit = {//创建SparkConfval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//转换为RDD操作val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})//打印wordAndCountDStream.print//启动ssc.start()ssc.awaitTermination()}
}
有状态转化操作
UpdateStateByKey
UpdateStateByKey算子用于将历史结果应用到当前批次,该操作允许在使用新信息不断更新状态的同时能够保留他的状态。
有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
UpdateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。
为使用这个功能,需要做下面两步:
1. 定义状态,状态可以是一个任意的数据类型。
2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
更新版的wordcount

例如:每隔一段时间景点人新增流量(从程序启动开始,在原有递增)
- 编写代码
object Spark07_State_updateStateByKey {def main(args: Array[String]): Unit = {//创建SparkConfval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//设置检查点路径 用于保存状态ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//扁平映射val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))//结构转换val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))//聚合// 注意:DStreasm中reduceByKey只能对当前采集周期(窗口)进行聚合操作,没有状态//val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey((seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))})
//打印输出stateDS.print()//启动ssc.start()ssc.awaitTermination()}
}
- 启动程序并向9999端口发送数据
[atguigu@hadoop202 ~]$ nc -lk 9999
- 查看结果为累加
Window Operations(窗口操作)
Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据。默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上。一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
- 窗口时长:计算内容的时间范围;
- 滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集周期的整数倍。
如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。

例如:一小时人流量的变化,窗口(6秒)和间隔(3秒)不一致,不一定从程序启动开始
需求:WordCount统计 3秒一个批次,窗口6秒,滑步3秒。
object Spark08_State_window {def main(args: Array[String]): Unit = {//创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//设置检查点路径 用于保存状态ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//扁平映射val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))//设置窗口大小,滑动的步长val windowDS: DStream[String] = flatMapDS.window(Seconds(6),Seconds(3))//结构转换val mapDS: DStream[(String, Int)] = windowDS.map((_,1))//聚合val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)reduceDS.print()//启动ssc.start()ssc.awaitTermination()}
}
关于Window的操作还有如下方法:
1) window(windowLength, slideInterval)
基于对源DStream窗化的批次进行计算返回一个新的Dstream
2) countByWindow(windowLength, slideInterval)
返回一个滑动窗口计数流中的元素个数
3) countByValueAndWindow()
返回的DStream则包含窗口中每个值的个数
4) reduceByWindow(func, windowLength, slideInterval)
通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流
5) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值
6) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。如果把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每游出去一条鱼,就将该鱼的总数减去一。
相关文章:
Spark Streaming DStream转换
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相…...
水果商城,可运行
文章目录项目介绍一、技术栈二、本项目分为前后台,有管理员与用户两种角色;1、管理员角色包含以下功能:2、用户角色包含以下功能:三、用户功能页面展示四、管理员功能页面展示五、部分代码展示六、获取整套项目源码项目介绍 一、…...
LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像
LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像1、报警信息1.1、报警查询1.2、配置开启报警订阅1.2.1、国标设备编辑1.2.2、选择开启报警订阅1.3、配置摄像头报警1.3.1、配置摄像头报警通道ID1.3.2、配置摄像头开启侦测1.3.3、尝试触发摄像头…...
软件测试---测试分类
一 : 按测试对象划分 1.1 可靠性测试 可靠性(Availability)即可用性,是指系统正常运行的能力或者程度,一般用正常向用户提供软件服务的时间占总时间的百分比表示。 1.2 容错性测试 行李箱 , 四个轮子 , 坏了一个 , 说明这个容错…...
剑指 Offer II 015. 字符串中的所有变位词
题目链接 剑指 Offer II 015. 字符串中的所有变位词 mid 题目描述 给定两个字符串 s和 p,找到 s中所有 p的 变位词 的子串,返回这些子串的起始索引。不考虑答案输出的顺序。 变位词 指字母相同,但排列不同的字符串。 示例 1: 输…...
【SpringCloud】SpringCloud详细教程之微服务比较
目录前言一.什么是微服务?为什么要使用微服务二.微服务对比三.企业开发场景前言 我会通过实际代码来给展示每个组件的用法 一.什么是微服务?为什么要使用微服务 分布式,把一个项目拆分成多个模块,每一个模块相当于一个服务。 微…...
二.项目使用vue-router,引入ant-design-vue的UI框架,引入less
根据前文《使用Vue脚手架工具搭建vue项目》搭建好脚手架后使用 1.vue-router 2.引入UI框架ant design vue 3.引入less 1.vue-router vue-router分为两种模式(默认为hash模式): hash history hash: 特征: 1.hash会在浏览器路径里带#号&#…...
网络安全怎么学?20年白帽子老江湖告诉你
很多人都知道龙叔是个老程序员,但却不知道其实我也是个H客,20年前我就开始痴迷于H客技术,可以说是网络安全方面的老江湖了。 到现在,我还依然会去研究这一块,偶尔会和一些网安的朋友交流技术,比如说红盟的…...
药房管理系统;药库管理系统
第一,主要功能: 本系统集日常销售、药品进销存、会员积分、GSP管理等药店所需的所有功能于一体,实现店铺管理的全部自动化。第二、新功能: 增加了“按功能查询药品”的功能,使软件用户可以根据客户的症状推荐合适…...
深眸科技|机器视觉提升制造性能,焕发传统企业智造新活力!
随着机器视觉技术的成熟与发展,其在工业制造中得到越来越广泛的应用。机器视觉在工业制造领域的应用朝着智能识别、智能检测、智能测量以及智能互联的完整智能体系方向发展。此外,快速变化的市场需求,不断涌入行业的竞争对手,让传…...
ubuntu安装SSH的方法
Ubuntu安装SSH的方法。14版的ubuntu经过测试,默认没有开启SSH,所以需要安装。 1、虚拟机设置网卡为桥接模式,即NAT。12版虚拟机默认的。 2、查看ubuntu使用的ip。 ifconfig即可查看,14版的ubuntu自带这个命令。 3、查看是否pi…...
哪种蓝牙耳机通话效果好?通话清晰的蓝牙耳机推荐
出门的时候,如果戴耳机和别人通话,就不必把耳机摘下来,接电话变得前所未有的简单。现在的蓝牙耳机,已经不是单纯的用来听音乐了,而是一种更好的功能。下面这四款蓝牙耳机不仅适合听歌,通话还清晰࿰…...
IT运维如何完成一场高质量复盘
复盘的终极目标是:还原事实,找到薄弱点加以改进。 提到复盘,很多人的第一反应是线上故障,有人要背锅了。 复盘真正的价值是还原事实,在薄弱处加以改进。如何做一次高质量的复盘,我们给出3点建议。 1、坦…...
JVM调优面试题——基础知识
文章目录1、JDK,JRE以及JVM的关系2、编译器到底干了什么事?3、类加载机制是什么?3.1、装载(Load)3.2、链接(Link)3.3、初始化(Initialize)4、类加载器有哪些?5、什么是双亲委派机制?6、介绍一下JVM内存划分(…...
三、mongdb 查询
一、 MongoDB文档检索 MongoDB中有多种方式可以检索文档: 1.1 查询过滤器 使用查询过滤器从集合中检索文档。查询过滤器是一组键值对,可按字段值查询文档。 例如: db.col.find({"status":"A"})这个示例查询status等于“A”的文档。 1.2 范围查询操作符…...
python的 ping 网络状态监测方法(含多IP)
ping 基本概念 ping (Packet Internet Groper)是一种因特网包探索器,用于测试网络连接量的程序。Ping是工作在 TCP/IP网络体系结构中应用层的一个服务命令, 主要是向特定的目的主机发送 ICMP(Internet Control Messag…...
【独家】华为OD机试提供C语言题解 - 单词反转
最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明单词…...
Linux docker环境安装,docker-compose安装,jdk17安装
安装docker 删除之前安装的docker yum remove docker \docker-client \docker-client-latest \docker- common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-sqlinux \docker-engine-selinux \docker-engine \docker-ce安装yum工具 yum install -y y…...
界面开发(3)--- PyQt5用户登录界面连接数据库
文章目录数据库账户注册账号登录找回密码为了实现用户登录界面的登录功能,我们必须建立一个数据库,并把账号和对应的密码,存储到数据库中。如果输入的账号和密码与数据库中的一致,那我们就允许用户登录,进入新的界面。…...
以下真的没有任何要写的了,我需要凑字数,请大家原谅
以下真的没有任何要写的了,我需要凑字数,请大家原谅!!!!!!!!!!!!!!!&#…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...
shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
服务器--宝塔命令
一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行! sudo su - 1. CentOS 系统: yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
PHP 8.5 即将发布:管道操作符、强力调试
前不久,PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5!作为 PHP 语言的又一次重要迭代,PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是,借助强大的本地开发环境 ServBay&am…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
