当前位置: 首页 > news >正文

spark stream入门案例:netcat准实时处理wordCount(scala 编程)

目录

案例需求

代码

结果

解析


         案例需求:

        使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

        -- 1. Spark从socket中获取数据:一行一行的获取
        -- 2. Driver程序执行时,streaming处理过程不能结束
        -- 3. 采集器在正常情况下启动后就不应该停止,除非特殊情况
        -- 4. 采集器位于一个executor中,是一个线程,执行时需要一个核,如果设定的总核数为1时,那么在运行时因为没有核数,所以不会有打印结果,所以sparkStreaming使用的核数至少为2个
        -- 5. print()方法,默认是打印10行结果
        -- 6. netcat的指令:
 

      在Windows下:nc -lp 9999在linux下: nc -lk 9999

        代码: 
package cn.olo.streamimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamDemo {def main(args: Array[String]): Unit = {// 连接SparkStreamingval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")/*1.方法:StreamingContext(形参)2.形参:形参1:conf: SparkConf:spark配置对象形参2:batchDuration: Duration:采集时间*/val ssc = new StreamingContext(sparkConf,Seconds(5))// 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数// 1. 获取netcat工具9999端口的连接,并开始接收数据// 从socket中获取数据:一行一行的获取val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)// 2. 数据处理val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToSumDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_ + _ )// 3. 打印数据wordToSumDS.print()// 4. Driver程序执行时,streaming处理过程不能结束// 采集器在正常情况下启动后就不应该停止,除非特殊情况// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()}}

结果:

解析:

        a、采集周期时间之间,每一个采集周期生成一个RDD,按照时间的顺序依次进行
        b、在每一个采集周期内,会执行wordcount计算,最终得出:统计出每一个采集周期时间的wordcount

相关文章:

spark stream入门案例:netcat准实时处理wordCount(scala 编程)

目录 案例需求 代码 结果 解析 案例需求: 使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数 -- 1. Spark从socket中获取数据:一行一行的获取 -- 2. Driver程序执行时&#xff0c…...

Ansible基础及模块

Ansible是一个基于Python开发的配置管理和应用部署工具,能批量配置、部署、管理上千台主机。比如以前需要切换到每个主机上执行的一或多个操作,使用Ansible只需在固定的一台Ansible控制节点上去完成所有主机的操作 Ansible是基于模块工作的,它…...

Atlassian Confluence OGNL表达式注入RCE CVE-2021-26084

影响版本 All 4.x.x versions All 5.x.x versions All 6.0.x versions All 6.1.x versions All 6.2.x versions All 6.3.x versions All 6.4.x versions All 6.5.x versions All 6.6.x versions All 6.7.x versions All 6.8.x versions All 6.9.x versions All 6.1…...

【c语言】编译链接--详解

文章目录 一.程序的翻译环境和运行环境二.翻译环境:预编译编译汇编链接(一)预编译(二)编译1)词法分析2)语法分析3)语义分析 (三)汇编(四)链接1.编…...

国家开放大学 训练题

试卷代号:2044 教育研究方法 参考试题(开卷) 一、单选题(每题5分,共25分) 1.探索性研究常采用的研究方式包括( )。 A.文献调查、经验调查、典型情况或个案分析 B.调查性研究、…...

【灵动 Mini-G0001开发板】+Keil5开发环境搭建+ST-Link/V2程序下载和仿真+4颗LED100ms闪烁。

我们拿到手里的是【灵动 Mini-G0001开发板】 如下图 我们去官网下载开发板对应资料MM32G0001官网 我们需要下载Mini—G0001开发板的库函数与例程(第一手学习资料)Keil支持包, PCB文件有需要的,可以自行下载。用户指南需要下载&a…...

同为科技(TOWE)关于风力发电雷电防护的解决方案

风能作为一种可再生清洁能源,是国家新能源发展战略的重要组成部分。我国风能开发潜力高达2.510GW以上,近年来风力发电机组逐年增加,截止到2022年,全国风电装机容量约3.5亿千瓦,同比增长16.6%。然而,由于风力…...

gorm 中的事务运用

