Spark Streaming DStream转换
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相关的算子。
无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。在之前的wordcount程序中,我们只会统计几秒内接收到的数据的单词个数,而不会累加。
Transform
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
object Spark06_Nostate_Transform {def main(args: Array[String]): Unit = {//创建SparkConfval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//转换为RDD操作val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})//打印wordAndCountDStream.print//启动ssc.start()ssc.awaitTermination()}
}
有状态转化操作
UpdateStateByKey
UpdateStateByKey算子用于将历史结果应用到当前批次,该操作允许在使用新信息不断更新状态的同时能够保留他的状态。
有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
UpdateStateByKey() 的结果会是一个新的DStream,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。
为使用这个功能,需要做下面两步:
1. 定义状态,状态可以是一个任意的数据类型。
2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。
更新版的wordcount

例如:每隔一段时间景点人新增流量(从程序启动开始,在原有递增)
- 编写代码
object Spark07_State_updateStateByKey {def main(args: Array[String]): Unit = {//创建SparkConfval conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(conf, Seconds(3))//设置检查点路径 用于保存状态ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//扁平映射val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))//结构转换val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))//聚合// 注意:DStreasm中reduceByKey只能对当前采集周期(窗口)进行聚合操作,没有状态//val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)val stateDS: DStream[(String, Int)] = mapDS.updateStateByKey((seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))})
//打印输出stateDS.print()//启动ssc.start()ssc.awaitTermination()}
}
- 启动程序并向9999端口发送数据
[atguigu@hadoop202 ~]$ nc -lk 9999
- 查看结果为累加
Window Operations(窗口操作)
Spark Streaming 也提供了窗口计算, 允许执行转换操作作用在一个窗口内的数据。默认情况下, 计算只对一个时间段内的RDD进行, 有了窗口之后, 可以把计算应用到一个指定的窗口内的所有 RDD 上。一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
- 窗口时长:计算内容的时间范围;
- 滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集周期的整数倍。
如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。

例如:一小时人流量的变化,窗口(6秒)和间隔(3秒)不一致,不一定从程序启动开始
需求:WordCount统计 3秒一个批次,窗口6秒,滑步3秒。
object Spark08_State_window {def main(args: Array[String]): Unit = {//创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//设置检查点路径 用于保存状态ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)//扁平映射val flatMapDS: DStream[String] = lineDStream.flatMap(_.split(" "))//设置窗口大小,滑动的步长val windowDS: DStream[String] = flatMapDS.window(Seconds(6),Seconds(3))//结构转换val mapDS: DStream[(String, Int)] = windowDS.map((_,1))//聚合val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)reduceDS.print()//启动ssc.start()ssc.awaitTermination()}
}
关于Window的操作还有如下方法:
1) window(windowLength, slideInterval)
基于对源DStream窗化的批次进行计算返回一个新的Dstream
2) countByWindow(windowLength, slideInterval)
返回一个滑动窗口计数流中的元素个数
3) countByValueAndWindow()
返回的DStream则包含窗口中每个值的个数
4) reduceByWindow(func, windowLength, slideInterval)
通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流
5) reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值
6) reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。如果把3秒的时间窗口当成一个池塘,池塘每一秒都会有鱼游进或者游出,那么第一个函数表示每由进来一条鱼,就在该类鱼的数量上累加。而第二个函数是,每游出去一条鱼,就将该鱼的总数减去一。
相关文章:
Spark Streaming DStream转换
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相…...
水果商城,可运行
文章目录项目介绍一、技术栈二、本项目分为前后台,有管理员与用户两种角色;1、管理员角色包含以下功能:2、用户角色包含以下功能:三、用户功能页面展示四、管理员功能页面展示五、部分代码展示六、获取整套项目源码项目介绍 一、…...
LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像
LiveGBS国标GB/T28181国标视频流媒体平台-功能报警订阅配置报警预案告警截图及录像1、报警信息1.1、报警查询1.2、配置开启报警订阅1.2.1、国标设备编辑1.2.2、选择开启报警订阅1.3、配置摄像头报警1.3.1、配置摄像头报警通道ID1.3.2、配置摄像头开启侦测1.3.3、尝试触发摄像头…...
软件测试---测试分类
一 : 按测试对象划分 1.1 可靠性测试 可靠性(Availability)即可用性,是指系统正常运行的能力或者程度,一般用正常向用户提供软件服务的时间占总时间的百分比表示。 1.2 容错性测试 行李箱 , 四个轮子 , 坏了一个 , 说明这个容错…...
剑指 Offer II 015. 字符串中的所有变位词
题目链接 剑指 Offer II 015. 字符串中的所有变位词 mid 题目描述 给定两个字符串 s和 p,找到 s中所有 p的 变位词 的子串,返回这些子串的起始索引。不考虑答案输出的顺序。 变位词 指字母相同,但排列不同的字符串。 示例 1: 输…...
【SpringCloud】SpringCloud详细教程之微服务比较
目录前言一.什么是微服务?为什么要使用微服务二.微服务对比三.企业开发场景前言 我会通过实际代码来给展示每个组件的用法 一.什么是微服务?为什么要使用微服务 分布式,把一个项目拆分成多个模块,每一个模块相当于一个服务。 微…...
二.项目使用vue-router,引入ant-design-vue的UI框架,引入less
根据前文《使用Vue脚手架工具搭建vue项目》搭建好脚手架后使用 1.vue-router 2.引入UI框架ant design vue 3.引入less 1.vue-router vue-router分为两种模式(默认为hash模式): hash history hash: 特征: 1.hash会在浏览器路径里带#号&#…...
网络安全怎么学?20年白帽子老江湖告诉你
很多人都知道龙叔是个老程序员,但却不知道其实我也是个H客,20年前我就开始痴迷于H客技术,可以说是网络安全方面的老江湖了。 到现在,我还依然会去研究这一块,偶尔会和一些网安的朋友交流技术,比如说红盟的…...
药房管理系统;药库管理系统
第一,主要功能: 本系统集日常销售、药品进销存、会员积分、GSP管理等药店所需的所有功能于一体,实现店铺管理的全部自动化。第二、新功能: 增加了“按功能查询药品”的功能,使软件用户可以根据客户的症状推荐合适…...
深眸科技|机器视觉提升制造性能,焕发传统企业智造新活力!
随着机器视觉技术的成熟与发展,其在工业制造中得到越来越广泛的应用。机器视觉在工业制造领域的应用朝着智能识别、智能检测、智能测量以及智能互联的完整智能体系方向发展。此外,快速变化的市场需求,不断涌入行业的竞争对手,让传…...
ubuntu安装SSH的方法
Ubuntu安装SSH的方法。14版的ubuntu经过测试,默认没有开启SSH,所以需要安装。 1、虚拟机设置网卡为桥接模式,即NAT。12版虚拟机默认的。 2、查看ubuntu使用的ip。 ifconfig即可查看,14版的ubuntu自带这个命令。 3、查看是否pi…...
哪种蓝牙耳机通话效果好?通话清晰的蓝牙耳机推荐
出门的时候,如果戴耳机和别人通话,就不必把耳机摘下来,接电话变得前所未有的简单。现在的蓝牙耳机,已经不是单纯的用来听音乐了,而是一种更好的功能。下面这四款蓝牙耳机不仅适合听歌,通话还清晰࿰…...
IT运维如何完成一场高质量复盘
复盘的终极目标是:还原事实,找到薄弱点加以改进。 提到复盘,很多人的第一反应是线上故障,有人要背锅了。 复盘真正的价值是还原事实,在薄弱处加以改进。如何做一次高质量的复盘,我们给出3点建议。 1、坦…...
JVM调优面试题——基础知识
文章目录1、JDK,JRE以及JVM的关系2、编译器到底干了什么事?3、类加载机制是什么?3.1、装载(Load)3.2、链接(Link)3.3、初始化(Initialize)4、类加载器有哪些?5、什么是双亲委派机制?6、介绍一下JVM内存划分(…...
三、mongdb 查询
一、 MongoDB文档检索 MongoDB中有多种方式可以检索文档: 1.1 查询过滤器 使用查询过滤器从集合中检索文档。查询过滤器是一组键值对,可按字段值查询文档。 例如: db.col.find({"status":"A"})这个示例查询status等于“A”的文档。 1.2 范围查询操作符…...
python的 ping 网络状态监测方法(含多IP)
ping 基本概念 ping (Packet Internet Groper)是一种因特网包探索器,用于测试网络连接量的程序。Ping是工作在 TCP/IP网络体系结构中应用层的一个服务命令, 主要是向特定的目的主机发送 ICMP(Internet Control Messag…...
【独家】华为OD机试提供C语言题解 - 单词反转
最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明单词…...
Linux docker环境安装,docker-compose安装,jdk17安装
安装docker 删除之前安装的docker yum remove docker \docker-client \docker-client-latest \docker- common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-sqlinux \docker-engine-selinux \docker-engine \docker-ce安装yum工具 yum install -y y…...
界面开发(3)--- PyQt5用户登录界面连接数据库
文章目录数据库账户注册账号登录找回密码为了实现用户登录界面的登录功能,我们必须建立一个数据库,并把账号和对应的密码,存储到数据库中。如果输入的账号和密码与数据库中的一致,那我们就允许用户登录,进入新的界面。…...
以下真的没有任何要写的了,我需要凑字数,请大家原谅
以下真的没有任何要写的了,我需要凑字数,请大家原谅!!!!!!!!!!!!!!!&#…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...
