当前位置: 首页 > news >正文

Spark Streaming编程基础

文章目录

  • 1. 流式词频统计
    • 1.1 Spark Streaming编程步骤
    • 1.2 流式词频统计项目
      • 1.2.1 创建项目
      • 1.2.2 添加项目依赖
      • 1.2.3 修改源目录
      • 1.2.4 添加scala-sdk库
      • 1.2.5 创建日志属性文件
    • 1.3 创建词频统计对象
    • 1.4 利用nc发送数据
    • 1.5 启动应用,查看结果
  • 2. 编程模型的基本概念
  • 3. 离散化数据流
  • 4. 基本数据源
  • 5. 基本DStream转换操作
  • 6. DStream输出操作

1. 流式词频统计

  • 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用 nc 工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。

1.1 Spark Streaming编程步骤

  1. 添加SparkStreaming相关依赖
  2. 获取程序入口接收数据
  3. 对数据进行业务处理
  4. 获取最终结果
  5. 启动程序等待程序执行结束

1.2 流式词频统计项目

1.2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述

1.2.2 添加项目依赖

  • pom.xml文件里添加依赖
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
  • 刷新项目依赖
    在这里插入图片描述

1.2.3 修改源目录

  • java修改为scala
    在这里插入图片描述

  • pom.xml里设置源目录
    在这里插入图片描述

1.2.4 添加scala-sdk库

  • 在项目结构对话里添加
    在这里插入图片描述
  • 单击【Add to Modules】菜单项
    在这里插入图片描述
  • 单击【OK】按钮以后,就可以在scala里创建Scala Class
    在这里插入图片描述

1.2.5 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

1.3 创建词频统计对象

  • 创建net.huawei.streaming
    在这里插入图片描述
  • net.huawei.streaming包里创建SparkStreamingWordCount对象
    在这里插入图片描述
package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
  • 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。

1.4 利用nc发送数据

  • bigdata1节点利用nc发送数据,执行命令:nc -lp 9999
    在这里插入图片描述

1.5 启动应用,查看结果

  • 启动SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果
    在这里插入图片描述
  • 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用 updateStateByKeymapWithState 实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。

2. 编程模型的基本概念

3. 离散化数据流

4. 基本数据源

5. 基本DStream转换操作

6. DStream输出操作

相关文章:

Spark Streaming编程基础

文章目录 1. 流式词频统计1.1 Spark Streaming编程步骤1.2 流式词频统计项目1.2.1 创建项目1.2.2 添加项目依赖1.2.3 修改源目录1.2.4 添加scala-sdk库1.2.5 创建日志属性文件 1.3 创建词频统计对象1.4 利用nc发送数据1.5 启动应用&#xff0c;查看结果 2. 编程模型的基本概念3…...

深入 Flutter 和 Compose 的 PlatformView 实现对比,它们是如何接入平台控件

在上一篇《深入 Flutter 和 Compose 在 UI 渲染刷新时 Diff 实现对比》发布之后&#xff0c;收到了大佬的“催稿”&#xff0c;想了解下 Flutter 和 Compose 在 PlatformView 实现上的对比&#xff0c;恰好过去写过不少 Flutter 上对于 PlatformView 的实现&#xff0c;这次恰好…...

C# OpenCV机器视觉:红外体温检测

在一个骄阳似火的夏日&#xff0c;全球却被一场突如其来的疫情阴霾笼罩。阿强所在的小镇&#xff0c;平日里熙熙攘攘的街道变得冷冷清清&#xff0c;人们戴着口罩&#xff0c;行色匆匆&#xff0c;眼神中满是对病毒的恐惧。阿强作为镇上小有名气的科技达人&#xff0c;看着这一…...

FCA-FineDataLink认证

FCA-FineDataLink证书 Part.1&#xff1a;判断题 &#xff08;总分&#xff1a;18分 得分&#xff1a;16&#xff09; 第1题 判断题 数据同步只支持写入到已存在表&#xff0c;不支持自动建表(得分&#xff1a;2分 满分&#xff1a;2分) 正确答案&#xff1a;B 你的答案&…...

第19篇:python高级编程进阶:使用Flask进行Web开发

第19篇&#xff1a;python高级编程进阶&#xff1a;使用Flask进行Web开发 内容简介 在第18篇文章中&#xff0c;我们介绍了Web开发的基础知识&#xff0c;并使用Flask框架构建了一个简单的Web应用。本篇文章将深入探讨Flask的高级功能&#xff0c;涵盖模板引擎&#xff08;Ji…...

js截取video视频某一帧为图片

1.代码如下 <template><div class"box"><div class"video-box"><video controls ref"videoRef" preload"true"src"https://qt-minio.ictshop.com.cn:9000/resource-management/2025/01/08/7b96ac9d957c45a…...

[云讷科技]Kerloud Falcon四旋翼飞车虚拟仿真空间发布

