Spark实时(三):Structured Streaming入门案例
文章目录
Structured Streaming入门案例
一、Scala代码如下
二、Java 代码如下
三、以上代码注意点如下
Structured Streaming入门案例
我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:
<!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.4.3</spark.version></properties><dependencies><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL ON Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--mysql依赖的jar包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka 0.10+ Source For Structured Streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 向kafka 生产数据需要包 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Scala 包--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.15</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.12</version></dependency><dependency><groupId>com.google.collections</groupId><artifactId>google-collections</artifactId><version>1.0</version></dependency></dependencies>
一、Scala代码如下
package com.lanson.structuredStreaming/*** Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/
object SSReadSocketData {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSocketWordCount")//默认200个并行度,由于源头数据量少,可以设置少一些并行度.config("spark.sql.shuffle.partitions",1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"val lines: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})//4.按照单词分组,统计个数,自动多一个列countval wordCounts: DataFrame = words.groupBy("value").count()//5.启动流并向控制台打印结果val query: StreamingQuery = wordCounts.writeStream//更新模式设置为complete.outputMode("complete").format("console").start()query.awaitTermination()}}
二、Java 代码如下
package com.lanson.structuredStreaming;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> lines = spark.readStream().format("socket").option("host", "node3").option("port", 9999).load();Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}, Encoders.STRING());Dataset<Row> wordCounts = words.groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}
}
以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:
第一次输入:a b c
第二次输入:d a c
第三次输入:a b c
可以看到控制台打印如下结果:
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| c| 1|
| b| 1|
| a| 1|
+-----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| d| 1|
| c| 2|
| b| 1|
| a| 2|
+-----+-----+-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
| d| 1|
| c| 3|
| b| 2|
| a| 3|
+-----+-----+
三、以上代码注意点如下
- SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
- StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
- 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
- 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:

Spark实时(三):Structured Streaming入门案例
文章目录 Structured Streaming入门案例 一、Scala代码如下 二、Java 代码如下 三、以上代码注意点如下 Structured Streaming入门案例 我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导…...

《Java初阶数据结构》----4.<线性表---Stack栈和Queue队列>
前言 大家好,我目前在学习java。之前也学了一段时间,但是没有发布博客。时间过的真的很快。我会利用好这个暑假,来复习之前学过的内容,并整理好之前写过的博客进行发布。如果博客中有错误或者没有读懂的地方。热烈欢迎大家在评论区…...
Android SurfaceFlinger——关联EGL三要素(二十七)
通过前面的文章我们得到了 EGL 的三要素——Display、Surface 和 Context。其中,Display 是一个图形显示系统或者硬件屏幕,Surface 代表一个可以被渲染的图像缓冲区,Context 包含了 OpenGL ES 的状态信息和资源,它是执行 OpenGL 命令的环境。下一步就是调用 eglMakeCurrent…...

Unity3D之TCP网络通信(客户端)
文章目录 概述TCP核心类异步机制 Unity中创建TCP客户端Unity中其它脚本获取TCP客户端接受到的数据后续改进 本文将以Unity3D应用项目作为客户端去连接制定的服务器为例进行相关说明。 Unity官网参考资料: https://developer.unity.cn/projects/6572ea1bedbc2a001ef…...
Kotlin 中 标准库函数
在 Kotlin 中,标准库提供了许多实用的函数,这些函数可以帮助简化代码、提高效率,以下是一些常用的标准库函数及其功能: let: let 函数允许你在对象上执行一个操作,并返回结果。它通常与安全调用操作符 ?. 一起使用&a…...

【教学类-69-01】20240721铠甲勇士扑克牌(随机14个数字+字母)涂色(男孩篇)
背景需求: 【教学类-68-01】20240720裙子涂色(女孩篇)-CSDN博客文章浏览阅读250次。【教学类-68-01】20240720裙子涂色(女孩篇)https://blog.csdn.net/reasonsummer/article/details/140578153 前期制作了女孩涂色延…...

Adobe“加速”创意人士开启设计新篇章
近日,Adobe公司宣布了其行业领先的专业设计应用程序——Adobe Illustrator和Adobe Photoshop的突破性创新。这一重大更新不仅为创意专业人士带来了前所未有的设计可能性和工作效率提升,还让不论是插画师、设计师还是摄影师,都能从中受益并创作…...

释疑 803-(1)概述 精炼提纯版
目录 习题 1-01计算机网络可以向用户提供哪些服务? 1-02 试简述分组交换的要点。 1-03 试从多个方面比较电路交换、报文交换和分组交换的主要优缺点。 1-05 互联网基础结构的发展大致分为哪几个阶段?请指出这几个阶段最主要的特点。 1-06 简述互联网标准制定的几个阶段…...

人工智能与机器学习原理精解【6】
文章目录 数值优化基础理论凹凸性定义在国外与国内存在不同国内定义国外定义总结示例与说明注意事项 国内凹凸性二阶定义的例子凹函数例子凸函数例子 凸函数(convex function)的开口方向凸函数的二阶导数凸函数的二阶定义单变量函数的二阶定义多变量函数…...
JDK、JRE、JVM之间的关系
JDK是Java的开发环境,用JDK开发了JAVA程序后,通过JDK中的编译程序(javac)将java文件编译成字节码文件,作为运行环境的JRE,字节码文件在JRE上运行,作为虚拟机的JVM解析这些字节码,映射…...

redis构建集群时,一直Waiting for the cluster to join
redis构建集群时,一直Waiting for the cluster to join 前置条件参考 前置条件 这是我搭建的集群相关信息,三台虚拟机,分别是一主一从。在将所有虚拟机中redis服务器用到的tcp端口都打开之后,进行构建集群。但是出现上面的情况。 …...

C++之类与对象(2)
前言 今天将步入学习类的默认成员函数,本节讲解其中的构造函数和析构函数。 1.类的默认成员函数 在 C 中,如果一个类没有显式定义某些成员函数,编译器会自动为该类生成默认的成员函数。以下是编译器可能会生成的默认成员函数: 默…...

「树形结构」基于 Antd 实现一个动态增加子节点+可拖拽的树
效果 如图所示 实现 import { createRoot } from react-dom/client; import React, { useState } from react; import { Tree, Input, Button } from antd; import { PlusOutlined } from ant-design/icons;const { TreeNode } Tree; const { Search } Input;const ini…...

ubuntu那些ppa源在哪
Ubuntu中的 PPA 终极指南 - UBUNTU粉丝之家 什么是PPA PPA 代表个人包存档。 PPA 允许应用程序开发人员和 Linux 用户创建自己的存储库来分发软件。 使用 PPA,您可以轻松获取较新的软件版本或官方 Ubuntu 存储库无法提供的软件。 为什么使用PPA? 正如…...

20240724-然后用idea创建一个Java项目/配置maven环境/本地仓储配置
1.创建一个java项目 (1)点击页面的create project,然后next (2)不勾选,继续next (3)选择新项目名称,新项目路径,然后Finsh,在新打开的页面选择…...

PaddleOCR-PP-OCRv4推理详解及部署实现(下)
目录 前言1. 检测模型1.1 预处理1.2 后处理1.3 推理 2. 方向分类器模型2.1 预处理2.2 后处理2.3 推理 3. 识别模型3.1 预处理3.2 后处理3.3 推理 4. PP-OCRv4部署4.1 源码下载4.2 环境配置4.2.1 配置CMakeLists.txt4.2.2 配置Makefile 4.3 ONNX导出4.4 engine生成4.4.1 检测模型…...
【Golang 面试基础题】每日 5 题(二)
✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/UWz06 📚专栏简介:在这个专栏中,我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏…...
状态模式与订单状态机的实现
状态模式 状态模式(State Design Pattern)是一种行为设计模式,用于在对象的内部状态改变时改变其行为。这种模式可以将状态的变化封装在状态对象中,使得对象在状态变化时不会影响到其他代码,提升了代码的灵活性和可维…...
【MSP430】MSP430是什么?与STM32对比哪个性能更佳?
一、MSP430是什么? MSP430F5529LP是一款由德州仪器(TI)推出的16位微控制器单元(MCU)开发板,具有USB功能,内存配置为128KB闪存和8KB RAM,工作频率高达25MHz。 这款MCU以其高性能和多…...

Win11 操作(四)g502鼠标连接电脑不亮灯无反应
罗技鼠标连接电脑不亮灯无反应 前言 罗技技术💩中💩,贴吧技术神中神! 最近买了一个g502,结果买回来直接插上电脑连灯都不亮,问了一下客服。客服简单的让我换接口,又是下载ghub之类的…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...

大型活动交通拥堵治理的视觉算法应用
大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动(如演唱会、马拉松赛事、高考中考等)期间,城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例,暖城商圈曾因观众集中离场导致周边…...

【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...