【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
把DStream写入到MySQL数据库中
- Spark 3.4.1
- MySQL 8.0.30
- sbt 1.9.2
文章目录
- 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
- 前言
- 一、背景说明
- 二、使用步骤
- 1.引入库
- 2.开发代码
- 运行测试
- 总结
前言
需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL
提示:本项目通过sbt控制依赖
一、背景说明
在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中
Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:
- 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
- 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
- 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
- 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。
DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
二、使用步骤
1.引入库
ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.13.11"lazy val root = (project in file(".")).settings(name := "SparkLearning",idePackagePrefix := Some("cn.lh.spark"),libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)
2.开发代码
为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:
- NetworkWordCountStatefultoMysql.scala
- StreamingSaveMySQL8.scala
NetworkWordCountStatefultoMysql.scala
package cn.lh.spark import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object NetworkWordCountStatefultoMysql { def main(args: Array[String]): Unit = { // 定义状态更新函数 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } // 设置log4j日志级别 StreamingExamples.setStreamingLogLevels() val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]") val scc: StreamingContext = new StreamingContext(conf, Seconds(5)) // 设置检查点,具有容错机制 scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint") val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999) val words: DStream[String] = lines.flatMap(_.split(" ")) val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1)) val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc) // 打印出状态 stateDstream.print() // 将统计结果保存到MySQL中 stateDstream.foreachRDD(rdd =>{ val repartitionedRDD = rdd.repartition(3) repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL) }) scc.start() scc.awaitTermination() scc.stop() } }
StreamingSaveMySQL8.scala
package cn.lh.spark import java.sql.DriverManager object StreamingSaveMySQL8 { // 定义写入 MySQL 的函数 def writeToMySQL(iter: Iterator[(String,Int)]): Unit = { // 保存到MySQL val ip = "192.168.137.110" val port = "3306" val db = "sparklearning" val username = "lh" val pwd = "Lh123456!" val jdbcurl = s"jdbc:mysql://$ip:$port/$db" val conn = DriverManager.getConnection(jdbcurl, username, pwd) val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)") try { // 写入数据 iter.foreach { wc => statement.setString(1, wc._1.trim) statement.setInt(2, wc._2.toInt) statement.executeUpdate() } } catch { case e:Exception => e.printStackTrace() } finally { if(statement != null){ statement.close() } if(conn!=null){ conn.close() } } } }
运行测试
准备工作:
-
提前在mysql中新建数据表保存Spark Streaming写入的数据
-
启动nc -lk 9999
-
启动 NetworkWordCountStatefultoMysql.scala
![[Pasted image 20230804214904.png]] -
在nc端口输入字符,再分别到idea控制台和MySQL检查结果
总结
本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:
- 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
- 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
- iter: Iterator[(String,Int)] 应用泛型
- 插入表时,自动保存插入时间
欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!
相关文章:

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2…...
Dcat Admin 入门应用指南
在现代的网络应用开发中,管理后台是不可或缺的一部分。它为开发者提供了一个方便管理和监控应用数据的界面。而 Dcat Admin 是一个强大的管理后台框架,它基于 Laravel 框架开发,提供了丰富的功能和灵活的扩展性。本文将带您深入了解 Dcat Adm…...

计算机视觉:替换万物Inpaint Anything
目录 1 Inpaint Anything介绍 1.1 为什么我们需要Inpaint Anything 1.2 Inpaint Anything工作原理 1.3 Inpaint Anything的功能是什么 1.4 Segment Anything模型(SAM) 1.5 Inpaint Anything 1.5.1 移除任何物体 1.5.2 填充任意内容 1.5.3 替换任…...

AWS——01篇(AWS入门 以及 AWS之EC2实例及简单实用)
AWS——01篇(AWS入门 以及 AWS之EC2实例及简单实用) 1. 前言2. 创建AWS账户3. EC23.1 启动 EC2 新实例3.1.1 入口3.1.2 设置名称 选择服务3.1.3 创建密钥对3.1.4 网络设置——安全组3.1.4.1 初始设置3.1.4.2 添加安全组规则(开放新端口&…...

Clickhouse 优势与部署
一、clickhouse简介 1.1clickhouse介绍 ClickHouse的背后研发团队是俄罗斯的Yandex公司,2011年在纳斯达克上市,它的核心产品是搜索引擎。我们知道,做搜索引擎的公司营收非常依赖流量和在线广告,所以做搜索引擎的公司一般会并行推…...

全球数据泄露事件增加近三倍
网络安全公司 Surfshark 的最新研究显示,2023 年第二季度共有 1.108 亿个账户遭到泄露,其中美国排名第一,几乎占 4 月至 6 月所有泄露事件的一半。 俄罗斯排名第二,西班牙排名第三,其次是法国和土耳其。 与 2023 年…...

【雕爷学编程】 MicroPython动手做(38)——控制触摸屏2
MixPY——让爱(AI)触手可及 MixPY布局 主控芯片:K210(64位双核带硬件FPU和卷积加速器的 RISC-V CPU) 显示屏:LCD_2.8寸 320*240分辨率,支持电阻触摸 摄像头:OV2640,200W像素 扬声器&#…...

钉钉微应用
钉钉微应用 在做钉钉微应用开发的时候,遇到了一些相关性的问题,特此记录下,有遇到其他问题的,欢迎一起讨论 调试工具 当我们基于钉钉开发微应用时,难免会遇到调用钉钉api后的调试,这个时候可以安装eruda…...
【 SpringSecurity】第三方认证方法级别安全
文章目录 SpringSecurity 第三方认证实现方法级别的安全 SpringSecurity 第三方认证 在登录网页时,时常有用其他账号登录的方式,它们能够让用户避免在Web站点特定的登录页上自己输入凭证信息。这样的Web站点提供了一种通过其他网站(如Facebo…...

达梦数据库在windows上的安装
前言 简单记录达梦数据库DM7在windows10上的安装过程 1 下载并安装安装包 官网登录后才能下载,建议先注册账户。 下载地址:产品下载-达梦数据 ,CPU选择x86,操作系统选择win64即可。解压安装包后,一路安装下去即可。…...

新手Vite打包工具的使用并解决yarn create vite报错
一、手动创建 1.创建vite-Demo文件夹 2.初始化 yarn init -y 3.安装vite yarn add -D vite 4.打包准备 说明:不需要在src下面创建,在vite-Demo文件夹创建 4.1index.js文件 document.body.insertAdjacentHTML("beforeend","<h1>…...

SpringMVC框架——First Day
目录 三层架构 MVC模型 SpringMVC 快速入门案例 SpringMVC的概述(了解) SpringMVC在三层架构的位置 SpringMVC的优势(了解) 创建SpringMVC的Maven项目 1.在pom.xml中添加所需要的jar包 2.在工程的web.xml中配置核心Spring…...
基于C++雪花算法工具类Snowflake -来自chatGPT
#include <iostream> #include <chrono> #include <stdexcept>class Snowflake { private:// 雪花算法的各个参数static constexpr int64_t workerIdBits 5;static constexpr int64_t datacenterIdBits 5;static constexpr int64_t sequenceBits 12;stati…...

若依打印sql
官方issue 自动生成的代码,sql日志怎么没有打印 在ruoyi-admin中的application.yml配置如下。 # 日志配置,默认 logging:level:com.ruoyi: debugorg.springframework: warn#添加配置com.ying: debug输出sql...

Camunda BPM Run下载(7.20)
官网地址: https://camunda.com/ 中文站点:http://camunda-cn.shaochenfeng.com https://downloads.camunda.cloud/release/camunda-bpm/run/7.20/https://downloads.camunda.cloud/release/camunda-bpm/run/7.20/camunda-bpm-run-7.20.0-alpha3.ziphttps://downloads.camunda…...

【Ubuntu】Ubuntu 22.04 升级 OpenSSH 9.3p2 修复CVE-2023-38408
升级原因 近日Openssh暴露出一个安全漏洞CVE-2023-38408,以下是相关资讯: 一、漏洞详情 OpenSSH是一个用于安全远程登录和文件传输的开源软件套件。它提供了一系列的客户端和服务器程序,包括 ssh、scp、sftp等,用于在网络上进行…...

【知网检索】2023年金融,贸易和商业管理国际学术会议(FTBM2023)
随着经济全球化,贸易自由化的进程加快,我国经济对外开放程度不断加深,正在加快融入世界经济一体化当中。当今世界各国竞争过程中,金融、贸易以及商业形态已成为其关键与焦点竞争内容。 2023年金融、贸易和商业管理国际学术会议(F…...

数据可视化:Matplotlib详解及实战
1 Matplotlib介绍 Matplotlib是Python中最常用的可视化工具之一,可以非常方便地创建海量类型的2D图表和一些基本的3D图表。 Matplotlib提供了一个套面向绘图对象编程的API接口,能够很轻松地实现各种图像的绘制,并且它可以配合Python GUI工具(…...
Flutter flutter_boost 集成
刚开始接触使用flutter boost路由的心得体会记录如下: Fltter项目部分: 第一步 在Flutter项目的 pubspec.yaml文件中添加如下信息: flutter_boost:git:url: https://github.com/alibaba/flutter_boost.gitref: 4.3.0之后在flutter工程下运…...
Stable Diffusion中人物生成相关的negative prompts
下面是常用的negative prompt,在使用stable Diffusion webui等工具生成时可以填入。 bad anatomy, bad proportions, blurry, cloned face, deformed, disfigured, duplicate, extra arms, extra fingers, extra limbs, extra legs, fused fingers, gross proporti…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...

css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)
前言: 最近在做行为检测相关的模型,用的是时空图卷积网络(STGCN),但原有kinetic-400数据集数据质量较低,需要进行细粒度的标注,同时粗略搜了下已有开源工具基本都集中于图像分割这块,…...

基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?
在工业自动化持续演进的今天,通信网络的角色正变得愈发关键。 2025年6月6日,为期三天的华南国际工业博览会在深圳国际会展中心(宝安)圆满落幕。作为国内工业通信领域的技术型企业,光路科技(Fiberroad&…...
区块链技术概述
区块链技术是一种去中心化、分布式账本技术,通过密码学、共识机制和智能合约等核心组件,实现数据不可篡改、透明可追溯的系统。 一、核心技术 1. 去中心化 特点:数据存储在网络中的多个节点(计算机),而非…...