Spark Streaming编程基础
文章目录
- 1. 流式词频统计
- 1.1 Spark Streaming编程步骤
- 1.2 流式词频统计项目
- 1.2.1 创建项目
- 1.2.2 添加项目依赖
- 1.2.3 修改源目录
- 1.2.4 添加scala-sdk库
- 1.2.5 创建日志属性文件
- 1.3 创建词频统计对象
- 1.4 利用nc发送数据
- 1.5 启动应用,查看结果
- 2. 编程模型的基本概念
- 3. 离散化数据流
- 4. 基本数据源
- 5. 基本DStream转换操作
- 6. DStream输出操作
1. 流式词频统计
- 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用
nc工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。
1.1 Spark Streaming编程步骤
- 添加SparkStreaming相关依赖
- 获取程序入口接收数据
- 对数据进行业务处理
- 获取最终结果
- 启动程序等待程序执行结束
1.2 流式词频统计项目
1.2.1 创建项目
- 设置项目基本信息

- 单击【Create】按钮,生成项目基本骨架

1.2.2 添加项目依赖
- 在
pom.xml文件里添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
- 刷新项目依赖

1.2.3 修改源目录
-
将
java修改为scala

-
在
pom.xml里设置源目录

1.2.4 添加scala-sdk库
- 在项目结构对话里添加

- 单击【Add to Modules】菜单项

- 单击【OK】按钮以后,就可以在
scala里创建Scala Class了

1.2.5 创建日志属性文件
- 在
resources里创建log4j2.properties文件

rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
1.3 创建词频统计对象
- 创建
net.huawei.streaming包

- 在
net.huawei.streaming包里创建SparkStreamingWordCount对象

