当前位置: 首页 > news >正文

Spark Streaming编程基础

文章目录

  • 1. 流式词频统计
    • 1.1 Spark Streaming编程步骤
    • 1.2 流式词频统计项目
      • 1.2.1 创建项目
      • 1.2.2 添加项目依赖
      • 1.2.3 修改源目录
      • 1.2.4 添加scala-sdk库
      • 1.2.5 创建日志属性文件
    • 1.3 创建词频统计对象
    • 1.4 利用nc发送数据
    • 1.5 启动应用,查看结果
  • 2. 编程模型的基本概念
  • 3. 离散化数据流
  • 4. 基本数据源
  • 5. 基本DStream转换操作
  • 6. DStream输出操作

1. 流式词频统计

  • 本实战演示了如何使用 Spark Streaming 实现实时词频统计。通过创建 Spark Streaming 项目,添加依赖,编写 Scala 代码,监听网络端口接收数据流,并按批次处理数据。利用 nc 工具发送数据,程序每10秒统计一次词频并输出结果。该示例展示了 Spark Streaming 的微批处理特性,适用于实时数据处理场景。

1.1 Spark Streaming编程步骤

  1. 添加SparkStreaming相关依赖
  2. 获取程序入口接收数据
  3. 对数据进行业务处理
  4. 获取最终结果
  5. 启动程序等待程序执行结束

1.2 流式词频统计项目

1.2.1 创建项目

  • 设置项目基本信息
    在这里插入图片描述
  • 单击【Create】按钮,生成项目基本骨架
    在这里插入图片描述

1.2.2 添加项目依赖

  • pom.xml文件里添加依赖
    在这里插入图片描述
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>net.huawei.streaming</groupId><artifactId>SparkStreamingDemo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.3.0</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version></dependency></dependencies></project>
  • 刷新项目依赖
    在这里插入图片描述

1.2.3 修改源目录

  • java修改为scala
    在这里插入图片描述

  • pom.xml里设置源目录
    在这里插入图片描述

1.2.4 添加scala-sdk库

  • 在项目结构对话里添加
    在这里插入图片描述
  • 单击【Add to Modules】菜单项
    在这里插入图片描述
  • 单击【OK】按钮以后,就可以在scala里创建Scala Class
    在这里插入图片描述

1.2.5 创建日志属性文件

  • resources里创建log4j2.properties文件
    在这里插入图片描述
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = consoleappender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

1.3 创建词频统计对象

  • 创建net.huawei.streaming
    在这里插入图片描述
  • net.huawei.streaming包里创建SparkStreamingWordCount对象
    在这里插入图片描述
package net.huawei.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 功能:流式词频统计* 作者:华卫* 日期:2025年01月23日*/
object SparkStreamingWordCount {def main(args: Array[String]): Unit = {// 创建SparkConf对象,2个线程,本地运行val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingWordCount")// 创建StreamingContext对象,10秒一个批次val ssc: StreamingContext = new StreamingContext(conf, Seconds(10))// 创建ReceiverInputDStream对象接收来自网络端口的数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata1", 9999)// lines中每条数据按照空格进行切分然后扁平化处理val words: DStream[String] = lines.flatMap(_.split(" "))// words中每条数据转换成(word,1)二元组val wordmap: DStream[(String, Int)] = words.map(word => (word, 1))// wordmap中每条数据按key分组,按value进行累加求和val wordcount: DStream[(String, Int)] = wordmap.reduceByKey(_ + _)// 打印词频统计结果 wordcount.print()// 启动实时流程序ssc.start()// 等待实时流程序结束ssc.awaitTermination()}
}
  • 代码说明:这段代码实现了一个基于Spark Streaming的实时词频统计程序。它通过监听指定端口(bigdata1:9999)接收数据流,将每行数据按空格切分并扁平化为单词,然后统计每个单词的出现次数。程序每10秒处理一个批次的数据,并打印词频统计结果。代码结构清晰,适用于实时数据处理场景。

1.4 利用nc发送数据