虚拟仿真环境作为一个独立的专有软件包提供给我们的客户&#xff0c;用于帮助用户在实际测试之前验证自身的代码&#xff0c;并通过在仿真引擎中添加新的场景来探索新的飞行驾驶功能。 环境要求 由于环境依赖关系&#xff0c;虚拟仿真只能运行在装有Ubuntu 18.04的Intel-64位…...

Jetson nano 安装 PCL 指南

本指南帮助 ARM64 架构的 Jetson Nano 安装 PCL&#xff08;点云库&#xff09;。 安装步骤 第一步&#xff1a;安装依赖 在终端中运行以下命令&#xff0c;安装 PCL 所需的依赖&#xff1a; sudo apt-get update sudo apt-get install git build-essential linux-libc-dev s…...

go-zero框架基本配置和错误码封装

文章目录 加载配置信息配置 env加载.env文件配置servicecontext 查询数据生成model文件执行查询操作 错误码封装配置拦截器错误码封装 接上一篇&#xff1a;《go-zero框架快速入门》 加载配置信息 配置 env 在项目根目录下新增 .env 文件&#xff0c;可以配置当前读取哪个环…...

Android中Service在新进程中的启动流程2

目录 1、Service在客户端的启动入口 2、Service启动在AMS的处理 3、Service在新进程中的启动 4、Service与AMS的关系再续 上一篇文章中我们了解了Service在新进程中启动的大致流程&#xff0c;同时认识了与客户端进程交互的接口IApplicationThread以及与AMS交互的接口IActi…...

论文速读|Matrix-SSL:Matrix Information Theory for Self-Supervised Learning.ICML24

论文地址&#xff1a;Matrix Information Theory for Self-Supervised Learning 代码地址&#xff1a;https://github.com/yifanzhang-pro/matrix-ssl bib引用&#xff1a; article{zhang2023matrix,title{Matrix Information Theory for Self-Supervised Learning},author{Zh…...

ubunut22.04安装docker(基于阿里云 Docker 镜像源安装 Docker)

安装 更新包管理器&#xff1a; sudo apt update 安装 Docker 的依赖包 sudo apt install apt-transport-https ca-certificates curl gnupg lsb-release添加阿里云 Docker 镜像源 GPG 密钥&#xff1a; curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gp…...

k8s namespace绑定节点

k8s namespace绑定节点 1. apiserver 启用准入控制 PodNodeSelector2. namespace 添加注解 scheduler.alpha.kubernetes.io/node-selector3. label node 1. apiserver 启用准入控制 PodNodeSelector vim /etc/kubernetes/manifests/kube-apiserver.yaml spec:containers:- co…...

【ElementPlus】在Vue3中实现表格组件封装

预览 搜索筛选组件 <template><div><el-formref"formView":model"formData"label-width"auto"label-position"right":label-col-style"{ min-width: 100px }":inline"true"><el-form-item …...

cursor重构谷粒商城04——vagrant技术快速部署虚拟机

前言&#xff1a;这个系列将使用最前沿的cursor作为辅助编程工具&#xff0c;来快速开发一些基础的编程项目。目的是为了在真实项目中&#xff0c;帮助初级程序员快速进阶&#xff0c;以最快的速度&#xff0c;效率&#xff0c;快速进阶到中高阶程序员。 本项目将基于谷粒商城…...

26、正则表达式

目录 一. 匹配字符 .&#xff1a;匹配除换行符外的任意单个字符。 二. 位置锚点 ^&#xff1a;匹配输入字符串的开始位置。 $&#xff1a;匹配输入字符串的结束位置。 \b&#xff1a;匹配单词边界。 \B&#xff1a;匹配非单词边界。 三. 重复限定符 *&#xff1a;匹配…...

SpringBoot使用MockMVC通过http请求controller控制器调用测试

说明 在Spring Boot中编写测试控制器调用是一个常见的需求,通常使用Spring的测试框架来完成。Spring Boot提供了多种方式来测试控制器,包括使用MockMvc进行模拟HTTP请求和响应的测试。 基本示例 1. 创建Spring Boot项目 首先,确保你已经创建了一个Spring Boot项目。如果…...

【Unity3D】Unity混淆工具Obfuscator使用

目录 一、导入工具 二、各种混淆形式介绍 2.1 程序集混淆 2.2 命名空间混淆 2.3 类混淆 2.4 函数混淆 2.5 参数混淆 2.6 字段混淆 2.7 属性混淆 2.8 事件混淆 三、安全混淆 四、兼容性处理 4.1 动画方法兼容 4.2 GUI方法兼容 4.3 协程方法兼容 五、选项 5.1 调…...

C语言语法基础学习—动态分配空间(new和malloc的用法及区别)