package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
- 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(
bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。
1.4 利用nc发送数据
- 在
bigdata1节点利用nc发送数据,执行命令:nc -lp 9999

1.5 启动应用,查看结果
- 启动
SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果

- 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用
updateStateByKey或mapWithState实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。
2. 编程模型的基本概念
3. 离散化数据流
4. 基本数据源
5. 基本DStream转换操作
6. DStream输出操作
相关文章:
Spark Streaming编程基础
文章目录 1. 流式词频统计1.1 Spark Streaming编程步骤1.2 流式词频统计项目1.2.1 创建项目1.2.2 添加项目依赖1.2.3 修改源目录1.2.4 添加scala-sdk库1.2.5 创建日志属性文件 1.3 创建词频统计对象1.4 利用nc发送数据1.5 启动应用,查看结果 2. 编程模型的基本概念3…...
深入 Flutter 和 Compose 的 PlatformView 实现对比,它们是如何接入平台控件
在上一篇《深入 Flutter 和 Compose 在 UI 渲染刷新时 Diff 实现对比》发布之后,收到了大佬的“催稿”,想了解下 Flutter 和 Compose 在 PlatformView 实现上的对比,恰好过去写过不少 Flutter 上对于 PlatformView 的实现,这次恰好…...
C# OpenCV机器视觉:红外体温检测
在一个骄阳似火的夏日,全球却被一场突如其来的疫情阴霾笼罩。阿强所在的小镇,平日里熙熙攘攘的街道变得冷冷清清,人们戴着口罩,行色匆匆,眼神中满是对病毒的恐惧。阿强作为镇上小有名气的科技达人,看着这一…...
FCA-FineDataLink认证
FCA-FineDataLink证书 Part.1:判断题 (总分:18分 得分:16) 第1题 判断题 数据同步只支持写入到已存在表,不支持自动建表(得分:2分 满分:2分) 正确答案:B 你的答案&…...
第19篇:python高级编程进阶:使用Flask进行Web开发
第19篇:python高级编程进阶:使用Flask进行Web开发 内容简介 在第18篇文章中,我们介绍了Web开发的基础知识,并使用Flask框架构建了一个简单的Web应用。本篇文章将深入探讨Flask的高级功能,涵盖模板引擎(Ji…...
js截取video视频某一帧为图片
1.代码如下 <template><div class"box"><div class"video-box"><video controls ref"videoRef" preload"true"src"https://qt-minio.ictshop.com.cn:9000/resource-management/2025/01/08/7b96ac9d957c45a…...
[云讷科技]Kerloud Falcon四旋翼飞车虚拟仿真空间发布
虚拟仿真环境作为一个独立的专有软件包提供给我们的客户,用于帮助用户在实际测试之前验证自身的代码,并通过在仿真引擎中添加新的场景来探索新的飞行驾驶功能。 环境要求 由于环境依赖关系,虚拟仿真只能运行在装有Ubuntu 18.04的Intel-64位…...
Jetson nano 安装 PCL 指南
本指南帮助 ARM64 架构的 Jetson Nano 安装 PCL(点云库)。 安装步骤 第一步:安装依赖 在终端中运行以下命令,安装 PCL 所需的依赖: sudo apt-get update sudo apt-get install git build-essential linux-libc-dev s…...
go-zero框架基本配置和错误码封装
文章目录 加载配置信息配置 env加载.env文件配置servicecontext 查询数据生成model文件执行查询操作 错误码封装配置拦截器错误码封装 接上一篇:《go-zero框架快速入门》 加载配置信息 配置 env 在项目根目录下新增 .env 文件,可以配置当前读取哪个环…...
Android中Service在新进程中的启动流程2
目录 1、Service在客户端的启动入口 2、Service启动在AMS的处理 3、Service在新进程中的启动 4、Service与AMS的关系再续 上一篇文章中我们了解了Service在新进程中启动的大致流程,同时认识了与客户端进程交互的接口IApplicationThread以及与AMS交互的接口IActi…...
论文速读|Matrix-SSL:Matrix Information Theory for Self-Supervised Learning.ICML24
论文地址:Matrix Information Theory for Self-Supervised Learning 代码地址:https://github.com/yifanzhang-pro/matrix-ssl bib引用: article{zhang2023matrix,title{Matrix Information Theory for Self-Supervised Learning},author{Zh…...
ubunut22.04安装docker(基于阿里云 Docker 镜像源安装 Docker)
安装 更新包管理器: sudo apt update 安装 Docker 的依赖包 sudo apt install apt-transport-https ca-certificates curl gnupg lsb-release添加阿里云 Docker 镜像源 GPG 密钥: curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gp…...
k8s namespace绑定节点
k8s namespace绑定节点 1. apiserver 启用准入控制 PodNodeSelector2. namespace 添加注解 scheduler.alpha.kubernetes.io/node-selector3. label node 1. apiserver 启用准入控制 PodNodeSelector vim /etc/kubernetes/manifests/kube-apiserver.yaml spec:containers:- co…...
【ElementPlus】在Vue3中实现表格组件封装
预览 搜索筛选组件 <template><div><el-formref"formView":model"formData"label-width"auto"label-position"right":label-col-style"{ min-width: 100px }":inline"true"><el-form-item …...
cursor重构谷粒商城04——vagrant技术快速部署虚拟机
前言:这个系列将使用最前沿的cursor作为辅助编程工具,来快速开发一些基础的编程项目。目的是为了在真实项目中,帮助初级程序员快速进阶,以最快的速度,效率,快速进阶到中高阶程序员。 本项目将基于谷粒商城…...
26、正则表达式
目录 一. 匹配字符 .:匹配除换行符外的任意单个字符。 二. 位置锚点 ^:匹配输入字符串的开始位置。 $:匹配输入字符串的结束位置。 \b:匹配单词边界。 \B:匹配非单词边界。 三. 重复限定符 *:匹配…...
SpringBoot使用MockMVC通过http请求controller控制器调用测试
说明 在Spring Boot中编写测试控制器调用是一个常见的需求,通常使用Spring的测试框架来完成。Spring Boot提供了多种方式来测试控制器,包括使用MockMvc进行模拟HTTP请求和响应的测试。 基本示例 1. 创建Spring Boot项目 首先,确保你已经创建了一个Spring Boot项目。如果…...
【Unity3D】Unity混淆工具Obfuscator使用
目录 一、导入工具 二、各种混淆形式介绍 2.1 程序集混淆 2.2 命名空间混淆 2.3 类混淆 2.4 函数混淆 2.5 参数混淆 2.6 字段混淆 2.7 属性混淆 2.8 事件混淆 三、安全混淆 四、兼容性处理 4.1 动画方法兼容 4.2 GUI方法兼容 4.3 协程方法兼容 五、选项 5.1 调…...
C语言语法基础学习—动态分配空间(new和malloc的用法及区别)
前言 在 C 语言中,动态内存分配主要是通过 malloc() 和 free() 函数来完成的。而在 C 中是使用new和delete关键字,来动态分配内存。 虽然 C 语言没有 new,但 malloc() 和 new 在内存分配上的作用是相似的。下面我们详细解释 malloc() 和 ne…...
QT:控件属性及常用控件(3)-----输入类控件(正则表达式)
输入类控件既可以进行显示,也能让用户输入一些内容! 文章目录 1.Line Edit1.1 用户输入个人信息1.2 基于正则表达式的文本限制1.3 验证两次输入的密码是否一致1.4 让输入的密码可以被查看 2.Text Edit2.1 输入和显示同步2.1 其他信号出发情况 3.ComboBox…...
AMLP框架实战:基于MACE构建高精度机器学习势函数
1. 项目概述:当机器学习势函数遇上自动化管道在计算化学和材料科学领域,我们长久以来面临着一个核心矛盾:精度与效率的权衡。密度泛函理论(DFT)能提供接近实验的精度,但计算成本高昂,通常只能处…...
sudo企业级应用【20260525】001篇
文章目录 一、总体设计思路 1️⃣ 设计原则 2️⃣ 日志策略(重点) 二、10 个真实生产场景(含 sudoers 配置) 🔹 Linux 系统管理(3 个) ✅ 场景 1:基础运维(用户 / 权限) ✅ 场景 2:磁盘与文件系统 ✅ 场景 3:网络与防火墙 🔹 云管理(2 个) ✅ 场景 4:云 CLI …...
PentestGPT实战部署指南:AI驱动的渗透测试工作流落地
1. 这不是另一个“AI安全”的概念玩具,而是一套能真正跑起来的渗透测试辅助工作流“PentestGPT”这个名字刚在GitHub上出现时,我第一反应是点开又关掉——过去三年里,我见过太多打着“AI渗透”旗号的项目:有的只是把ChatGPT API封…...
长期使用Token Plan套餐在项目开发中的成本观察
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 长期使用Token Plan套餐在项目开发中的成本观察 在AI驱动的项目开发中,成本控制与预算管理是团队负责人必须面对的现实…...
阿波罗登月,不可能:读心术与影子叙事 ——不是向全世界展示登月,而是向全世界注射登月
阿波罗登月,不可能:读心术与影子叙事 ——不是向全世界展示登月,而是向全世界注射登月 Jianbing Zhu 1^{1}1 1^{1}1 ECT-OS-JiuHuaShan 文明实验室 ORCID: 0009-0006-8591-1891 DOI: 10.5281/zenodo.20373157 Email: ect-os-jiuhuashanzoho…...
终极Chrome画中画扩展:如何在浏览器中实现高效视频多任务处理
终极Chrome画中画扩展:如何在浏览器中实现高效视频多任务处理 【免费下载链接】picture-in-picture-chrome-extension 项目地址: https://gitcode.com/gh_mirrors/pi/picture-in-picture-chrome-extension 想要在浏览网页、处理文档的同时继续观看视频内容吗…...
机器学习与深度学习在社交媒体心理健康检测中的权衡与选择
1. 项目概述:当AI遇见心灵,社交媒体心理健康检测的技术十字路口在社交媒体成为我们数字生活延伸的今天,海量的文本数据无意中记录着用户的情感波动与心理状态。作为一名长期混迹于数据科学和自然语言处理(NLP)一线的从…...
Hermes Agent 框架如何对接 Taotoken 作为自定义模型供应商并配置环境变量
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 Hermes Agent 框架如何对接 Taotoken 作为自定义模型供应商并配置环境变量 Hermes Agent 是一个流行的 AI 代理开发框架࿰…...
8款网盘直链下载助手:彻底告别限速烦恼,实现高速下载自由
8款网盘直链下载助手:彻底告别限速烦恼,实现高速下载自由 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移…...
如何高效实现Windows自动化鼠标点击:AutoClicker完整实战指南
如何高效实现Windows自动化鼠标点击:AutoClicker完整实战指南 【免费下载链接】AutoClicker AutoClicker is a useful simple tool for automating mouse clicks. 项目地址: https://gitcode.com/gh_mirrors/au/AutoClicker AutoClicker是一款专业的Windows桌…...