  • bigdata1节点利用nc发送数据,执行命令:nc -lp 9999
    在这里插入图片描述

1.5 启动应用,查看结果

  • 启动SparkStreamingWordCount对象,在bigdata1节点上输入数据,在控制台查看词频统计结果
    在这里插入图片描述
  • 结果说明:Spark Streaming 采用微批处理,每批次数据独立处理,批次间不共享状态或共同计数。默认情况下,批次间数据互不影响。如需跨批次状态管理,可使用 updateStateByKeymapWithState 实现累加计数等功能。这种设计确保了流数据处理的灵活性和高效性。

2. 编程模型的基本概念

3. 离散化数据流

4. 基本数据源

5. 基本DStream转换操作

6. DStream输出操作

相关文章:

Spark Streaming编程基础

文章目录 1. 流式词频统计1.1 Spark Streaming编程步骤1.2 流式词频统计项目1.2.1 创建项目1.2.2 添加项目依赖1.2.3 修改源目录1.2.4 添加scala-sdk库1.2.5 创建日志属性文件 1.3 创建词频统计对象1.4 利用nc发送数据1.5 启动应用&#xff0c;查看结果 2. 编程模型的基本概念3…...

深入 Flutter 和 Compose 的 PlatformView 实现对比,它们是如何接入平台控件

在上一篇《深入 Flutter 和 Compose 在 UI 渲染刷新时 Diff 实现对比》发布之后&#xff0c;收到了大佬的“催稿”&#xff0c;想了解下 Flutter 和 Compose 在 PlatformView 实现上的对比&#xff0c;恰好过去写过不少 Flutter 上对于 PlatformView 的实现&#xff0c;这次恰好…...

C# OpenCV机器视觉:红外体温检测

在一个骄阳似火的夏日&#xff0c;全球却被一场突如其来的疫情阴霾笼罩。阿强所在的小镇&#xff0c;平日里熙熙攘攘的街道变得冷冷清清&#xff0c;人们戴着口罩&#xff0c;行色匆匆&#xff0c;眼神中满是对病毒的恐惧。阿强作为镇上小有名气的科技达人&#xff0c;看着这一…...

FCA-FineDataLink认证

FCA-FineDataLink证书 Part.1&#xff1a;判断题 &#xff08;总分&#xff1a;18分 得分&#xff1a;16&#xff09; 第1题 判断题 数据同步只支持写入到已存在表&#xff0c;不支持自动建表(得分&#xff1a;2分 满分&#xff1a;2分) 正确答案&#xff1a;B 你的答案&…...

第19篇:python高级编程进阶:使用Flask进行Web开发

第19篇&#xff1a;python高级编程进阶&#xff1a;使用Flask进行Web开发 内容简介 在第18篇文章中&#xff0c;我们介绍了Web开发的基础知识&#xff0c;并使用Flask框架构建了一个简单的Web应用。本篇文章将深入探讨Flask的高级功能&#xff0c;涵盖模板引擎&#xff08;Ji…...

js截取video视频某一帧为图片

1.代码如下 <template><div class"box"><div class"video-box"><video controls ref"videoRef" preload"true"src"https://qt-minio.ictshop.com.cn:9000/resource-management/2025/01/08/7b96ac9d957c45a…...

[云讷科技]Kerloud Falcon四旋翼飞车虚拟仿真空间发布

虚拟仿真环境作为一个独立的专有软件包提供给我们的客户&#xff0c;用于帮助用户在实际测试之前验证自身的代码&#xff0c;并通过在仿真引擎中添加新的场景来探索新的飞行驾驶功能。 环境要求 由于环境依赖关系&#xff0c;虚拟仿真只能运行在装有Ubuntu 18.04的Intel-64位…...

Jetson nano 安装 PCL 指南

本指南帮助 ARM64 架构的 Jetson Nano 安装 PCL&#xff08;点云库&#xff09;。 安装步骤 第一步&#xff1a;安装依赖 在终端中运行以下命令&#xff0c;安装 PCL 所需的依赖&#xff1a; sudo apt-get update sudo apt-get install git build-essential linux-libc-dev s…...

go-zero框架基本配置和错误码封装

文章目录 加载配置信息配置 env加载.env文件配置servicecontext 查询数据生成model文件执行查询操作 错误码封装配置拦截器错误码封装 接上一篇&#xff1a;《go-zero框架快速入门》 加载配置信息 配置 env 在项目根目录下新增 .env 文件&#xff0c;可以配置当前读取哪个环…...

Android中Service在新进程中的启动流程2

目录 1、Service在客户端的启动入口 2、Service启动在AMS的处理 3、Service在新进程中的启动 4、Service与AMS的关系再续 上一篇文章中我们了解了Service在新进程中启动的大致流程&#xff0c;同时认识了与客户端进程交互的接口IApplicationThread以及与AMS交互的接口IActi…...

论文速读|Matrix-SSL:Matrix Information Theory for Self-Supervised Learning.ICML24

论文地址&#xff1a;Matrix Information Theory for Self-Supervised Learning 代码地址&#xff1a;https://github.com/yifanzhang-pro/matrix-ssl bib引用&#xff1a; article{zhang2023matrix,title{Matrix Information Theory for Self-Supervised Learning},author{Zh…...

ubunut22.04安装docker(基于阿里云 Docker 镜像源安装 Docker)

安装 更新包管理器&#xff1a; sudo apt update 安装 Docker 的依赖包 sudo apt install apt-transport-https ca-certificates curl gnupg lsb-release添加阿里云 Docker 镜像源 GPG 密钥&#xff1a; curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gp…...

k8s namespace绑定节点

k8s namespace绑定节点 1. apiserver 启用准入控制 PodNodeSelector2. namespace 添加注解 scheduler.alpha.kubernetes.io/node-selector3. label node 1. apiserver 启用准入控制 PodNodeSelector vim /etc/kubernetes/manifests/kube-apiserver.yaml spec:containers:- co…...

【ElementPlus】在Vue3中实现表格组件封装

预览 搜索筛选组件 <template><div><el-formref"formView":model"formData"label-width"auto"label-position"right":label-col-style"{ min-width: 100px }":inline"true"><el-form-item …...

cursor重构谷粒商城04——vagrant技术快速部署虚拟机

前言&#xff1a;这个系列将使用最前沿的cursor作为辅助编程工具&#xff0c;来快速开发一些基础的编程项目。目的是为了在真实项目中&#xff0c;帮助初级程序员快速进阶&#xff0c;以最快的速度&#xff0c;效率&#xff0c;快速进阶到中高阶程序员。 本项目将基于谷粒商城…...

26、正则表达式

目录 一. 匹配字符 .&#xff1a;匹配除换行符外的任意单个字符。 二. 位置锚点 ^&#xff1a;匹配输入字符串的开始位置。 $&#xff1a;匹配输入字符串的结束位置。 \b&#xff1a;匹配单词边界。 \B&#xff1a;匹配非单词边界。 三. 重复限定符 *&#xff1a;匹配…...

SpringBoot使用MockMVC通过http请求controller控制器调用测试

说明 在Spring Boot中编写测试控制器调用是一个常见的需求,通常使用Spring的测试框架来完成。Spring Boot提供了多种方式来测试控制器,包括使用MockMvc进行模拟HTTP请求和响应的测试。 基本示例 1. 创建Spring Boot项目 首先,确保你已经创建了一个Spring Boot项目。如果…...

【Unity3D】Unity混淆工具Obfuscator使用

目录 一、导入工具 二、各种混淆形式介绍 2.1 程序集混淆 2.2 命名空间混淆 2.3 类混淆 2.4 函数混淆 2.5 参数混淆 2.6 字段混淆 2.7 属性混淆 2.8 事件混淆 三、安全混淆 四、兼容性处理 4.1 动画方法兼容 4.2 GUI方法兼容 4.3 协程方法兼容 五、选项 5.1 调…...

C语言语法基础学习—动态分配空间(new和malloc的用法及区别)

前言 在 C 语言中&#xff0c;动态内存分配主要是通过 malloc() 和 free() 函数来完成的。而在 C 中是使用new和delete关键字&#xff0c;来动态分配内存。 虽然 C 语言没有 new&#xff0c;但 malloc() 和 new 在内存分配上的作用是相似的。下面我们详细解释 malloc() 和 ne…...

QT:控件属性及常用控件(3)-----输入类控件(正则表达式)

输入类控件既可以进行显示&#xff0c;也能让用户输入一些内容&#xff01; 文章目录 1.Line Edit1.1 用户输入个人信息1.2 基于正则表达式的文本限制1.3 验证两次输入的密码是否一致1.4 让输入的密码可以被查看 2.Text Edit2.1 输入和显示同步2.1 其他信号出发情况 3.ComboBox…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP

编辑-虚拟网络编辑器-更改设置 选择桥接模式&#xff0c;然后找到相应的网卡&#xff08;可以查看自己本机的网络连接&#xff09; windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置&#xff0c;选择刚才配置的桥接模式 静态ip设置&#xff1a; 我用的ubuntu24桌…...

Java数组Arrays操作全攻略

Arrays类的概述 Java中的Arrays类位于java.util包中&#xff0c;提供了一系列静态方法用于操作数组&#xff08;如排序、搜索、填充、比较等&#xff09;。这些方法适用于基本类型数组和对象数组。 常用成员方法及代码示例 排序&#xff08;sort&#xff09; 对数组进行升序…...

使用python进行图像处理—图像变换(6)

图像变换是指改变图像的几何形状或空间位置的操作。常见的几何变换包括平移、旋转、缩放、剪切&#xff08;shear&#xff09;以及更复杂的仿射变换和透视变换。这些变换在图像配准、图像校正、创建特效等场景中非常有用。 6.1仿射变换(Affine Transformation) 仿射变换是一种…...

Q1起重机指挥理论备考要点分析

Q1起重机指挥理论备考要点分析 一、考试重点内容概述 Q1起重机指挥理论考试主要包含三大核心模块&#xff1a;安全技术知识&#xff08;占40%&#xff09;、指挥信号规范&#xff08;占30%&#xff09;和法规标准&#xff08;占30%&#xff09;。考试采用百分制&#xff0c;8…...

rk3506上移植lvgl应用

本文档介绍如何在开发板上运行以及移植LVGL。 1. 移植准备 硬件环境:开发板及其配套屏幕 开发板镜像 主机环境:Ubuntu 22.04.5 2. LVGL启动 ​ 出厂系统默认配置了 LVGL,并且上电之后默认会启动 一个LVGL应用 。 LVGL 的启动脚本为/etc/init.d/pre_init/S00-lv_demo,…...

Clickhouse统计指定表中各字段的空值、空字符串或零值比例

下面是一段Clickhouse SQL代码&#xff0c;用于统计指定数据库中多张表的字段空值情况。代码通过动态生成查询语句实现自动化统计&#xff0c;处理逻辑如下&#xff1a; 从系统表获取指定数据库&#xff08;替换your_database&#xff09;中所有表的字段元数据根据字段类型动态…...

基于机器学习的智能故障预测系统:构建与优化

前言 在现代工业生产中&#xff0c;设备故障不仅会导致生产中断&#xff0c;还会带来巨大的经济损失。传统的故障检测方法依赖于人工巡检和定期维护&#xff0c;这种方式效率低下且难以提前预测潜在故障。随着工业物联网&#xff08;IIoT&#xff09;和机器学习技术的发展&…...