Spark Streaming编程基础
文章目录
- 1. 流式词频统计
- 1.1 Spark Streaming编程步骤
- 1.2 流式词频统计项目
- 1.2.1 创建项目
- 1.2.2 添加项目依赖
- 1.2.3 修改源目录
- 1.2.4 添加scala-sdk库
- 1.2.5 创建日志属性文件
- 1.3 创建词频统计对象
- 1.4 利用nc发送数据
- 1.5 启动应用,查看结果
- 2. 编程模型的基本概念
- 3. 离散化数据流
- 4. 基本数据源
- 5. 基本DStream转换操作
- 6. DStream输出操作
1. 流式词频统计
- 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用
nc工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。
1.1 Spark Streaming编程步骤
- 添加SparkStreaming相关依赖
- 获取程序入口接收数据
- 对数据进行业务处理
- 获取最终结果
- 启动程序等待程序执行结束
1.2 流式词频统计项目
1.2.1 创建项目
- 设置项目基本信息

- 单击【Create】按钮,生成项目基本骨架

1.2.2 添加项目依赖
- 在
pom.xml文件里添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
- 刷新项目依赖

1.2.3 修改源目录
-
将
java修改为scala

-
在
pom.xml里设置源目录

1.2.4 添加scala-sdk库
- 在项目结构对话里添加

- 单击【Add to Modules】菜单项

- 单击【OK】按钮以后,就可以在
scala里创建Scala Class了

1.2.5 创建日志属性文件
- 在
resources里创建log4j2.properties文件

rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
1.3 创建词频统计对象
- 创建
net.huawei.streaming包

- 在
net.huawei.streaming包里创建SparkStreamingWordCount对象

