当前位置: 首页 > news >正文

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

相关文章:

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】 1. RDD队列 a、使用场景&#xff1a;测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream&#xff0c;每一个推送这个队列的RDD&#xff0c;都会作为一个DStream处理 val sparkco…...

【三:Mock服务的使用】

目录 1、工具包2、mock的demo1、get请求2、post请求3、带cookies的请求4、带请求头的请求5、请求重定向 1、工具包 1、&#xff1a;服务包的下载 moco-runner-0.11.0-standalone.jar 下载 2、&#xff1a;运行命令java -jar ./moco-runner-0.11.0-standalone.jar http -p 888…...

驱动:驱动相关概念,内核模块编程,内核消息打印printk函数的使用

一、驱动相关概念 1.操作系统的功能 向下管理硬件&#xff0c;向上提供接口 操作系统向上提供的接口类型&#xff1a; 内存管理&#xff1a;内存申请&#xff08;malloc&#xff09; 内存释放&#xff08;free&#xff09;等 文件管理&#xff1a; 通过文件系统格式对文件ext2…...

【Qt控件之QListWidget】介绍及使用,利用QListWidget、QToolButton、和布局控件实现抽屉式组合控件

概述 QListWidget类提供了基于项目的列表小部件。 QListWidget是一个方便的类&#xff0c;类似于QListView提供的列表视图&#xff0c;但使用经典的基于项目的接口来添加和删除项目。QListWidget使用内部模型来管理列表中的每个QListWidgetItem。 对于更灵活的列表视图小部件…...

【Java基础面试二十四】、String类有哪些方法?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;String类有哪些方法&…...

[DRAFT] LLVM ThinLTO原理分析

我们在《论文阅读&#xff1a;ThinLTO: Scalable and Incremental LTO》中介绍了ThinLTO论文的主要思想&#xff0c;这里我们介绍下LLVM ThinLTO是如何实现的。本文主要分为如下几个部分&#xff1a; LLVM ThinLTO Object 含有哪些内容&#xff1f;LLVM ThinLTO 是如何做优化的…...

使用Gitlab构建简单流水线CI/CD

什么是Gitlab Gitlab实质上是一套DevOps工具 目前看起来&#xff0c;Gitlab属于是内嵌了一套CI/CD的框架&#xff0c;并且可以提供软件开发中的版本管理、项目管理等等其他功能。 这里需要辨别一下Gitlab和Github Gitee的区别。 GIthub大家都很熟悉了&#xff0c;一般大家都会…...

【AIGC核心技术剖析】用于高效 3D 内容创建生成(从单视图图像生成高质量的纹理网格)

3D 内容创建的最新进展主要利用通过分数蒸馏抽样 &#xff08;SDS&#xff09; 生成的基于优化的 3D 生成。尽管已经显示出有希望的结果&#xff0c;但这些方法通常存在每个样本优化缓慢的问题&#xff0c;限制了它们的实际应用。在本文中&#xff0c;我们提出了DreamGaussian&…...

nginx平滑升级添加echo模块、localtion配置、rewrite配置

nginx平滑升级添加echo模块、location配置、rewrite配置 文章目录 nginx平滑升级添加echo模块、location配置、rewrite配置1.环境说明&#xff1a;2.nginx平滑升级原理&#xff1a;3.平滑升级nginx&#xff0c;并添加echo模块3.1.查看当前nginx版本以及老版本编译参数信息3.2.下…...

系统架构师备考倒计时19天(每日知识点)

软件架构评估&#xff08;ATAM&#xff09; 在SAAM的基础上发展起来的&#xff0c;主要针对性能、实用性、安全性和可修改性&#xff0c;在系统开发之前&#xff0c;对这些质量属性进行评价和折中。ATAM方法的主要活动领域包括&#xff1a; 第一阶段 场景和需求收集 收集场景…...

谈谈 Redis 如何来实现分布式锁

谈谈 Redis 如何来实现分布式锁 基于 setnx 可以实现&#xff0c;但是不是可重入的。 基于 Hash 数据类型 Lua脚本 可以实现可重入的分布式锁。 获取锁的 Lua 脚本&#xff1a; 释放锁的 Lua 脚本&#xff1a; 但是还是存在分布式问题&#xff0c;比如说&#xff0c;一个客…...

.NET 6.0 Web API Hangfire

