当前位置: 首页 > news >正文

Spark2.x 入门:DStream 输出操作

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。

这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础进行修改。

把DStream输出到文本文件中

NetworkWordCountStateful.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}StreamingExamples.setStreamingLogLevels()  //设置log4j日志级别val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到文本文件中stateDstream.saveAsTextFiles("file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt")sc.start()sc.awaitTermination()}
}

把DStream写入到MySQL数据库中

mysql> use spark
mysql> create table wordcount (word char(20), count int(4));
mysql> select * from wordcount
//这个时候wordcount表是空的,没有任何记录

NetworkWordCountStateful.scala

import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc = (values: Seq[Int], state: Option[Int]) => {val currentCount = values.foldLeft(0)(_ + _)val previousCount = state.getOrElse(0)Some(currentCount + previousCount)}val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")val sc = new StreamingContext(conf, Seconds(5))sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    //设置检查点,检查点具有容错机制val lines = sc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" "))val wordDstream = words.map(x => (x, 1))val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句,把DStream保存到MySQL数据库中     stateDstream.foreachRDD(rdd => {//内部函数def func(records: Iterator[(String,Int)]) {var conn: Connection = nullvar stmt: PreparedStatement = nulltry {val url = "jdbc:mysql://localhost:3306/spark"val user = "root"val password = "hadoop"  //笔者设置的数据库密码是hadoop,请改成你自己的mysql数据库密码conn = DriverManager.getConnection(url, user, password)records.foreach(p => {val sql = "insert into wordcount(word,count) values (?,?)"stmt = conn.prepareStatement(sql);stmt.setString(1, p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})} catch {case e: Exception => e.printStackTrace()} finally {if (stmt != null) {stmt.close()}if (conn != null) {conn.close()}}}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func)})sc.start()sc.awaitTermination()}
}

对于stateDstream,为了把它保存到MySQL数据库中,我们采用了如下的形式:

stateDstream.foreachRDD(function)

其中,function就是一个RDD[T]=>Unit类型的函数,对于本程序而言,就是RDD[(String,Int)]=>Unit类型的函数,也就是说,stateDstream中的每个RDD都是RDD[(String,Int)]类型(想象一下,统计结果的形式是(“hadoop”,3))。这样,对stateDstream中的每个RDD都会执行function中的操作(即把该RDD保存到MySQL的操作)。

下面看function的处理逻辑,在function部分,函数体要执行的处理逻辑实际上是下面的形式:

 def func(records: Iterator[(String,Int)]){……}val repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(func) 

也就是说,这里定义了一个内部函数func,它的功能是,接收records,然后把records保存到MySQL中。到这里,你可能会有疑问?为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中,还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢?这是因为,每次保存RDD到MySQL中,都需要启动数据库连接,如果RDD分区数量太大,那么就会带来多次数据库连接开销,为了减少开销,就有必要把RDD的分区数量控制在较小的范围内,所以,这里就把RDD的分区数量重新设置为3。然后,对于每个RDD分区,就调用repartitionedRDD.foreachPartition(func),把每个分区的数据通过func保存到MySQL中,这时,传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式:

repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数,可能不太好理解,不是那么直观

实际上,这句语句和下面的语句是等价的,下面的语句形式你可能会更好理解:

repartitionedRDD.foreachPartition(records => func(records)) 

上面这种等价的形式比较直观,为func()函数传入了一个records参数,这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了,方便理解。

相关文章:

Spark2.x 入门:DStream 输出操作

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中。 这里以《Spark2.1.0入门:DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础…...

Python爬虫——简单网页抓取(实战案例)小白篇

Python 爬虫是一种强大的工具,用于从网页中提取数据。这里,我将通过一个简单的实战案例来展示如何使用 Python 和一些流行的库(如 requests 和 BeautifulSoup)来抓取网页数据。 实战案例:抓取一个新闻网站的头条新闻标…...

linux,ubuntu,使用ollama本地部署大模型llama3,模型通用,简易快速安装

文章目录 前言安装ollama启动ollama运行llama3模型查看ollama列表删除模型通过代码进行调用REST API 前言 在拥有了一条4090显卡后,那冗余的性能让你不得不去想着办法整花活,于是就想着部署个llama3,于是发现了ollama这个新大陆,…...

JS中的encodeURIComponent函数示例

JavaScript中的encodeURIComponent函数用于对字符串进行URL编码。它将字符串中的特殊字符转换为相应的编码形式,以确保字符串可以安全地嵌入到URL中。 使用encodeURIComponent函数时,它会将除了字母、数字、-、_、.、~以外的所有字符都进行编码。编码后…...

8.20 pre day bug

pre-bug1 分号省略 这些语句的分隔规则会导致一些意想不到的情形,如以下的一个示例; let m n f(bc).toString()但该语句最终会被解析为: let m n f(ab).toString();returntrue一定会被解析成 return;true;pre-bug2 Math.random()与Mat…...

