当前位置: 首页 > news >正文

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录

1. RDD队列

2 textFileStream

3 DIY采集器

4 kafka数据源【重点】


1. RDD队列

       a、使用场景:测试
       b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    val  sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")val ssc = new StreamingContext(sparkconf,Seconds(3))// 创建一个队列对象,队列中存放的是RDDval queue = new mutable.Queue[RDD[String]]()// 通过队列创建DStreamval queueDS: InputDStream[String] = ssc.queueStream(queue)queueDS.print()// 启动采集器ssc.start()//这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产for(i <- 1 to 5 ){// 循环创建rddval rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))// 将RDD存放到队列中queue.enqueue(rdd)// 当前线程休眠1秒Thread.sleep(6000)         }// 等待采集器的结束ssc.awaitTermination()}

2 textFileStream

   val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")val ssc = new StreamingContext(sparkConf,Seconds(3))//从文件中读取数据val textDS: DStream[String] = ssc.textFileStream("in")textDS.print()// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()

3 DIY采集器

    1. 自定义采集器
    2. 什么情况下需要自定采集器呢?
         比如从mysql、hbase中读取数据。
         采集器的作用是从指定的地方,按照采集周期对数据进行采集。
         目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
    3. 自定义采集器的步骤,模仿socketTextStream
         a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
         b、重写onStart和onStop方法
            onStart:采集器的如何启动
            onStop:采集的如何停止

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")val ssc = new StreamingContext(sparkConf, Seconds(3))// 获取采集的流val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))ds.print()ssc.start()ssc.awaitTermination()}// 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {private var socket: Socket = _def receive = {// 获取输入流val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8"))// 设定一个间接变量var s: String = nullwhile (true) {// 按行读取数据s = reader.readLine()if (s != null) {// 将数据进行封装store(s)}}}// 1. 启动采集器override def onStart(): Unit = {socket = new Socket(host, port)new Thread("Socket Receiver") {setDaemon(true)override def run() {receive}}.start()}// 2. 停止采集器override def onStop(): Unit = {socket.close()socket = null}}

4 kafka数据源【重点】

-- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
-- 配置信息基本上是固定写法

 // TODO Spark环境// SparkStreaming使用核数最少是2个val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")val ssc = new StreamingContext(sparkConf, Seconds(3))// TODO 使用SparkStreaming读取Kafka的数据// Kafka的配置信息val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))// 获取数据,key是null,value是真实的数据val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())valueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()ssc.start()// 等待采集器的结束ssc.awaitTermination()

 

相关文章:

spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)

目录 1. RDD队列 2 textFileStream 3 DIY采集器 4 kafka数据源【重点】 1. RDD队列 a、使用场景&#xff1a;测试 b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream&#xff0c;每一个推送这个队列的RDD&#xff0c;都会作为一个DStream处理 val sparkco…...

【三:Mock服务的使用】

目录 1、工具包2、mock的demo1、get请求2、post请求3、带cookies的请求4、带请求头的请求5、请求重定向 1、工具包 1、&#xff1a;服务包的下载 moco-runner-0.11.0-standalone.jar 下载 2、&#xff1a;运行命令java -jar ./moco-runner-0.11.0-standalone.jar http -p 888…...

驱动:驱动相关概念,内核模块编程,内核消息打印printk函数的使用

一、驱动相关概念 1.操作系统的功能 向下管理硬件&#xff0c;向上提供接口 操作系统向上提供的接口类型&#xff1a; 内存管理&#xff1a;内存申请&#xff08;malloc&#xff09; 内存释放&#xff08;free&#xff09;等 文件管理&#xff1a; 通过文件系统格式对文件ext2…...

【Qt控件之QListWidget】介绍及使用,利用QListWidget、QToolButton、和布局控件实现抽屉式组合控件

概述 QListWidget类提供了基于项目的列表小部件。 QListWidget是一个方便的类&#xff0c;类似于QListView提供的列表视图&#xff0c;但使用经典的基于项目的接口来添加和删除项目。QListWidget使用内部模型来管理列表中的每个QListWidgetItem。 对于更灵活的列表视图小部件…...

【Java基础面试二十四】、String类有哪些方法?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;String类有哪些方法&…...

[DRAFT] LLVM ThinLTO原理分析

我们在《论文阅读&#xff1a;ThinLTO: Scalable and Incremental LTO》中介绍了ThinLTO论文的主要思想&#xff0c;这里我们介绍下LLVM ThinLTO是如何实现的。本文主要分为如下几个部分&#xff1a; LLVM ThinLTO Object 含有哪些内容&#xff1f;LLVM ThinLTO 是如何做优化的…...

使用Gitlab构建简单流水线CI/CD

什么是Gitlab Gitlab实质上是一套DevOps工具 目前看起来&#xff0c;Gitlab属于是内嵌了一套CI/CD的框架&#xff0c;并且可以提供软件开发中的版本管理、项目管理等等其他功能。 这里需要辨别一下Gitlab和Github Gitee的区别。 GIthub大家都很熟悉了&#xff0c;一般大家都会…...

【AIGC核心技术剖析】用于高效 3D 内容创建生成(从单视图图像生成高质量的纹理网格)

3D 内容创建的最新进展主要利用通过分数蒸馏抽样 &#xff08;SDS&#xff09; 生成的基于优化的 3D 生成。尽管已经显示出有希望的结果&#xff0c;但这些方法通常存在每个样本优化缓慢的问题&#xff0c;限制了它们的实际应用。在本文中&#xff0c;我们提出了DreamGaussian&…...

nginx平滑升级添加echo模块、localtion配置、rewrite配置

