当前位置: 首页 > news >正文

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

相关文章:

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】 1. RDD队列 a、使用场景&#xff1a;测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream&#xff0c;每一个推送这个队列的RDD&#xff0c;都会作为一个DStream处理 val sparkco…...

【三:Mock服务的使用】

目录 1、工具包2、mock的demo1、get请求2、post请求3、带cookies的请求4、带请求头的请求5、请求重定向 1、工具包 1、&#xff1a;服务包的下载 moco-runner-0.11.0-standalone.jar 下载 2、&#xff1a;运行命令java -jar ./moco-runner-0.11.0-standalone.jar http -p 888…...

驱动:驱动相关概念,内核模块编程,内核消息打印printk函数的使用

一、驱动相关概念 1.操作系统的功能 向下管理硬件&#xff0c;向上提供接口 操作系统向上提供的接口类型&#xff1a; 内存管理&#xff1a;内存申请&#xff08;malloc&#xff09; 内存释放&#xff08;free&#xff09;等 文件管理&#xff1a; 通过文件系统格式对文件ext2…...

【Qt控件之QListWidget】介绍及使用,利用QListWidget、QToolButton、和布局控件实现抽屉式组合控件

概述 QListWidget类提供了基于项目的列表小部件。 QListWidget是一个方便的类&#xff0c;类似于QListView提供的列表视图&#xff0c;但使用经典的基于项目的接口来添加和删除项目。QListWidget使用内部模型来管理列表中的每个QListWidgetItem。 对于更灵活的列表视图小部件…...

【Java基础面试二十四】、String类有哪些方法?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;String类有哪些方法&…...

[DRAFT] LLVM ThinLTO原理分析

我们在《论文阅读&#xff1a;ThinLTO: Scalable and Incremental LTO》中介绍了ThinLTO论文的主要思想&#xff0c;这里我们介绍下LLVM ThinLTO是如何实现的。本文主要分为如下几个部分&#xff1a; LLVM ThinLTO Object 含有哪些内容&#xff1f;LLVM ThinLTO 是如何做优化的…...

使用Gitlab构建简单流水线CI/CD

什么是Gitlab Gitlab实质上是一套DevOps工具 目前看起来&#xff0c;Gitlab属于是内嵌了一套CI/CD的框架&#xff0c;并且可以提供软件开发中的版本管理、项目管理等等其他功能。 这里需要辨别一下Gitlab和Github Gitee的区别。 GIthub大家都很熟悉了&#xff0c;一般大家都会…...

【AIGC核心技术剖析】用于高效 3D 内容创建生成(从单视图图像生成高质量的纹理网格)

3D 内容创建的最新进展主要利用通过分数蒸馏抽样 &#xff08;SDS&#xff09; 生成的基于优化的 3D 生成。尽管已经显示出有希望的结果&#xff0c;但这些方法通常存在每个样本优化缓慢的问题&#xff0c;限制了它们的实际应用。在本文中&#xff0c;我们提出了DreamGaussian&…...

nginx平滑升级添加echo模块、localtion配置、rewrite配置

nginx平滑升级添加echo模块、location配置、rewrite配置 文章目录 nginx平滑升级添加echo模块、location配置、rewrite配置1.环境说明&#xff1a;2.nginx平滑升级原理&#xff1a;3.平滑升级nginx&#xff0c;并添加echo模块3.1.查看当前nginx版本以及老版本编译参数信息3.2.下…...

系统架构师备考倒计时19天(每日知识点)

软件架构评估&#xff08;ATAM&#xff09; 在SAAM的基础上发展起来的&#xff0c;主要针对性能、实用性、安全性和可修改性&#xff0c;在系统开发之前&#xff0c;对这些质量属性进行评价和折中。ATAM方法的主要活动领域包括&#xff1a; 第一阶段 场景和需求收集 收集场景…...

谈谈 Redis 如何来实现分布式锁

谈谈 Redis 如何来实现分布式锁 基于 setnx 可以实现&#xff0c;但是不是可重入的。 基于 Hash 数据类型 Lua脚本 可以实现可重入的分布式锁。 获取锁的 Lua 脚本&#xff1a; 释放锁的 Lua 脚本&#xff1a; 但是还是存在分布式问题&#xff0c;比如说&#xff0c;一个客…...

.NET 6.0 Web API Hangfire

Hangfire 文档 Hangfire 中文文档 Hangfire GitHub使用示例源码 在线Cron表达式生成器 ● Hangfire允许您以非常简单但可靠的方式在请求管道之外启动方法调用。 这种 后台线程 中执行方法的行为称为 后台任务。 ● 它是由:客户端、作业存储、服务端 组成的。 ● Hangfire可以在…...