位运算专题

分享丨【题单】位运算(基础/性质/拆位/试填/恒等式/思维) - 力扣(LeetCode) Leetcode 3133. 数组最后一个元素的最小值 我的答案与思路: class Solution { public: // 4 --> (100)2 7 --> (0111)2 // 5 --&g…...

HaProxy学习 —300K的TCP Socket并发连接实现(翻译)

HaProxy学习 —300K的TCP Socket并发连接实现(翻译) 1 原文链接2 原文翻译2.1 调整Linux系统参数2.2 调整HAProxy 1 原文链接 Use HAProxy to load balance 300k concurrent tcp socket connections: Port Exhaustion, Keep-alive and others&#xff0…...

92.WEB渗透测试-信息收集-Google语法(6)

免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动! 内容参考于: 易锦网校会员专享课 上一个内容:91.WEB渗透测试-信息收集-Google语法(5) 监控的漏洞也有很多 打…...

[数据集][目标检测]木材缺陷检测数据集VOC+YOLO格式2383张10类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):2383 标注数量(xml文件个数):2383 标注数量(txt文件个数):2383 标注…...

【启明智显分享】智能音箱AI大模型一站式解决方案重塑人机交互体验,2个月高效落地

2010年左右,智能系统接入音箱市场,智能音箱行业在中国市场兴起。但大潮激荡,阿里、小米、百度三大巨头凭借自身强大的资本、技术、粉丝群强势入局,形成三足鼎立态势。经过几年快速普及,智能音箱整体渗透率极高&#xf…...

逻辑与集合论基础及其在编程中的应用

目录 第一篇文章:逻辑与集合论基础及其在编程中的深度应用 引言 命题逻辑与谓词逻辑在编程中的深入应用 集合论及其在编程中的深度运用 函数的概念及其与集合的结合 总结与应用 第一篇文章:逻辑与集合论基础及其在编程中的深度应用 引言 逻辑与集…...

【无标题】为什么 pg_rewind 在 PostgreSQL 中很重要?

文章目录 pg_rewind 的工作原理使用 pg_rewind 的要求Basic Usage of pg_rewind重要注意事项:为什么 pg_rewind 需要干净关闭?无法进行干净关闭的情况处理不正常关机结论 pg_rewind 是 PostgreSQL 中的一个实用程序,用于将一个数据库集群与另一个数据库集…...

hostapd生成beacon_ie

配置文件 /data/vendor/wifi/hostapd/hostapd_wlan0.conf 配置参数 AP启动过程:1.上层配置一些参数并根据参数生成配置文件 2.init的时候设置默认参数并加载配置文件上的参数(如果重复,以配置文件上的设置优先) 相关函数及结构…...

leetcode349:两个数组的交集

两个数组的交集 给定两个数组 nums1 和 nums2 &#xff0c;返回 它们的 交集 。输出结果中的每个元素一定是 唯一 的。我们可以 不考虑输出结果的顺序 。 public int[] intersection(int[] nums1, int[] nums2) {ArrayList<Integer> list new ArrayList<>();Has…...

Metasploit漏洞利用系列(八):MSF渗透测试 - PHPCGI漏洞利用实战

在本系列的第八篇文章中&#xff0c;我们将深入探索如何利用Metasploit Framework (MSF) 来针对PHPCGI (PHP Common Gateway Interface) 的漏洞进行渗透测试。PHPCGI作为一种将Web服务器与PHP脚本交互的方式&#xff0c;其不恰当的配置或老旧版本中可能存在的漏洞常被攻击者利用…...

基于python的主观题自动阅卷系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;本人精通Java、Python、C#、C、C编程语言&#xff0c;同时也熟练掌握微信小程序、Php和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我有丰富的成品Java、Python、C#毕设项目经验&#xff0c;能够为学生提供各类…...

计算机毕业设计仪器设备管理系统-折旧-报废-转移-借出-归还

本文主要阐述如何采用利用网络数据库技术&#xff0c;在信息管理系统中合理的进行管理。在全面解析系统的设计理念以及设计手段&#xff0c;将系统进程中的所需的工具以及技术进行综合的设计&#xff0c;重点解析信息管理情况以及自动化管理进程&#xff0c;主要包括&#xff1…...

DAY37

零钱兑换 II public int change(int amount, int[] coins) {int []dpnew int[amount1];dp[0]1;for(int i0;i<coins.length;i){for(int jcoins[i];j<amount;j){dp[j]dp[j-coins[i]];}}return dp[amount];}组合总和 Ⅳ public int combinationSum4(int[] nums, int target)…...

将iso格式的镜像文件转化成云平台能安装的镜像格式(raw/vhd/QCOW2/VMDK )亲测--图文详解

1.首先,你将你的iso的文件按照正常的流程和需求安装到你的虚拟机中,我这里使用的是vmware,安装完成之后,关机。再次点开你安装好的那台虚拟机的窗口,如下图 选中要导出的镜像,镜像需要关机 2.点击工具栏的文件------选择 导出 整个工程到 ovf 格式—这里你可以选择你要导…...