nginx平滑升级添加echo模块、location配置、rewrite配置 文章目录 nginx平滑升级添加echo模块、location配置、rewrite配置1.环境说明&#xff1a;2.nginx平滑升级原理&#xff1a;3.平滑升级nginx&#xff0c;并添加echo模块3.1.查看当前nginx版本以及老版本编译参数信息3.2.下…...

系统架构师备考倒计时19天(每日知识点)

软件架构评估&#xff08;ATAM&#xff09; 在SAAM的基础上发展起来的&#xff0c;主要针对性能、实用性、安全性和可修改性&#xff0c;在系统开发之前&#xff0c;对这些质量属性进行评价和折中。ATAM方法的主要活动领域包括&#xff1a; 第一阶段 场景和需求收集 收集场景…...

谈谈 Redis 如何来实现分布式锁

谈谈 Redis 如何来实现分布式锁 基于 setnx 可以实现&#xff0c;但是不是可重入的。 基于 Hash 数据类型 Lua脚本 可以实现可重入的分布式锁。 获取锁的 Lua 脚本&#xff1a; 释放锁的 Lua 脚本&#xff1a; 但是还是存在分布式问题&#xff0c;比如说&#xff0c;一个客…...

.NET 6.0 Web API Hangfire

Hangfire 文档 Hangfire 中文文档 Hangfire GitHub使用示例源码 在线Cron表达式生成器 ● Hangfire允许您以非常简单但可靠的方式在请求管道之外启动方法调用。 这种 后台线程 中执行方法的行为称为 后台任务。 ● 它是由:客户端、作业存储、服务端 组成的。 ● Hangfire可以在…...

基于java的校园论坛系统,ssm+jsp,Mysql数据库,前台用户+后台管理,完美运行,有一万多字论文

目录 演示视频 基本介绍 论文目录 功能架构 系统截图 演示视频 基本介绍 基于java的校园论坛系统&#xff0c;Mysql数据库&#xff0c;系统整体采用ssmjsp设计&#xff0c;前台用户后台管理&#xff0c;完美运行&#xff0c;有一万多字论文。 用户功能&#xff1a; 1.系统…...

Django小白开发指南

文章目录 HTTP协议socket实现一个web服务器WSGI实现一个web服务器WSGI实现支持多URL的web服务器WSGI实现图片显示的web服务器MVC && MTV1.MVC2.MTV3.总结 一、创建Django项目1.创建项目2.创建app3.第一次django 请求 二、模板1.配置settings.py2.模板语法3.继承模板 三…...

保序回归与金融时序数据

保序回归在回归问题中的作用是通过拟合一个单调递增或递减的函数&#xff0c;来保持数据点的相对顺序特性。 一、保序回归的作用 主要用于以下情况&#xff1a; 1. 有序数据&#xff1a;当输入数据具有特定的顺序关系时&#xff0c;保序回归可以帮助保持这种顺序关系。例如&…...

基于单片机设计的家用自来水水质监测装置

一、前言 本文介绍基于单片机设计的家用自来水水质监测装置。利用STM32F103ZET6作为主控芯片&#xff0c;结合水质传感器和ADC模块&#xff0c;实现对自来水水质的检测和监测功能。通过0.96寸OLED显示屏&#xff0c;将采集到的水质数据以直观的方式展示给用户。 随着人们对健…...

ubuntu20.04运用startup application开机自启动python程序

运用startup application开机自启动python程序。在终端中输入gnome-session-properties,如果显示没有则先进行安装&#xff0c;sudo apt-get update 和sudo apt install StartupApplications(根据显示提示安装)。在显示程序中搜索startup&#xff0c;打开应用程序。 在程序目录…...

SpringBoot整合Caffeine实现缓存

Caffeine Caffeine是一种基于Java的高性能缓存库&#xff0c;它提供了可配置、快速、灵活的缓存实现。Caffeine具有以下特点&#xff1a; 高性能&#xff1a;Caffeine使用了一些优化技术&#xff0c;如基于链表的并发哈希表和无锁算法&#xff0c;以提供卓越的读写性能。容量…...

DVWA-弱会话IDS

弱会话IDS Session简介&#xff1a; 用户登录后&#xff0c;在服务器就会创建一个会话(session)&#xff0c;叫做会话控制&#xff0c;接着访问页面的时候就不用登录&#xff0c;只需要携带Session去访问即可。 sessionID作为特定用户访问站点所需要的唯一内容。如果能够计算…...

【C++中cin、cin.get()、cin.getline()、getline() 的区别】

文章目录 引入cin基本用法输入多个变量换行符存放在缓冲区中 cin.get()基本用法重载函数换行符残留在缓冲区中 cin.getline()基本使用重载函数换行符不会残留在缓冲区中 string 流中的 getline()总结用法总结几个输入实例输入格式输入格式输入格式输入格式 输出格式 写在最后 引…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下&#xff0c;越来越多的求职者将目光投向了日本及中日双语岗位。但是&#xff0c;一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧&#xff1f;面对生疏的日语交流环境&#xff0c;即便提前恶补了…...

HTML 语义化

目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案&#xff1a; 语义化标签&#xff1a; <header>&#xff1a;页头<nav>&#xff1a;导航<main>&#xff1a;主要内容<article>&#x…...

云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?

大家好&#xff0c;欢迎来到《云原生核心技术》系列的第七篇&#xff01; 在上一篇&#xff0c;我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在&#xff0c;我们就像一个拥有了一块崭新数字土地的农场主&#xff0c;是时…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

SciencePlots——绘制论文中的图片

文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了&#xff1a;一行…...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...