当前位置: 首页 > news >正文

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.13.11"lazy val root = (project in file(".")).settings(name := "SparkLearning",idePackagePrefix := Some("cn.lh.spark"),libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  object NetworkWordCountStatefultoMysql {  def main(args: Array[String]): Unit = {  //    定义状态更新函数  val updateFunc = (values: Seq[Int], state: Option[Int]) => {  val currentCount = values.foldLeft(0)(_ + _)  val previousCount = state.getOrElse(0)  Some(currentCount + previousCount)  }  //    设置log4j日志级别  StreamingExamples.setStreamingLogLevels()  val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  //    设置检查点,具有容错机制  scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  val words: DStream[String] = lines.flatMap(_.split(" "))  val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  // 打印出状态  stateDstream.print()  // 将统计结果保存到MySQL中  stateDstream.foreachRDD(rdd =>{  val repartitionedRDD = rdd.repartition(3)  repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  })  scc.start()  scc.awaitTermination()  scc.stop()  }  }

StreamingSaveMySQL8.scala

package cn.lh.spark  import java.sql.DriverManager  object StreamingSaveMySQL8 {  // 定义写入 MySQL 的函数  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  // 保存到MySQL  val ip = "192.168.137.110"  val port = "3306"  val db = "sparklearning"  val username = "lh"  val pwd = "Lh123456!"  val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  val conn = DriverManager.getConnection(jdbcurl, username, pwd)  val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  try {  // 写入数据  iter.foreach { wc =>  statement.setString(1, wc._1.trim)  statement.setInt(2, wc._2.toInt)  statement.executeUpdate()  }  } catch {  case e:Exception => e.printStackTrace()  } finally {  if(statement != null){  statement.close()  }  if(conn!=null){  conn.close()  }  }  }  }

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!

相关文章:

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】 把DStream写入到MySQL数据库中 Spark 3.4.1MySQL 8.0.30sbt 1.9.2 文章目录 【IDEASpark Streaming 3.4.1Dstream监控套接字流统计WordCount保存至MySQL8】前言一、背景说明二、使用步骤1.引入库2…...

Dcat Admin 入门应用指南

在现代的网络应用开发中,管理后台是不可或缺的一部分。它为开发者提供了一个方便管理和监控应用数据的界面。而 Dcat Admin 是一个强大的管理后台框架,它基于 Laravel 框架开发,提供了丰富的功能和灵活的扩展性。本文将带您深入了解 Dcat Adm…...

计算机视觉:替换万物Inpaint Anything

目录 1 Inpaint Anything介绍 1.1 为什么我们需要Inpaint Anything 1.2 Inpaint Anything工作原理 1.3 Inpaint Anything的功能是什么 1.4 Segment Anything模型(SAM) 1.5 Inpaint Anything 1.5.1 移除任何物体 1.5.2 填充任意内容 1.5.3 替换任…...

AWS——01篇(AWS入门 以及 AWS之EC2实例及简单实用)

AWS——01篇(AWS入门 以及 AWS之EC2实例及简单实用) 1. 前言2. 创建AWS账户3. EC23.1 启动 EC2 新实例3.1.1 入口3.1.2 设置名称 选择服务3.1.3 创建密钥对3.1.4 网络设置——安全组3.1.4.1 初始设置3.1.4.2 添加安全组规则(开放新端口&…...

Clickhouse 优势与部署

一、clickhouse简介 1.1clickhouse介绍 ClickHouse的背后研发团队是俄罗斯的Yandex公司,2011年在纳斯达克上市,它的核心产品是搜索引擎。我们知道,做搜索引擎的公司营收非常依赖流量和在线广告,所以做搜索引擎的公司一般会并行推…...

全球数据泄露事件增加近三倍

网络安全公司 Surfshark 的最新研究显示,2023 年第二季度共有 1.108 亿个账户遭到泄露,其中美国排名第一,几乎占 4 月至 6 月所有泄露事件的一半。 俄罗斯排名第二,西班牙排名第三,其次是法国和土耳其。 与 2023 年…...

【雕爷学编程】 MicroPython动手做(38)——控制触摸屏2

MixPY——让爱(AI)触手可及 MixPY布局 主控芯片:K210(64位双核带硬件FPU和卷积加速器的 RISC-V CPU) 显示屏:LCD_2.8寸 320*240分辨率,支持电阻触摸 摄像头:OV2640,200W像素 扬声器&#…...

钉钉微应用

钉钉微应用 在做钉钉微应用开发的时候,遇到了一些相关性的问题,特此记录下,有遇到其他问题的,欢迎一起讨论 调试工具 当我们基于钉钉开发微应用时,难免会遇到调用钉钉api后的调试,这个时候可以安装eruda…...

【 SpringSecurity】第三方认证方法级别安全

文章目录 SpringSecurity 第三方认证实现方法级别的安全 SpringSecurity 第三方认证 在登录网页时,时常有用其他账号登录的方式,它们能够让用户避免在Web站点特定的登录页上自己输入凭证信息。这样的Web站点提供了一种通过其他网站(如Facebo…...

达梦数据库在windows上的安装

前言 简单记录达梦数据库DM7在windows10上的安装过程 1 下载并安装安装包 官网登录后才能下载,建议先注册账户。 下载地址:产品下载-达梦数据 ,CPU选择x86,操作系统选择win64即可。解压安装包后,一路安装下去即可。…...

新手Vite打包工具的使用并解决yarn create vite报错

一、手动创建 1.创建vite-Demo文件夹 2.初始化 yarn init -y 3.安装vite yarn add -D vite 4.打包准备 说明&#xff1a;不需要在src下面创建&#xff0c;在vite-Demo文件夹创建 4.1index.js文件 document.body.insertAdjacentHTML("beforeend","<h1>…...

SpringMVC框架——First Day

目录 三层架构 MVC模型 SpringMVC 快速入门案例 SpringMVC的概述&#xff08;了解&#xff09; SpringMVC在三层架构的位置 SpringMVC的优势&#xff08;了解&#xff09; 创建SpringMVC的Maven项目 1.在pom.xml中添加所需要的jar包 2.在工程的web.xml中配置核心Spring…...

基于C++雪花算法工具类Snowflake -来自chatGPT

#include <iostream> #include <chrono> #include <stdexcept>class Snowflake { private:// 雪花算法的各个参数static constexpr int64_t workerIdBits 5;static constexpr int64_t datacenterIdBits 5;static constexpr int64_t sequenceBits 12;stati…...

若依打印sql

官方issue 自动生成的代码&#xff0c;sql日志怎么没有打印 在ruoyi-admin中的application.yml配置如下。 # 日志配置&#xff0c;默认 logging:level:com.ruoyi: debugorg.springframework: warn#添加配置com.ying: debug输出sql...

Camunda BPM Run下载(7.20)

官网地址: https://camunda.com/ 中文站点:http://camunda-cn.shaochenfeng.com https://downloads.camunda.cloud/release/camunda-bpm/run/7.20/https://downloads.camunda.cloud/release/camunda-bpm/run/7.20/camunda-bpm-run-7.20.0-alpha3.ziphttps://downloads.camunda…...

【Ubuntu】Ubuntu 22.04 升级 OpenSSH 9.3p2 修复CVE-2023-38408

升级原因 近日Openssh暴露出一个安全漏洞CVE-2023-38408&#xff0c;以下是相关资讯&#xff1a; 一、漏洞详情 OpenSSH是一个用于安全远程登录和文件传输的开源软件套件。它提供了一系列的客户端和服务器程序&#xff0c;包括 ssh、scp、sftp等&#xff0c;用于在网络上进行…...

【知网检索】2023年金融,贸易和商业管理国际学术会议(FTBM2023)

随着经济全球化&#xff0c;贸易自由化的进程加快&#xff0c;我国经济对外开放程度不断加深&#xff0c;正在加快融入世界经济一体化当中。当今世界各国竞争过程中&#xff0c;金融、贸易以及商业形态已成为其关键与焦点竞争内容。 2023年金融、贸易和商业管理国际学术会议(F…...

数据可视化:Matplotlib详解及实战

1 Matplotlib介绍 Matplotlib是Python中最常用的可视化工具之一,可以非常方便地创建海量类型的2D图表和一些基本的3D图表。 Matplotlib提供了一个套面向绘图对象编程的API接口&#xff0c;能够很轻松地实现各种图像的绘制&#xff0c;并且它可以配合Python GUI工具&#xff08;…...

Flutter flutter_boost 集成

刚开始接触使用flutter boost路由的心得体会记录如下&#xff1a; Fltter项目部分&#xff1a; 第一步 在Flutter项目的 pubspec.yaml文件中添加如下信息&#xff1a; flutter_boost:git:url: https://github.com/alibaba/flutter_boost.gitref: 4.3.0之后在flutter工程下运…...

Stable Diffusion中人物生成相关的negative prompts

下面是常用的negative prompt&#xff0c;在使用stable Diffusion webui等工具生成时可以填入。 bad anatomy, bad proportions, blurry, cloned face, deformed, disfigured, duplicate, extra arms, extra fingers, extra limbs, extra legs, fused fingers, gross proporti…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

Java如何权衡是使用无序的数组还是有序的数组

在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?

uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件&#xff0c;用于在原生应用中加载 HTML 页面&#xff1a; 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

Redis:现代应用开发的高效内存数据存储利器

一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发&#xff0c;其初衷是为了满足他自己的一个项目需求&#xff0c;即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源&#xff0c;Redis凭借其简单易用、…...