前言 在 C 语言中&#xff0c;动态内存分配主要是通过 malloc() 和 free() 函数来完成的。而在 C 中是使用new和delete关键字&#xff0c;来动态分配内存。 虽然 C 语言没有 new&#xff0c;但 malloc() 和 new 在内存分配上的作用是相似的。下面我们详细解释 malloc() 和 ne…...

QT:控件属性及常用控件(3)-----输入类控件(正则表达式)

输入类控件既可以进行显示&#xff0c;也能让用户输入一些内容&#xff01; 文章目录 1.Line Edit1.1 用户输入个人信息1.2 基于正则表达式的文本限制1.3 验证两次输入的密码是否一致1.4 让输入的密码可以被查看 2.Text Edit2.1 输入和显示同步2.1 其他信号出发情况 3.ComboBox…...

深入解析PCS1800分布式控制系统:架构设计与工业应用实践

1. PCS1800分布式控制系统架构解析 第一次接触PCS1800系统是在2013年某化工厂的DCS改造项目上。当时现场老师傅指着机柜里整齐排列的模块说&#xff1a;"这玩意儿就像人的神经系统&#xff0c;MNet是大脑&#xff0c;SNet是脊髓&#xff0c;CNet就是末梢神经。"这个…...

WarcraftHelper完全指南:从显示异常到性能飞跃的5个关键突破

WarcraftHelper完全指南&#xff1a;从显示异常到性能飞跃的5个关键突破 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 诊断宽屏适配问题 在34英寸2…...

网络安全培训资源awesome-osint:OSINT视频教程与博客指南

网络安全培训资源awesome-osint&#xff1a;OSINT视频教程与博客指南 开源情报&#xff08;OSINT&#xff09;是网络安全领域的重要技能&#xff0c;通过公开可用的信息源收集情报。对于网络安全新手和从业者来说&#xff0c;找到高质量的OSINT培训资源至关重要。awesome-osin…...

5步打造清爽右键菜单:ContextMenuManager开源工具完全指南

5步打造清爽右键菜单&#xff1a;ContextMenuManager开源工具完全指南 【免费下载链接】ContextMenuManager &#x1f5b1;️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 当你在Windows系统中右键点击文件时&#…...

Qwen3-1.7B能做什么?实测写邮件、生成故事、智能聊天

Qwen3-1.7B能做什么&#xff1f;实测写邮件、生成故事、智能聊天 1. 认识Qwen3-1.7B Qwen3&#xff08;千问3&#xff09;是阿里巴巴集团开源的新一代通义千问大语言模型系列中的一员&#xff0c;1.7B版本虽然参数量不大&#xff0c;但在日常应用中表现出色。这个17亿参数的模…...

内存占用直降68%?揭秘头部金融科技公司Python服务的成本控制策略,含可落地的12个代码级优化checklist

第一章&#xff1a;Python 智能体内存管理策略Python 的内存管理并非由开发者手动控制&#xff0c;而是通过一套高度自动化的智能体机制协同运作&#xff0c;核心包括引用计数、循环垃圾回收器&#xff08;gc 模块&#xff09;和内存池&#xff08;pymalloc&#xff09;三层结构…...

AIVideo在软件测试领域的应用:自动化生成测试案例视频

AIVideo在软件测试领域的应用&#xff1a;自动化生成测试案例视频 1. 引言&#xff1a;测试视频制作的痛点与机遇 作为一名测试工程师&#xff0c;你是否曾经遇到过这样的困境&#xff1a;每次编写完测试用例后&#xff0c;还需要花费大量时间录制演示视频&#xff0c;展示测…...

AI核心概念解析:Agent、Prompt、Skill 及生态关系

&#x1f310; AI核心概念解析&#xff1a;Agent、Prompt、Skill 及生态关系 一、关键名词正确定义与原理 1. Agent&#xff08;智能体&#xff09; 指具备感知—决策—行动闭环能力的自主软件实体。它不是单个模型&#xff0c;而是一个系统架构&#xff1a;接收输入&#x…...

Elasticsearch-PHP异步搜索终极指南:如何实现高性能搜索应用

Elasticsearch-PHP异步搜索终极指南&#xff1a;如何实现高性能搜索应用 【免费下载链接】elasticsearch-php Official PHP client for Elasticsearch. 项目地址: https://gitcode.com/gh_mirrors/el/elasticsearch-php Elasticsearch-PHP是官方PHP客户端&#xff0c;为…...

终极指南:gin-vue-admin前端错误监控告警配置详解 - 邮件与钉钉实时通知方案

终极指南&#xff1a;gin-vue-admin前端错误监控告警配置详解 - 邮件与钉钉实时通知方案 【免费下载链接】gin-vue-admin &#x1f680;ViteVue3Gin拥有AI辅助的基础开发平台&#xff0c;企业级业务AI开发解决方案&#xff0c;内置mcp辅助服务&#xff0c;内置skills管理&#…...