基于java的校园论坛系统,ssm+jsp,Mysql数据库,前台用户+后台管理,完美运行,有一万多字论文

目录 演示视频 基本介绍 论文目录 功能架构 系统截图 演示视频 基本介绍 基于java的校园论坛系统&#xff0c;Mysql数据库&#xff0c;系统整体采用ssmjsp设计&#xff0c;前台用户后台管理&#xff0c;完美运行&#xff0c;有一万多字论文。 用户功能&#xff1a; 1.系统…...

Django小白开发指南

文章目录 HTTP协议socket实现一个web服务器WSGI实现一个web服务器WSGI实现支持多URL的web服务器WSGI实现图片显示的web服务器MVC && MTV1.MVC2.MTV3.总结 一、创建Django项目1.创建项目2.创建app3.第一次django 请求 二、模板1.配置settings.py2.模板语法3.继承模板 三…...

保序回归与金融时序数据

保序回归在回归问题中的作用是通过拟合一个单调递增或递减的函数&#xff0c;来保持数据点的相对顺序特性。 一、保序回归的作用 主要用于以下情况&#xff1a; 1. 有序数据&#xff1a;当输入数据具有特定的顺序关系时&#xff0c;保序回归可以帮助保持这种顺序关系。例如&…...

基于单片机设计的家用自来水水质监测装置

一、前言 本文介绍基于单片机设计的家用自来水水质监测装置。利用STM32F103ZET6作为主控芯片&#xff0c;结合水质传感器和ADC模块&#xff0c;实现对自来水水质的检测和监测功能。通过0.96寸OLED显示屏&#xff0c;将采集到的水质数据以直观的方式展示给用户。 随着人们对健…...

ubuntu20.04运用startup application开机自启动python程序

运用startup application开机自启动python程序。在终端中输入gnome-session-properties,如果显示没有则先进行安装&#xff0c;sudo apt-get update 和sudo apt install StartupApplications(根据显示提示安装)。在显示程序中搜索startup&#xff0c;打开应用程序。 在程序目录…...

SpringBoot整合Caffeine实现缓存

Caffeine Caffeine是一种基于Java的高性能缓存库&#xff0c;它提供了可配置、快速、灵活的缓存实现。Caffeine具有以下特点&#xff1a; 高性能&#xff1a;Caffeine使用了一些优化技术&#xff0c;如基于链表的并发哈希表和无锁算法&#xff0c;以提供卓越的读写性能。容量…...

DVWA-弱会话IDS

弱会话IDS Session简介&#xff1a; 用户登录后&#xff0c;在服务器就会创建一个会话(session)&#xff0c;叫做会话控制&#xff0c;接着访问页面的时候就不用登录&#xff0c;只需要携带Session去访问即可。 sessionID作为特定用户访问站点所需要的唯一内容。如果能够计算…...

【C++中cin、cin.get()、cin.getline()、getline() 的区别】

文章目录 引入cin基本用法输入多个变量换行符存放在缓冲区中 cin.get()基本用法重载函数换行符残留在缓冲区中 cin.getline()基本使用重载函数换行符不会残留在缓冲区中 string 流中的 getline()总结用法总结几个输入实例输入格式输入格式输入格式输入格式 输出格式 写在最后 引…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

如何在看板中有效管理突发紧急任务

在看板中有效管理突发紧急任务需要&#xff1a;设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP&#xff08;Work-in-Progress&#xff09;弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中&#xff0c;设立专门的紧急任务通道尤为重要&#xff0c;这能…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”

2025年#高考 将在近日拉开帷幕&#xff0c;#AI 监考一度冲上热搜。当AI深度融入高考&#xff0c;#时间同步 不再是辅助功能&#xff0c;而是决定AI监考系统成败的“生命线”。 AI亮相2025高考&#xff0c;40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕&#xff0c;江西、…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...

Web后端基础(基础知识)

BS架构&#xff1a;Browser/Server&#xff0c;浏览器/服务器架构模式。客户端只需要浏览器&#xff0c;应用程序的逻辑和数据都存储在服务端。 优点&#xff1a;维护方便缺点&#xff1a;体验一般 CS架构&#xff1a;Client/Server&#xff0c;客户端/服务器架构模式。需要单独…...

django blank 与 null的区别

1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是&#xff0c;要注意以下几点&#xff1a; Django的表单验证与null无关&#xff1a;null参数控制的是数据库层面字段是否可以为NULL&#xff0c;而blank参数控制的是Django表单验证时字…...