使用背景 在编写业务代码的过程中,如果涉及到多张表的更新操作,为了确保数据的一致性,我们会在业务代码的过程中加上事务的控制,那么针对go 语言中,如果我们使用gorm框架改如何操作呢? gorm中使用事务的几种方式 方式一(业务层事务)func NewTransaction() *gorm.DB {re…...

maven 新建模块 导入后 按Ctrl 点不进新建模块pom定义

新建的ruoyi-common-mybatisplus 模块,导入一直不正常 画出的模块一直导入不进来 这是提示信息 这是正常的提示信息 加上 <version>3.6.3</version> 后,才一切正常...

idea使用debug无法启动,使用run可以启动

1、将调试断点清除 使用快捷键ctrl shift F8&#xff0c;将勾选的选项去除即可 2、Error running SampleApplication: Command line is too long. Shorten command line for SampleApplication or also for Spring Boot default configuration&#xff0c;报这种错误&#x…...

进程的虚拟地址空间

一、 对于C/C程序员&#xff0c;我们看到的程序中的地址&#xff0c;都不是物理地址&#xff0c;而是操作系统映射的虚拟地址/线性地址&#xff0c;每一个进程都映射了同样结构的虚拟地址空间&#xff0c;让进程以为自己在独享内存资源&#xff0c;下图是以Linux下32位操作系统…...

做web自动化测试遇到Chrome浏览器老是自动更新,怎么办 ? 这里提供两个解决办法 。

web自动化安装驱动安装 进行web自动化时 &#xff0c;需要提前安装浏览器的驱动 &#xff0c;尤其是chrome浏览器 。它的更新速度很快 &#xff0c;是不是更新了新版本 。这就导致我们的驱动也要跟着变化。 1.停止自动更新 那么 &#xff0c;如何关闭chrome浏览器的自动更新…...

腾讯HR面试

一、如何看待腾讯的愿景 腾讯的愿景是成为“最受尊敬的互联网企业”&#xff0c;这一愿景表明了腾讯的目标是成为一个在互联网领域内具有极高影响力和声誉的企业。 为了实现这一愿景&#xff0c;腾讯坚持以长远的眼光、诚信负责的操守、共同成长的理念来发展公司的事业。这种…...

过滤器(Filter)和拦截器(Interceptor)有什么不同?

过滤器&#xff08;Filter&#xff09;和拦截器&#xff08;Interceptor&#xff09;是用于处理请求和响应的中间件组件&#xff0c;但它们在实现方式和应用场景上有一些不同。 实现方式: 过滤器是Servlet规范中定义的一种组件&#xff0c;通常以Java类的形式实现。过滤器通过在…...

Spring 注解 @Qualifier 详解

目录 1. 概述 2. 痛点 3. Qualifier 4. Qualifier VS Primary 5. 通过名称来自动注入 1. 概述 今天带你了解一下 Spring 框架中的 Qualifier 注解&#xff0c;它解决了哪些问题&#xff0c;以及如何使用它。我们还将了解它与 Primary 注解的不同之处。更多的技术解析请访…...

实现更低功耗R5F51406BDNE、R5F51406ADFK、R5F51406ADFL、R5F51406AGFN搭载RXv2内核的32位微控制器

一、简介 RX140产品群是RX100系列中处理性能最强、功耗最低的微控制器。可以广泛应用于家用电器、工业控制和楼宇自动化等领域。RX140采用RXv2内核&#xff0c;工作频率最高48MHz&#xff0c;处理性能是32MHz运行的RX130的近两倍。此外&#xff0c;它在运行时的电路为56μA/MH…...

通信系统中ZF,ML,MRC以及MMSE四种信号检测算法误码率matlab对比仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、ZF&#xff08;零迫&#xff09;算法 4.2、ML&#xff08;最大似然&#xff09;算法 4.3、MRC&#xff08;最大比合并&#xff09;算法 4.4、MMSE&#xff08;最小均方误差&#xff…...

Redis数据结构之listpack

前言 当数据量较小时&#xff0c;Redis 会优先考虑用 ziplist 来存储 hash、list、zset&#xff0c;这么做可以有效的节省内存空间&#xff0c;因为 ziplist 是一块连续的内存空间&#xff0c;它采用一种紧凑的方式来存储元素。但是它也有缺点&#xff0c;比如查找的时间复杂度…...

VMware 配置记录

VMware 配置笔记 CentOS 7.9 镜像下载 官网太慢&#xff0c;建议在阿里云镜像站去CentOS配置页找标准版下载。 选标准版即可&#xff0c;各版本区别&#xff1a; DVD&#xff1a;标准版&#xff0c;包含常用软件&#xff0c;体积为 4.4 G&#xff1b;Everything&#xff1a…...

【Java基础面试十四】、 封装的目的是什么,为什么要有封装?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a; 封装的目的是什么&…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

dify打造数据可视化图表

一、概述 在日常工作和学习中&#xff0c;我们经常需要和数据打交道。无论是分析报告、项目展示&#xff0c;还是简单的数据洞察&#xff0c;一个清晰直观的图表&#xff0c;往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server&#xff0c;由蚂蚁集团 AntV 团队…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...