Numba加速计算(CPU + GPU + prange)

文章目录 加速方法&#xff1a;Numba、CuPy、PyTorch、PyCUDA、Dask、Rapids一、Numba简介二、Numba类型&#xff1a;CPU GPU三、项目实战 —— 数组的每个元素加23.1、使用 python - range 循环计算 —— &#xff08;时耗&#xff1a;137.37 秒&#xff09;3.2、使用 python…...

electron 两个渲染进程之间通信

一、使用主进程作为中介 使用主进程作为中介相对较为灵活&#xff0c;但可能会增加主进程的负担 1. 从一个渲染进程向主进程发送消息 在发送消息的渲染进程中&#xff0c;可以使用 ipcRenderer 模块向主进程发送消息。例如&#xff1a; const { ipcRenderer } require(&qu…...

配置ROS环境

1.检测环境变量 运行printenv | grep ROS命令&#xff0c;有如下返回值则表示环境变量配置正确&#xff1a; ROS_ROOT/opt/ros/kinetic/share/ros ROS_PACKAGE_PATH/opt/ros/kinetic/share ROS_MASTER_URIhttp://localhost:11311 ROSLISP_PACKAGE_DIRECTORIES ROS_DIST…...

力扣 128. 最长连续序列

题目描述 我的思路 我的思路比较暴力&#xff0c;就是首先将数组从小到大进行排序&#xff0c;然后再依次遍历判断序列是否连续并时时更新连续序列的最长长度。比如示例1&#xff1a;nums [100, 4, 200, 1, 3, 2]&#xff0c;第一步先将数组进行排序得到sort_nums [1, 2, 3,…...

Stable Diffusion AI绘画工具的安装与配置(MAC用户)

AI绘画的热潮席卷了整个创意行业&#xff0c;Stable Diffusion作为其中的翘楚&#xff0c;让艺术创作变得前所未有的简单。然而&#xff0c;对于使用Mac电脑用户来说&#xff0c;安装和配置Stable Diffusion可能显得有些棘手。别担心&#xff0c;这份详细的教程将手把手教你如何…...

flowable源码解读——并行多实例节点任务是否是顺序生成

最近在项目开发中需要在多实例开始监听里修改一个全局的计数变量&#xff0c;不太确定并行多实例任务在底层引擎是顺序生成还是并行生成的&#xff0c;如果是顺序生成的则不影响&#xff0c;如果是并行生成 则修改一个全局的计数变量就会出现数据错误问题&#xff0c;查阅了flo…...

【机器学习】AGI的基本概念、技术挑战和应用前景

引言 AGI是指机器能够完成人类能够完成的任何智力任务的能力 文章目录 引言一、什么是AGI1.1 AGI&#xff0c;Artificial General Intelligence&#xff08;通用人工智能&#xff09;1.2 AGI的定义和标准1.3 AGI的发展 二、AGI的技术挑战2.1 理解人类智能2.2 认知复杂性2.3 自主…...

flink 使用RocksDB作为状态后端

RocksDB flink在生产环境中常用RocksDB作为状态后端 1、subtask在taskmanager中作为一个线程运行&#xff0c;如果设置了RocksDB状态后端&#xff0c;RocksDB也会启动一个独立的线程&#xff0c;供subtask来使用。 2、RocksDB是一个kv数据库&#xff0c;因此只能存储flink的键…...

【运维高级内容--MySQL】

目录 一、mysql安装 二、MySQL主从复制 一、mysql安装 yum install cmake gcc-c openssl-devel ncurses-devel.x86_64 rpcgen.x86_64 #安装依赖性 #在root路径下下载mysql-boost-5.7.44、libtirpc-devel-1.3.3-8.el9_4.x86_64.rpm安装包 yum install libtirpc-devel…...

【仿真与实物设计】基于51单片机设计的打地鼠游戏机——程序源码原理图proteus仿真图PCB设计文档演示视频元件清单等(文末工程资料下载)

基于51单片机设计的打地鼠游戏机 演示视频&#xff1a; 基于51单片机设计的打地鼠游戏机 功能描述&#xff1a;使用 51单片机为核心制作一个打地鼠游戏机。按下启动开关&#xff0c;8盏LED流水点亮并闪烁2次&#xff0c;随即开始播放游戏音乐&#xff0c;直到开始选择模式。选…...

iPhone设备使用技巧:忘记密码的情况下如何解除iOS 18/17屏幕时间

我们给了儿子一部新手机。在尝试擦除旧手机上的所有内容并恢复出厂设置时&#xff0c;它要求提供 4 位屏幕时间密码。我已经尝试了我们会使用的所有可能性&#xff0c;但无法弄清楚。我们如何绕过这个问题或将手机恢复出厂设置以便我们可以出售它&#xff1f; Apple 社区 对于…...