Spark实时(三):Structured Streaming入门案例

文章目录
Structured Streaming入门案例
一、Scala代码如下
二、Java 代码如下
三、以上代码注意点如下
Structured Streaming入门案例
我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:
 <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.4.3</spark.version></properties><dependencies><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL  ON  Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--mysql依赖的jar包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka 0.10+ Source For Structured Streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 向kafka 生产数据需要包 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Scala 包--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.15</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.12</version></dependency><dependency><groupId>com.google.collections</groupId><artifactId>google-collections</artifactId><version>1.0</version></dependency></dependencies>一、Scala代码如下
package com.lanson.structuredStreaming/***  Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/
object SSReadSocketData {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSocketWordCount")//默认200个并行度,由于源头数据量少,可以设置少一些并行度.config("spark.sql.shuffle.partitions",1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"val lines: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})//4.按照单词分组,统计个数,自动多一个列countval wordCounts: DataFrame = words.groupBy("value").count()//5.启动流并向控制台打印结果val query: StreamingQuery = wordCounts.writeStream//更新模式设置为complete.outputMode("complete").format("console").start()query.awaitTermination()}}
 
 
二、Java 代码如下
package com.lanson.structuredStreaming;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> lines = spark.readStream().format("socket").option("host", "node3").option("port", 9999).load();Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}, Encoders.STRING());Dataset<Row> wordCounts = words.groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}
}

 
 
以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:
第一次输入:a b c
第二次输入:d a c
第三次输入:a b c可以看到控制台打印如下结果:
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+三、以上代码注意点如下
- SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
- StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
- 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
- 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
相关文章:
 
Spark实时(三):Structured Streaming入门案例
文章目录 Structured Streaming入门案例 一、Scala代码如下 二、Java 代码如下 三、以上代码注意点如下 Structured Streaming入门案例 我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导…...
 
《Java初阶数据结构》----4.<线性表---Stack栈和Queue队列>
前言 大家好,我目前在学习java。之前也学了一段时间,但是没有发布博客。时间过的真的很快。我会利用好这个暑假,来复习之前学过的内容,并整理好之前写过的博客进行发布。如果博客中有错误或者没有读懂的地方。热烈欢迎大家在评论区…...
Android SurfaceFlinger——关联EGL三要素(二十七)
通过前面的文章我们得到了 EGL 的三要素——Display、Surface 和 Context。其中,Display 是一个图形显示系统或者硬件屏幕,Surface 代表一个可以被渲染的图像缓冲区,Context 包含了 OpenGL ES 的状态信息和资源,它是执行 OpenGL 命令的环境。下一步就是调用 eglMakeCurrent…...
 
Unity3D之TCP网络通信(客户端)
文章目录 概述TCP核心类异步机制 Unity中创建TCP客户端Unity中其它脚本获取TCP客户端接受到的数据后续改进 本文将以Unity3D应用项目作为客户端去连接制定的服务器为例进行相关说明。 Unity官网参考资料: https://developer.unity.cn/projects/6572ea1bedbc2a001ef…...
Kotlin 中 标准库函数
在 Kotlin 中,标准库提供了许多实用的函数,这些函数可以帮助简化代码、提高效率,以下是一些常用的标准库函数及其功能: let: let 函数允许你在对象上执行一个操作,并返回结果。它通常与安全调用操作符 ?. 一起使用&a…...
 
【教学类-69-01】20240721铠甲勇士扑克牌(随机14个数字+字母)涂色(男孩篇)
背景需求: 【教学类-68-01】20240720裙子涂色(女孩篇)-CSDN博客文章浏览阅读250次。【教学类-68-01】20240720裙子涂色(女孩篇)https://blog.csdn.net/reasonsummer/article/details/140578153 前期制作了女孩涂色延…...
 
Adobe“加速”创意人士开启设计新篇章
近日,Adobe公司宣布了其行业领先的专业设计应用程序——Adobe Illustrator和Adobe Photoshop的突破性创新。这一重大更新不仅为创意专业人士带来了前所未有的设计可能性和工作效率提升,还让不论是插画师、设计师还是摄影师,都能从中受益并创作…...
 
释疑 803-(1)概述 精炼提纯版
目录 习题 1-01计算机网络可以向用户提供哪些服务? 1-02 试简述分组交换的要点。 1-03 试从多个方面比较电路交换、报文交换和分组交换的主要优缺点。 1-05 互联网基础结构的发展大致分为哪几个阶段?请指出这几个阶段最主要的特点。 1-06 简述互联网标准制定的几个阶段…...
 
人工智能与机器学习原理精解【6】
文章目录 数值优化基础理论凹凸性定义在国外与国内存在不同国内定义国外定义总结示例与说明注意事项 国内凹凸性二阶定义的例子凹函数例子凸函数例子 凸函数(convex function)的开口方向凸函数的二阶导数凸函数的二阶定义单变量函数的二阶定义多变量函数…...
JDK、JRE、JVM之间的关系
JDK是Java的开发环境,用JDK开发了JAVA程序后,通过JDK中的编译程序(javac)将java文件编译成字节码文件,作为运行环境的JRE,字节码文件在JRE上运行,作为虚拟机的JVM解析这些字节码,映射…...
 
redis构建集群时,一直Waiting for the cluster to join
redis构建集群时,一直Waiting for the cluster to join 前置条件参考 前置条件 这是我搭建的集群相关信息,三台虚拟机,分别是一主一从。在将所有虚拟机中redis服务器用到的tcp端口都打开之后,进行构建集群。但是出现上面的情况。 …...
 
C++之类与对象(2)
前言 今天将步入学习类的默认成员函数,本节讲解其中的构造函数和析构函数。 1.类的默认成员函数 在 C 中,如果一个类没有显式定义某些成员函数,编译器会自动为该类生成默认的成员函数。以下是编译器可能会生成的默认成员函数: 默…...
 
「树形结构」基于 Antd 实现一个动态增加子节点+可拖拽的树
效果 如图所示 实现 import { createRoot } from react-dom/client; import React, { useState } from react; import { Tree, Input, Button } from antd; import { PlusOutlined } from ant-design/icons;const { TreeNode } Tree; const { Search } Input;const ini…...
 
ubuntu那些ppa源在哪
Ubuntu中的 PPA 终极指南 - UBUNTU粉丝之家 什么是PPA PPA 代表个人包存档。 PPA 允许应用程序开发人员和 Linux 用户创建自己的存储库来分发软件。 使用 PPA,您可以轻松获取较新的软件版本或官方 Ubuntu 存储库无法提供的软件。 为什么使用PPA? 正如…...
 
20240724-然后用idea创建一个Java项目/配置maven环境/本地仓储配置
1.创建一个java项目 (1)点击页面的create project,然后next (2)不勾选,继续next (3)选择新项目名称,新项目路径,然后Finsh,在新打开的页面选择…...
 
PaddleOCR-PP-OCRv4推理详解及部署实现(下)
目录 前言1. 检测模型1.1 预处理1.2 后处理1.3 推理 2. 方向分类器模型2.1 预处理2.2 后处理2.3 推理 3. 识别模型3.1 预处理3.2 后处理3.3 推理 4. PP-OCRv4部署4.1 源码下载4.2 环境配置4.2.1 配置CMakeLists.txt4.2.2 配置Makefile 4.3 ONNX导出4.4 engine生成4.4.1 检测模型…...
【Golang 面试基础题】每日 5 题(二)
✍个人博客:Pandaconda-CSDN博客 📣专栏地址:http://t.csdnimg.cn/UWz06 📚专栏简介:在这个专栏中,我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话,欢迎点赞👍收藏…...
状态模式与订单状态机的实现
状态模式 状态模式(State Design Pattern)是一种行为设计模式,用于在对象的内部状态改变时改变其行为。这种模式可以将状态的变化封装在状态对象中,使得对象在状态变化时不会影响到其他代码,提升了代码的灵活性和可维…...
【MSP430】MSP430是什么?与STM32对比哪个性能更佳?
一、MSP430是什么? MSP430F5529LP是一款由德州仪器(TI)推出的16位微控制器单元(MCU)开发板,具有USB功能,内存配置为128KB闪存和8KB RAM,工作频率高达25MHz。 这款MCU以其高性能和多…...
 
Win11 操作(四)g502鼠标连接电脑不亮灯无反应
罗技鼠标连接电脑不亮灯无反应 前言 罗技技术💩中💩,贴吧技术神中神! 最近买了一个g502,结果买回来直接插上电脑连灯都不亮,问了一下客服。客服简单的让我换接口,又是下载ghub之类的…...
 
(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
 
(一)单例模式
一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
 
Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)
引言 工欲善其事,必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后,我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集,就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...
 
pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
小木的算法日记-多叉树的递归/层序遍历
🌲 从二叉树到森林:一文彻底搞懂多叉树遍历的艺术 🚀 引言 你好,未来的算法大神! 在数据结构的世界里,“树”无疑是最核心、最迷人的概念之一。我们中的大多数人都是从 二叉树 开始入门的,它…...
 
Axure 下拉框联动
实现选省、选完省之后选对应省份下的市区...
 
基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究
摘要:在消费市场竞争日益激烈的当下,传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序,探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式,分析沉浸式体验的优势与价值…...
LUA+Reids实现库存秒杀预扣减 记录流水 以及自己的思考
目录 lua脚本 记录流水 记录流水的作用 流水什么时候删除 我们在做库存扣减的时候,显示基于Lua脚本和Redis实现的预扣减 这样可以在秒杀扣减的时候保证操作的原子性和高效性 lua脚本 // ... 已有代码 ...Overridepublic InventoryResponse decrease(Inventor…...
