当前位置: 首页 > news >正文

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.13.11"lazy val root = (project in file(".")).settings(name := "SparkLearning",idePackagePrefix := Some("cn.lh.spark"),libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  object NetworkWordCountStatefultoMysql {  def main(args: Array[String]): Unit = {  //    定义状态更新函数  val updateFunc = (values: Seq[Int], state: Option[Int]) => {  val currentCount = values.foldLeft(0)(_ + _)  val previousCount = state.getOrElse(0)  Some(currentCount + previousCount)  }  //    设置log4j日志级别  StreamingExamples.setStreamingLogLevels()  val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  //    设置检查点,具有容错机制  scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  val words: DStream[String] = lines.flatMap(_.split(" "))  val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  // 打印出状态  stateDstream.print()  // 将统计结果保存到MySQL中  stateDstream.foreachRDD(rdd =>{  val repartitionedRDD = rdd.repartition(3)  repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  })  scc.start()  scc.awaitTermination()  scc.stop()  }  }

StreamingSaveMySQL8.scala

package cn.lh.spark  import java.sql.DriverManager  object StreamingSaveMySQL8 {  // 定义写入 MySQL 的函数  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  // 保存到MySQL  val ip = "192.168.137.110"  val port = "3306"  val db = "sparklearning"  val username = "lh"  val pwd = "Lh123456!"  val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  val conn = DriverManager.getConnection(jdbcurl, username, pwd)  val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  try {  // 写入数据  iter.foreach { wc =>  statement.setString(1, wc._1.trim)  statement.setInt(2, wc._2.toInt)  statement.executeUpdate()  }  } catch {  case e:Exception => e.printStackTrace()  } finally {  if(statement != null){  statement.close()  }  if(conn!=null){  conn.close()  }  }  }  }

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!

相关文章:

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2…...

Dcat Admin 入门应用指南

在现代的网络应用开发中,管理后台是不可或缺的一部分。它为开发者提供了一个方便管理和监控应用数据的界面。而 Dcat Admin 是一个强大的管理后台框架,它基于 Laravel 框架开发,提供了丰富的功能和灵活的扩展性。本文将带您深入了解 Dcat Adm…...

计算机视觉:替换万物Inpaint Anything

目录 1 Inpaint Anything介绍 1.1 为什么我们需要Inpaint Anything 1.2 Inpaint Anything工作原理 1.3 Inpaint Anything的功能是什么 1.4 Segment Anything模型(SAM) 1.5 Inpaint Anything 1.5.1 移除任何物体 1.5.2 填充任意内容 1.5.3 替换任…...

AWS——01篇(AWS入门 以及 AWS之EC2实例及简单实用)

AWS——01篇(AWS入门 以及 AWS之EC2实例及简单实用) 1. 前言2. 创建AWS账户3. EC23.1 启动 EC2 新实例3.1.1 入口3.1.2 设置名称 选择服务3.1.3 创建密钥对3.1.4 网络设置——安全组3.1.4.1 初始设置3.1.4.2 添加安全组规则(开放新端口&…...

Clickhouse 优势与部署

一、clickhouse简介 1.1clickhouse介绍 ClickHouse的背后研发团队是俄罗斯的Yandex公司,2011年在纳斯达克上市,它的核心产品是搜索引擎。我们知道,做搜索引擎的公司营收非常依赖流量和在线广告,所以做搜索引擎的公司一般会并行推…...

全球数据泄露事件增加近三倍

网络安全公司 Surfshark 的最新研究显示,2023 年第二季度共有 1.108 亿个账户遭到泄露,其中美国排名第一,几乎占 4 月至 6 月所有泄露事件的一半。 俄罗斯排名第二,西班牙排名第三,其次是法国和土耳其。 与 2023 年…...

【雕爷学编程】 MicroPython动手做(38)——控制触摸屏2

MixPY——让爱(AI)触手可及 MixPY布局 主控芯片:K210(64位双核带硬件FPU和卷积加速器的 RISC-V CPU) 显示屏:LCD_2.8寸 320*240分辨率,支持电阻触摸 摄像头:OV2640,200W像素 扬声器&#…...

钉钉微应用

钉钉微应用 在做钉钉微应用开发的时候,遇到了一些相关性的问题,特此记录下,有遇到其他问题的,欢迎一起讨论 调试工具 当我们基于钉钉开发微应用时,难免会遇到调用钉钉api后的调试,这个时候可以安装eruda…...

【 SpringSecurity】第三方认证方法级别安全

文章目录 SpringSecurity 第三方认证实现方法级别的安全 SpringSecurity 第三方认证 在登录网页时,时常有用其他账号登录的方式,它们能够让用户避免在Web站点特定的登录页上自己输入凭证信息。这样的Web站点提供了一种通过其他网站(如Facebo…...

达梦数据库在windows上的安装

前言 简单记录达梦数据库DM7在windows10上的安装过程 1 下载并安装安装包 官网登录后才能下载,建议先注册账户。 下载地址:产品下载-达梦数据 ,CPU选择x86,操作系统选择win64即可。解压安装包后,一路安装下去即可。…...

新手Vite打包工具的使用并解决yarn create vite报错

一、手动创建 1.创建vite-Demo文件夹 2.初始化 yarn init -y 3.安装vite yarn add -D vite 4.打包准备 说明&#xff1a;不需要在src下面创建&#xff0c;在vite-Demo文件夹创建 4.1index.js文件 document.body.insertAdjacentHTML("beforeend","<h1>…...

SpringMVC框架——First Day

目录 三层架构 MVC模型 SpringMVC 快速入门案例 SpringMVC的概述&#xff08;了解&#xff09; SpringMVC在三层架构的位置 SpringMVC的优势&#xff08;了解&#xff09; 创建SpringMVC的Maven项目 1.在pom.xml中添加所需要的jar包 2.在工程的web.xml中配置核心Spring…...

基于C++雪花算法工具类Snowflake -来自chatGPT

#include <iostream> #include <chrono> #include <stdexcept>class Snowflake { private:// 雪花算法的各个参数static constexpr int64_t workerIdBits 5;static constexpr int64_t datacenterIdBits 5;static constexpr int64_t sequenceBits 12;stati…...

若依打印sql

官方issue 自动生成的代码&#xff0c;sql日志怎么没有打印 在ruoyi-admin中的application.yml配置如下。 # 日志配置&#xff0c;默认 logging:level:com.ruoyi: debugorg.springframework: warn#添加配置com.ying: debug输出sql...

Camunda BPM Run下载(7.20)

官网地址: https://camunda.com/ 中文站点:http://camunda-cn.shaochenfeng.com https://downloads.camunda.cloud/release/camunda-bpm/run/7.20/https://downloads.camunda.cloud/release/camunda-bpm/run/7.20/camunda-bpm-run-7.20.0-alpha3.ziphttps://downloads.camunda…...

【Ubuntu】Ubuntu 22.04 升级 OpenSSH 9.3p2 修复CVE-2023-38408

升级原因 近日Openssh暴露出一个安全漏洞CVE-2023-38408&#xff0c;以下是相关资讯&#xff1a; 一、漏洞详情 OpenSSH是一个用于安全远程登录和文件传输的开源软件套件。它提供了一系列的客户端和服务器程序&#xff0c;包括 ssh、scp、sftp等&#xff0c;用于在网络上进行…...

【知网检索】2023年金融,贸易和商业管理国际学术会议(FTBM2023)

随着经济全球化&#xff0c;贸易自由化的进程加快&#xff0c;我国经济对外开放程度不断加深&#xff0c;正在加快融入世界经济一体化当中。当今世界各国竞争过程中&#xff0c;金融、贸易以及商业形态已成为其关键与焦点竞争内容。 2023年金融、贸易和商业管理国际学术会议(F…...

数据可视化:Matplotlib详解及实战

1 Matplotlib介绍 Matplotlib是Python中最常用的可视化工具之一,可以非常方便地创建海量类型的2D图表和一些基本的3D图表。 Matplotlib提供了一个套面向绘图对象编程的API接口&#xff0c;能够很轻松地实现各种图像的绘制&#xff0c;并且它可以配合Python GUI工具&#xff08;…...

Flutter flutter_boost 集成

刚开始接触使用flutter boost路由的心得体会记录如下&#xff1a; Fltter项目部分&#xff1a; 第一步 在Flutter项目的 pubspec.yaml文件中添加如下信息&#xff1a; flutter_boost:git:url: https://github.com/alibaba/flutter_boost.gitref: 4.3.0之后在flutter工程下运…...

Stable Diffusion中人物生成相关的negative prompts

下面是常用的negative prompt&#xff0c;在使用stable Diffusion webui等工具生成时可以填入。 bad anatomy, bad proportions, blurry, cloned face, deformed, disfigured, duplicate, extra arms, extra fingers, extra limbs, extra legs, fused fingers, gross proporti…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

基于Docker Compose部署Java微服务项目

一. 创建根项目 根项目&#xff08;父项目&#xff09;主要用于依赖管理 一些需要注意的点&#xff1a; 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件&#xff0c;否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

C++:多态机制详解

目录 一. 多态的概念 1.静态多态&#xff08;编译时多态&#xff09; 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1&#xff09;.协变 2&#xff09;.析构函数的重写 5.override 和 final关键字 1&#…...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

基于PHP的连锁酒店管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...

探索Selenium:自动化测试的神奇钥匙

目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...