Spark-Streaming有状态计算
一、上下文
《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。
二、官方例子
所属包:org.apache.spark.examples.streaming
object StatefulNetworkWordCount {def main(args: Array[String]): Unit = {if (args.length < 2) {System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")//创建微批为 1 秒的上下文val ssc = new StreamingContext(sparkConf, Seconds(1))//指定 checkpoint 目录ssc.checkpoint(".")// 用一个 List 初始化一个 RDDval initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))// 在目标ip:port上创建一个ReceiverInputDStream,并对分隔测试的输入流中的单词进行计数(例如由'nc'生成)val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))// 使用mapWithState更新累积计数这将给出一个由状态组成的DStream(即单词的累积计数)val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {val sum = one.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, sum)state.update(sum)output}val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))stateDstream.print()ssc.start()ssc.awaitTermination()}
}
三、分析
1、构建SparkConf
它是Spark应用程序的配置,用于设置Spark的各种参数。支持链式设置
new SparkConf().setMaster("local").setAppName("My app")
一旦SparkConf对象传递给Spark,用户就不能再对其进行修改。Spark不支持在运行时修改配置
2、构建StreamingContext
它是Spark Streaming功能的主要入口点,且提供了从各种输入源创建[[org.apache.spark.streaming.dstream.DStream]] 的方法。
创建和转换DStreams后,可以分别使用start()、stop()启动和停止流计算,awaitTermination()允许当前线程通过stop()或异常等待上下文的终止。
3、设置checkpoint
StreamingContext最终还是通过SparkContext来设置checkpoint,但其实都是为各自的checkpointDir设置checkpoint路径,在有状态计算中checkpoint是必须的。
所谓有状态计算就必须要把历史状态给存储下来,spark中使用使用checkpoint来实现这个存储,每个微批的数据的计算都要更新到历史状态中。
class SparkContext(config: SparkConf) extends Logging {private[spark] var checkpointDir: Option[String] = None}
class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {private[streaming] var checkpointDir: String = {if (isCheckpointPresent) {sc.setCheckpointDir(_cp.checkpointDir)_cp.checkpointDir} else {null}}}
4、初始化一个RDD
为什么要初始化一个RDD呢?我们看看下面是如何用到的。
5、创建一个ReceiverInputDStream
这里是从TCP源hostname:port创建输入流。使用TCP套接字接收数据,并使用给定的转换器将接收字节解释为对象
6、处理单词
从源码中可以看出会把这样的文本
hadoop spark flink kafka hadoop spark-streaming
处理成这样的格式
hadoop 1
spark 1
flink 1
kafka 1
hadoop 1
spark-streaming 1
6、使用mapWithState更新累积计数
该算子可以维护并更新每个key的状态。
这里用到一个新对象:StateSpec,且用到了它的两个方法,initialState和function
initialState:设置包含“mapWithState”将使用的初始状态的RDD`
function:设置实际的状态更新操作
//第1个参数:状态 key 的类别
//第2个参数:状态 value 的类别
//第3个参数:状态 数据 的类别
//第4个参数:状态 处理完要返回 的类别
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {// 使用state.exists()、state.get()、state.update()和state.remove()来管理状态,并返回必要的字符串
}
四、运行
运行Netcat
nc -lk 9999
新建一个窗口运行官方例子
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999


大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)
- 广州
- https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)
- 青岛
- https://ais.cn/u/nuQr6f
第六届大数据与信息化教育国际学术会议(ICBDIE 2025)
- 苏州
- https://ais.cn/u/eYnmQr
第三届通信网络与机器学习国际学术会议(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2
相关文章:
Spark-Streaming有状态计算
一、上下文 《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。 二、官方例子 所属包:org.…...
Markdown如何导出Html文件Markdown文件
Markdown如何导出Html文件Markdown文件 前言语法详解小结其他文章快来试试吧☺️ Markdown 导出 HTML 👈点击这里也可查看 前言 Markdown的源文件以md为后缀。Markdown是HTML语法的简化版本,它本身不带有任何样式信息。我们所看到的Markdown网页(如&…...
使用Python进行图像裁剪和直方图分析
一、简介 在数字图像处理领域,裁剪和分析图像的直方图是两个非常基本且重要的操作。本文将通过一个简单的Python项目,展示如何使用skimage和matplotlib库来裁剪图像并分析其RGB通道的直方图。 二、环境准备 在开始之前,请确保你已经安装了以…...
企业内管信息化系统
本文结尾处获取源码。 本文结尾处获取源码。 本文结尾处获取源码。 一、相关技术 后端:Java、JavaWeb / Springboot。前端:Vue、HTML / CSS / Javascript 等。数据库:MySQL 二、相关软件(列出的软件其一均可运行) I…...
【python因果库实战15】因果生存分析4
这里写目录标题 加权标准化生存分析总结个体层面的生存曲线 加权标准化生存分析 我们还可以将加权与标准化结合起来,使用 WeightedStandardizedSurvival 模块。在这里,我们将逆倾向得分加权模型(根据基线协变量重新加权人群)与加…...
Linux 线程详解
目录 一、线程概述 二、线程创建 三、线程终止 四、线程回收 五、线程取消 六、线程分离 七、线程安全 一、线程概述 线程是进程内的一个执行单元,是进程内可调度的实体。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空…...
云架构:考量与框架
云架构:考量与框架 引言 在当今的数字化环境中,云计算已成为现代商业运营的基石。一个设计良好的云架构框架为可扩展、安全和弹性的系统奠定了基础。本文将深入探讨云架构的核心要素,讨论重要的考量因素、设计指南,以及最佳实践…...
SD下载、安装、使用、卸载-Stable Diffusion整合包v4.10发布!
目录 前言概述 SD安装1、安装软件2、启动3、配置4、运行5、测试 导入SD模型【决定画风】常用模型下载安装模型 SD卸载SD文生图提示词提示词使用技巧提示词的高级使用技巧强调关键词 前言 我向来不喜欢搞一些没有用的概念,所以直接整理可能用到的东西。 sd简单的说…...
java 发送邮件
前期准备 pom文件中引入 JavaMail API 和 JavaBean Activation FrameWork,得到两个jar包:mail.jar 和 activation.jar 发送简单邮件(只有邮件正文,普通文本) package com.zbttest.email;import com.sun.mail.util.Ma…...
聚类系列 (二)——HDBSCAN算法详解
在进行组会汇报的时候,为了引出本研究动机(论文尚未发表,暂不介绍),需要对DBSCAN、OPTICS、和HDBSCAN算法等进行详细介绍。在查询相关资料的时候,发现网络上对于DBSCAN算法的介绍非常多与细致,但…...
AngularJS HTML DOM
关于《AngularJS HTML DOM》的文章,我找到了一些有用的信息。这篇文章主要介绍了AngularJS如何通过特定的指令与HTML DOM元素进行交互。以下是一些关键点: ng-disabled 指令:这个指令用于将应用程序数据绑定到HTML的disabled属性。例如&#…...
C语言延时实现
C语言延时实现 在C语言中,delay 函数通过空循环实现延时,而不是像其他高级语言(如Python)直接使用 sleep 函数。这种实现方式是基于单片机的特性和C语言的底层操作。下面详细解释为什么这种空循环可以实现延时,以及它…...
OSI模型的网络层中产生拥塞的主要原因?
( 1 )缓冲区容量有限;( 1.5 分) ( 2 )传输线路的带宽有限;( 1.5 分) ( 3 )网络结点的处理能力有限;( 1 分…...
机器学习周报-ModernTCN文献阅读
文章目录 摘要Abstract 0 提升有效感受野(ERF)1 相关知识1.1 标准卷积1.2 深度分离卷积(Depthwise Convolution,DWConv)1.3 逐点卷积(Pointwise Convolution,PWConv)1.4 组卷积(Grou…...
什么是网关路由
1.认识网关 网关(Gateway)和路由(Router)是两个相关但不同的概念。 一、网关(Gateway) 定义 网关是一个网络节点,它充当了不同网络之间的连接点。可以将其看作是一个网络的 “大门”…...
信号的产生、处理
一、信号的概念 信号是linux系统提供的一种,向指定进程发送特定事件的方式。收到信号的进程,要对信号做识别和处理。信号的产生是异步的,进程在工作过程中随时可能收到信号。 信号的种类分为以下这么多种(用指令kill -l查看&…...
在Linux中,zabbix如何监控脑裂?
在Linux中,zabbix监控脑裂主要涉及对高可用(HA)系统中可能发生的节点间通信中断或不一致状态的监控。脑裂问题通常发生在具有冗余节点的高可用系统中,如集群、HA系统或分布式数据库系统,当节点之间失去通信时ÿ…...
C++基础概念复习
前言 本篇文章作基础复习用,主要是在C学习中遇到的概念总结,后续会继续补充。如有不足,请前辈指出,万分感谢。 1、什么是封装,有何优点,在C中如何体现封装这一特性? 封装是面向对象编程&…...
Earth靶场
打开靶机后使用 arp-scan -l 查询靶机 ip 我们使用 nmap 进行 dns 解析 把这两条解析添加到hosts文件中去,这样我们才可以访问页面 这样网站就可以正常打开 扫描ip时候我们发现443是打开的,扫描第二个dns解析的443端口能扫描出来一个 txt 文件 dirsear…...
JavaScript 日期格式
在 JavaScript 中,日期格式可以通过 Date 对象进行操作和格式化。下面是一些常见的 JavaScript 日期格式及其示例: 1. ISO 8601 格式 ISO 8601 是一种标准的日期和时间表示方法,格式为 YYYY-MM-DDTHH:mm:ss.sssZ,例如: let date = new Date(); console.log(date.toISOS…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
C++.OpenGL (20/64)混合(Blending)
混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...
三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
Kafka主题运维全指南:从基础配置到故障处理
#作者:张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1:主题删除失败。常见错误2:__consumer_offsets占用太多的磁盘。 主题日常管理 …...
ThreadLocal 源码
ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物,因为每个访问一个线程局部变量的线程(通过其 get 或 set 方法)都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段,这些类希望将…...