Hangfire 文档 Hangfire 中文文档 Hangfire GitHub使用示例源码 在线Cron表达式生成器 ● Hangfire允许您以非常简单但可靠的方式在请求管道之外启动方法调用。 这种 后台线程 中执行方法的行为称为 后台任务。 ● 它是由:客户端、作业存储、服务端 组成的。 ● Hangfire可以在…...

基于java的校园论坛系统,ssm+jsp,Mysql数据库,前台用户+后台管理,完美运行,有一万多字论文

目录 演示视频 基本介绍 论文目录 功能架构 系统截图 演示视频 基本介绍 基于java的校园论坛系统&#xff0c;Mysql数据库&#xff0c;系统整体采用ssmjsp设计&#xff0c;前台用户后台管理&#xff0c;完美运行&#xff0c;有一万多字论文。 用户功能&#xff1a; 1.系统…...

Django小白开发指南

文章目录 HTTP协议socket实现一个web服务器WSGI实现一个web服务器WSGI实现支持多URL的web服务器WSGI实现图片显示的web服务器MVC && MTV1.MVC2.MTV3.总结 一、创建Django项目1.创建项目2.创建app3.第一次django 请求 二、模板1.配置settings.py2.模板语法3.继承模板 三…...

保序回归与金融时序数据

保序回归在回归问题中的作用是通过拟合一个单调递增或递减的函数&#xff0c;来保持数据点的相对顺序特性。 一、保序回归的作用 主要用于以下情况&#xff1a; 1. 有序数据&#xff1a;当输入数据具有特定的顺序关系时&#xff0c;保序回归可以帮助保持这种顺序关系。例如&…...

基于单片机设计的家用自来水水质监测装置

一、前言 本文介绍基于单片机设计的家用自来水水质监测装置。利用STM32F103ZET6作为主控芯片&#xff0c;结合水质传感器和ADC模块&#xff0c;实现对自来水水质的检测和监测功能。通过0.96寸OLED显示屏&#xff0c;将采集到的水质数据以直观的方式展示给用户。 随着人们对健…...

ubuntu20.04运用startup application开机自启动python程序

运用startup application开机自启动python程序。在终端中输入gnome-session-properties,如果显示没有则先进行安装&#xff0c;sudo apt-get update 和sudo apt install StartupApplications(根据显示提示安装)。在显示程序中搜索startup&#xff0c;打开应用程序。 在程序目录…...

SpringBoot整合Caffeine实现缓存

Caffeine Caffeine是一种基于Java的高性能缓存库&#xff0c;它提供了可配置、快速、灵活的缓存实现。Caffeine具有以下特点&#xff1a; 高性能&#xff1a;Caffeine使用了一些优化技术&#xff0c;如基于链表的并发哈希表和无锁算法&#xff0c;以提供卓越的读写性能。容量…...

DVWA-弱会话IDS

弱会话IDS Session简介&#xff1a; 用户登录后&#xff0c;在服务器就会创建一个会话(session)&#xff0c;叫做会话控制&#xff0c;接着访问页面的时候就不用登录&#xff0c;只需要携带Session去访问即可。 sessionID作为特定用户访问站点所需要的唯一内容。如果能够计算…...

【C++中cin、cin.get()、cin.getline()、getline() 的区别】

文章目录 引入cin基本用法输入多个变量换行符存放在缓冲区中 cin.get()基本用法重载函数换行符残留在缓冲区中 cin.getline()基本使用重载函数换行符不会残留在缓冲区中 string 流中的 getline()总结用法总结几个输入实例输入格式输入格式输入格式输入格式 输出格式 写在最后 引…...

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

测试markdown--肇兴

day1&#xff1a; 1、去程&#xff1a;7:04 --11:32高铁 高铁右转上售票大厅2楼&#xff0c;穿过候车厅下一楼&#xff0c;上大巴车 &#xffe5;10/人 **2、到达&#xff1a;**12点多到达寨子&#xff0c;买门票&#xff0c;美团/抖音&#xff1a;&#xffe5;78人 3、中饭&a…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验

系列回顾&#xff1a; 在上一篇中&#xff0c;我们成功地为应用集成了数据库&#xff0c;并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了&#xff01;但是&#xff0c;如果你仔细审视那些 API&#xff0c;会发现它们还很“粗糙”&#xff1a;有…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&#xff0c;且在 O(n) 时间复杂度…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...