Spark-Streaming有状态计算
一、上下文
《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。
二、官方例子
所属包:org.apache.spark.examples.streaming
object StatefulNetworkWordCount {def main(args: Array[String]): Unit = {if (args.length < 2) {System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")System.exit(1)}StreamingExamples.setStreamingLogLevels()val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")//创建微批为 1 秒的上下文val ssc = new StreamingContext(sparkConf, Seconds(1))//指定 checkpoint 目录ssc.checkpoint(".")// 用一个 List 初始化一个 RDDval initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))// 在目标ip:port上创建一个ReceiverInputDStream,并对分隔测试的输入流中的单词进行计数(例如由'nc'生成)val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))// 使用mapWithState更新累积计数这将给出一个由状态组成的DStream(即单词的累积计数)val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {val sum = one.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, sum)state.update(sum)output}val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))stateDstream.print()ssc.start()ssc.awaitTermination()}
}
三、分析
1、构建SparkConf
它是Spark应用程序的配置,用于设置Spark的各种参数。支持链式设置
new SparkConf().setMaster("local").setAppName("My app")
一旦SparkConf对象传递给Spark,用户就不能再对其进行修改。Spark不支持在运行时修改配置
2、构建StreamingContext
它是Spark Streaming功能的主要入口点,且提供了从各种输入源创建[[org.apache.spark.streaming.dstream.DStream]] 的方法。
创建和转换DStreams后,可以分别使用start()、stop()启动和停止流计算,awaitTermination()允许当前线程通过stop()或异常等待上下文的终止。
3、设置checkpoint
StreamingContext最终还是通过SparkContext来设置checkpoint,但其实都是为各自的checkpointDir设置checkpoint路径,在有状态计算中checkpoint是必须的。
所谓有状态计算就必须要把历史状态给存储下来,spark中使用使用checkpoint来实现这个存储,每个微批的数据的计算都要更新到历史状态中。
class SparkContext(config: SparkConf) extends Logging {private[spark] var checkpointDir: Option[String] = None}
class StreamingContext private[streaming] (_sc: SparkContext,_cp: Checkpoint,_batchDur: Duration) extends Logging {private[streaming] var checkpointDir: String = {if (isCheckpointPresent) {sc.setCheckpointDir(_cp.checkpointDir)_cp.checkpointDir} else {null}}}
4、初始化一个RDD
为什么要初始化一个RDD呢?我们看看下面是如何用到的。
5、创建一个ReceiverInputDStream
这里是从TCP源hostname:port创建输入流。使用TCP套接字接收数据,并使用给定的转换器将接收字节解释为对象
6、处理单词
从源码中可以看出会把这样的文本
hadoop spark flink kafka hadoop spark-streaming
处理成这样的格式
hadoop 1
spark 1
flink 1
kafka 1
hadoop 1
spark-streaming 1
6、使用mapWithState更新累积计数
该算子可以维护并更新每个key的状态。
这里用到一个新对象:StateSpec,且用到了它的两个方法,initialState和function
initialState:设置包含“mapWithState”将使用的初始状态的RDD`
function:设置实际的状态更新操作
//第1个参数:状态 key 的类别
//第2个参数:状态 value 的类别
//第3个参数:状态 数据 的类别
//第4个参数:状态 处理完要返回 的类别
def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {// 使用state.exists()、state.get()、state.update()和state.remove()来管理状态,并返回必要的字符串
}
四、运行
运行Netcat
nc -lk 9999
新建一个窗口运行官方例子
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/
bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount cdh1 9999
大多数高校硕博生毕业要求需要参加学术会议,发表EI或者SCI检索的学术论文会议论文:
可访问艾思科蓝官网,浏览即将召开的学术会议列表。会议如下:
第四届大数据、信息与计算机网络国际学术会议(BDICN 2025)
- 广州
- https://ais.cn/u/fi2yym
第四届电子信息工程、大数据与计算机技术国际学术会议(EIBDCT 2025)
- 青岛
- https://ais.cn/u/nuQr6f
第六届大数据与信息化教育国际学术会议(ICBDIE 2025)
- 苏州
- https://ais.cn/u/eYnmQr
第三届通信网络与机器学习国际学术会议(CNML 2025)
- 南京
- https://ais.cn/u/vUNva2
相关文章:

Spark-Streaming有状态计算
一、上下文 《Spark-Streaming初识》中的NetworkWordCount示例只能统计每个微批下的单词的数量,那么如何才能统计从开始加载数据到当下的所有数量呢?下面我们就来通过官方例子学习下Spark-Streaming有状态计算。 二、官方例子 所属包:org.…...

Markdown如何导出Html文件Markdown文件
Markdown如何导出Html文件Markdown文件 前言语法详解小结其他文章快来试试吧☺️ Markdown 导出 HTML 👈点击这里也可查看 前言 Markdown的源文件以md为后缀。Markdown是HTML语法的简化版本,它本身不带有任何样式信息。我们所看到的Markdown网页(如&…...

使用Python进行图像裁剪和直方图分析
一、简介 在数字图像处理领域,裁剪和分析图像的直方图是两个非常基本且重要的操作。本文将通过一个简单的Python项目,展示如何使用skimage和matplotlib库来裁剪图像并分析其RGB通道的直方图。 二、环境准备 在开始之前,请确保你已经安装了以…...

企业内管信息化系统
本文结尾处获取源码。 本文结尾处获取源码。 本文结尾处获取源码。 一、相关技术 后端:Java、JavaWeb / Springboot。前端:Vue、HTML / CSS / Javascript 等。数据库:MySQL 二、相关软件(列出的软件其一均可运行) I…...

【python因果库实战15】因果生存分析4
这里写目录标题 加权标准化生存分析总结个体层面的生存曲线 加权标准化生存分析 我们还可以将加权与标准化结合起来,使用 WeightedStandardizedSurvival 模块。在这里,我们将逆倾向得分加权模型(根据基线协变量重新加权人群)与加…...

Linux 线程详解
目录 一、线程概述 二、线程创建 三、线程终止 四、线程回收 五、线程取消 六、线程分离 七、线程安全 一、线程概述 线程是进程内的一个执行单元,是进程内可调度的实体。一个进程可以包含多个线程,这些线程共享进程的资源,如内存空…...
云架构:考量与框架
云架构:考量与框架 引言 在当今的数字化环境中,云计算已成为现代商业运营的基石。一个设计良好的云架构框架为可扩展、安全和弹性的系统奠定了基础。本文将深入探讨云架构的核心要素,讨论重要的考量因素、设计指南,以及最佳实践…...

SD下载、安装、使用、卸载-Stable Diffusion整合包v4.10发布!
目录 前言概述 SD安装1、安装软件2、启动3、配置4、运行5、测试 导入SD模型【决定画风】常用模型下载安装模型 SD卸载SD文生图提示词提示词使用技巧提示词的高级使用技巧强调关键词 前言 我向来不喜欢搞一些没有用的概念,所以直接整理可能用到的东西。 sd简单的说…...
java 发送邮件
前期准备 pom文件中引入 JavaMail API 和 JavaBean Activation FrameWork,得到两个jar包:mail.jar 和 activation.jar 发送简单邮件(只有邮件正文,普通文本) package com.zbttest.email;import com.sun.mail.util.Ma…...

聚类系列 (二)——HDBSCAN算法详解
在进行组会汇报的时候,为了引出本研究动机(论文尚未发表,暂不介绍),需要对DBSCAN、OPTICS、和HDBSCAN算法等进行详细介绍。在查询相关资料的时候,发现网络上对于DBSCAN算法的介绍非常多与细致,但…...
AngularJS HTML DOM
关于《AngularJS HTML DOM》的文章,我找到了一些有用的信息。这篇文章主要介绍了AngularJS如何通过特定的指令与HTML DOM元素进行交互。以下是一些关键点: ng-disabled 指令:这个指令用于将应用程序数据绑定到HTML的disabled属性。例如&#…...
C语言延时实现
C语言延时实现 在C语言中,delay 函数通过空循环实现延时,而不是像其他高级语言(如Python)直接使用 sleep 函数。这种实现方式是基于单片机的特性和C语言的底层操作。下面详细解释为什么这种空循环可以实现延时,以及它…...

OSI模型的网络层中产生拥塞的主要原因?
( 1 )缓冲区容量有限;( 1.5 分) ( 2 )传输线路的带宽有限;( 1.5 分) ( 3 )网络结点的处理能力有限;( 1 分…...

机器学习周报-ModernTCN文献阅读
文章目录 摘要Abstract 0 提升有效感受野(ERF)1 相关知识1.1 标准卷积1.2 深度分离卷积(Depthwise Convolution,DWConv)1.3 逐点卷积(Pointwise Convolution,PWConv)1.4 组卷积(Grou…...

什么是网关路由
1.认识网关 网关(Gateway)和路由(Router)是两个相关但不同的概念。 一、网关(Gateway) 定义 网关是一个网络节点,它充当了不同网络之间的连接点。可以将其看作是一个网络的 “大门”…...

信号的产生、处理
一、信号的概念 信号是linux系统提供的一种,向指定进程发送特定事件的方式。收到信号的进程,要对信号做识别和处理。信号的产生是异步的,进程在工作过程中随时可能收到信号。 信号的种类分为以下这么多种(用指令kill -l查看&…...
在Linux中,zabbix如何监控脑裂?
在Linux中,zabbix监控脑裂主要涉及对高可用(HA)系统中可能发生的节点间通信中断或不一致状态的监控。脑裂问题通常发生在具有冗余节点的高可用系统中,如集群、HA系统或分布式数据库系统,当节点之间失去通信时ÿ…...

C++基础概念复习
前言 本篇文章作基础复习用,主要是在C学习中遇到的概念总结,后续会继续补充。如有不足,请前辈指出,万分感谢。 1、什么是封装,有何优点,在C中如何体现封装这一特性? 封装是面向对象编程&…...

Earth靶场
打开靶机后使用 arp-scan -l 查询靶机 ip 我们使用 nmap 进行 dns 解析 把这两条解析添加到hosts文件中去,这样我们才可以访问页面 这样网站就可以正常打开 扫描ip时候我们发现443是打开的,扫描第二个dns解析的443端口能扫描出来一个 txt 文件 dirsear…...
JavaScript 日期格式
在 JavaScript 中,日期格式可以通过 Date 对象进行操作和格式化。下面是一些常见的 JavaScript 日期格式及其示例: 1. ISO 8601 格式 ISO 8601 是一种标准的日期和时间表示方法,格式为 YYYY-MM-DDTHH:mm:ss.sssZ,例如: let date = new Date(); console.log(date.toISOS…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案
问题描述:iview使用table 中type: "index",分页之后 ,索引还是从1开始,试过绑定后台返回数据的id, 这种方法可行,就是后台返回数据的每个页面id都不完全是按照从1开始的升序,因此百度了下,找到了…...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...