package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
- 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(
bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。
1.4 利用nc发送数据
- 在
bigdata1节点利用nc发送数据,执行命令:nc -lp 9999

1.5 启动应用,查看结果
- 启动
SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果

- 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用
updateStateByKey或mapWithState实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。
2. 编程模型的基本概念
3. 离散化数据流
4. 基本数据源
5. 基本DStream转换操作
6. DStream输出操作
相关文章:
Spark Streaming编程基础
文章目录 1. 流式词频统计1.1 Spark Streaming编程步骤1.2 流式词频统计项目1.2.1 创建项目1.2.2 添加项目依赖1.2.3 修改源目录1.2.4 添加scala-sdk库1.2.5 创建日志属性文件 1.3 创建词频统计对象1.4 利用nc发送数据1.5 启动应用,查看结果 2. 编程模型的基本概念3…...
深入 Flutter 和 Compose 的 PlatformView 实现对比,它们是如何接入平台控件
在上一篇《深入 Flutter 和 Compose 在 UI 渲染刷新时 Diff 实现对比》发布之后,收到了大佬的“催稿”,想了解下 Flutter 和 Compose 在 PlatformView 实现上的对比,恰好过去写过不少 Flutter 上对于 PlatformView 的实现,这次恰好…...
C# OpenCV机器视觉:红外体温检测
在一个骄阳似火的夏日,全球却被一场突如其来的疫情阴霾笼罩。阿强所在的小镇,平日里熙熙攘攘的街道变得冷冷清清,人们戴着口罩,行色匆匆,眼神中满是对病毒的恐惧。阿强作为镇上小有名气的科技达人,看着这一…...
FCA-FineDataLink认证
FCA-FineDataLink证书 Part.1:判断题 (总分:18分 得分:16) 第1题 判断题 数据同步只支持写入到已存在表,不支持自动建表(得分:2分 满分:2分) 正确答案:B 你的答案&…...
第19篇:python高级编程进阶:使用Flask进行Web开发
第19篇:python高级编程进阶:使用Flask进行Web开发 内容简介 在第18篇文章中,我们介绍了Web开发的基础知识,并使用Flask框架构建了一个简单的Web应用。本篇文章将深入探讨Flask的高级功能,涵盖模板引擎(Ji…...
js截取video视频某一帧为图片
1.代码如下 <template><div class"box"><div class"video-box"><video controls ref"videoRef" preload"true"src"https://qt-minio.ictshop.com.cn:9000/resource-management/2025/01/08/7b96ac9d957c45a…...
[云讷科技]Kerloud Falcon四旋翼飞车虚拟仿真空间发布
虚拟仿真环境作为一个独立的专有软件包提供给我们的客户,用于帮助用户在实际测试之前验证自身的代码,并通过在仿真引擎中添加新的场景来探索新的飞行驾驶功能。 环境要求 由于环境依赖关系,虚拟仿真只能运行在装有Ubuntu 18.04的Intel-64位…...
Jetson nano 安装 PCL 指南
本指南帮助 ARM64 架构的 Jetson Nano 安装 PCL(点云库)。 安装步骤 第一步:安装依赖 在终端中运行以下命令,安装 PCL 所需的依赖: sudo apt-get update sudo apt-get install git build-essential linux-libc-dev s…...
go-zero框架基本配置和错误码封装
文章目录 加载配置信息配置 env加载.env文件配置servicecontext 查询数据生成model文件执行查询操作 错误码封装配置拦截器错误码封装 接上一篇:《go-zero框架快速入门》 加载配置信息 配置 env 在项目根目录下新增 .env 文件,可以配置当前读取哪个环…...
Android中Service在新进程中的启动流程2
目录 1、Service在客户端的启动入口 2、Service启动在AMS的处理 3、Service在新进程中的启动 4、Service与AMS的关系再续 上一篇文章中我们了解了Service在新进程中启动的大致流程,同时认识了与客户端进程交互的接口IApplicationThread以及与AMS交互的接口IActi…...
论文速读|Matrix-SSL:Matrix Information Theory for Self-Supervised Learning.ICML24
论文地址:Matrix Information Theory for Self-Supervised Learning 代码地址:https://github.com/yifanzhang-pro/matrix-ssl bib引用: article{zhang2023matrix,title{Matrix Information Theory for Self-Supervised Learning},author{Zh…...
ubunut22.04安装docker(基于阿里云 Docker 镜像源安装 Docker)
安装 更新包管理器: sudo apt update 安装 Docker 的依赖包 sudo apt install apt-transport-https ca-certificates curl gnupg lsb-release添加阿里云 Docker 镜像源 GPG 密钥: curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gp…...
k8s namespace绑定节点
k8s namespace绑定节点 1. apiserver 启用准入控制 PodNodeSelector2. namespace 添加注解 scheduler.alpha.kubernetes.io/node-selector3. label node 1. apiserver 启用准入控制 PodNodeSelector vim /etc/kubernetes/manifests/kube-apiserver.yaml spec:containers:- co…...
【ElementPlus】在Vue3中实现表格组件封装
预览 搜索筛选组件 <template><div><el-formref"formView":model"formData"label-width"auto"label-position"right":label-col-style"{ min-width: 100px }":inline"true"><el-form-item …...
cursor重构谷粒商城04——vagrant技术快速部署虚拟机
前言:这个系列将使用最前沿的cursor作为辅助编程工具,来快速开发一些基础的编程项目。目的是为了在真实项目中,帮助初级程序员快速进阶,以最快的速度,效率,快速进阶到中高阶程序员。 本项目将基于谷粒商城…...
26、正则表达式
目录 一. 匹配字符 .:匹配除换行符外的任意单个字符。 二. 位置锚点 ^:匹配输入字符串的开始位置。 $:匹配输入字符串的结束位置。 \b:匹配单词边界。 \B:匹配非单词边界。 三. 重复限定符 *:匹配…...
SpringBoot使用MockMVC通过http请求controller控制器调用测试
说明 在Spring Boot中编写测试控制器调用是一个常见的需求,通常使用Spring的测试框架来完成。Spring Boot提供了多种方式来测试控制器,包括使用MockMvc进行模拟HTTP请求和响应的测试。 基本示例 1. 创建Spring Boot项目 首先,确保你已经创建了一个Spring Boot项目。如果…...
【Unity3D】Unity混淆工具Obfuscator使用
目录 一、导入工具 二、各种混淆形式介绍 2.1 程序集混淆 2.2 命名空间混淆 2.3 类混淆 2.4 函数混淆 2.5 参数混淆 2.6 字段混淆 2.7 属性混淆 2.8 事件混淆 三、安全混淆 四、兼容性处理 4.1 动画方法兼容 4.2 GUI方法兼容 4.3 协程方法兼容 五、选项 5.1 调…...
C语言语法基础学习—动态分配空间(new和malloc的用法及区别)
前言 在 C 语言中,动态内存分配主要是通过 malloc() 和 free() 函数来完成的。而在 C 中是使用new和delete关键字,来动态分配内存。 虽然 C 语言没有 new,但 malloc() 和 new 在内存分配上的作用是相似的。下面我们详细解释 malloc() 和 ne…...
QT:控件属性及常用控件(3)-----输入类控件(正则表达式)
输入类控件既可以进行显示,也能让用户输入一些内容! 文章目录 1.Line Edit1.1 用户输入个人信息1.2 基于正则表达式的文本限制1.3 验证两次输入的密码是否一致1.4 让输入的密码可以被查看 2.Text Edit2.1 输入和显示同步2.1 其他信号出发情况 3.ComboBox…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...
算法打卡第18天
从中序与后序遍历序列构造二叉树 (力扣106题) 给定两个整数数组 inorder 和 postorder ,其中 inorder 是二叉树的中序遍历, postorder 是同一棵树的后序遍历,请你构造并返回这颗 二叉树 。 示例 1: 输入:inorder [9,3,15,20,7…...
一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...
2025年- H71-Lc179--39.组合总和(回溯,组合)--Java版
1.题目描述 2.思路 当前的元素可以重复使用。 (1)确定回溯算法函数的参数和返回值(一般是void类型) (2)因为是用递归实现的,所以我们要确定终止条件 (3)单层搜索逻辑 二…...
React核心概念:State是什么?如何用useState管理组件自己的数据?
系列回顾: 在上一篇《React入门第一步》中,我们已经成功创建并运行了第一个React项目。我们学会了用Vite初始化项目,并修改了App.jsx组件,让页面显示出我们想要的文字。但是,那个页面是“死”的,它只是静态…...
