spark stream入门案例:netcat准实时处理wordCount(scala 编程)
目录
案例需求
代码
结果
解析
案例需求:
使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
-- 1. Spark从socket中获取数据:一行一行的获取
-- 2. Driver程序执行时,streaming处理过程不能结束
-- 3. 采集器在正常情况下启动后就不应该停止,除非特殊情况
-- 4. 采集器位于一个executor中,是一个线程,执行时需要一个核,如果设定的总核数为1时,那么在运行时因为没有核数,所以不会有打印结果,所以sparkStreaming使用的核数至少为2个
-- 5. print()方法,默认是打印10行结果
-- 6. netcat的指令:
在Windows下:nc -lp 9999在linux下: nc -lk 9999
代码:
package cn.olo.streamimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object StreamDemo {def main(args: Array[String]): Unit = {// 连接SparkStreamingval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")/*1.方法:StreamingContext(形参)2.形参:形参1:conf: SparkConf:spark配置对象形参2:batchDuration: Duration:采集时间*/val ssc = new StreamingContext(sparkConf,Seconds(5))// 需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数// 1. 获取netcat工具9999端口的连接,并开始接收数据// 从socket中获取数据:一行一行的获取val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost",9999)// 2. 数据处理val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToSumDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_ + _ )// 3. 打印数据wordToSumDS.print()// 4. Driver程序执行时,streaming处理过程不能结束// 采集器在正常情况下启动后就不应该停止,除非特殊情况// 启动采集器ssc.start()// 等待采集器的结束ssc.awaitTermination()}}
结果:


解析:
a、采集周期时间之间,每一个采集周期生成一个RDD,按照时间的顺序依次进行
b、在每一个采集周期内,会执行wordcount计算,最终得出:统计出每一个采集周期时间的wordcount
相关文章:
spark stream入门案例:netcat准实时处理wordCount(scala 编程)
目录 案例需求 代码 结果 解析 案例需求: 使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数 -- 1. Spark从socket中获取数据:一行一行的获取 -- 2. Driver程序执行时,…...
Ansible基础及模块
Ansible是一个基于Python开发的配置管理和应用部署工具,能批量配置、部署、管理上千台主机。比如以前需要切换到每个主机上执行的一或多个操作,使用Ansible只需在固定的一台Ansible控制节点上去完成所有主机的操作 Ansible是基于模块工作的,它…...
Atlassian Confluence OGNL表达式注入RCE CVE-2021-26084
影响版本 All 4.x.x versions All 5.x.x versions All 6.0.x versions All 6.1.x versions All 6.2.x versions All 6.3.x versions All 6.4.x versions All 6.5.x versions All 6.6.x versions All 6.7.x versions All 6.8.x versions All 6.9.x versions All 6.1…...
【c语言】编译链接--详解
文章目录 一.程序的翻译环境和运行环境二.翻译环境:预编译编译汇编链接(一)预编译(二)编译1)词法分析2)语法分析3)语义分析 (三)汇编(四)链接1.编…...
国家开放大学 训练题
试卷代号:2044 教育研究方法 参考试题(开卷) 一、单选题(每题5分,共25分) 1.探索性研究常采用的研究方式包括( )。 A.文献调查、经验调查、典型情况或个案分析 B.调查性研究、…...
【灵动 Mini-G0001开发板】+Keil5开发环境搭建+ST-Link/V2程序下载和仿真+4颗LED100ms闪烁。
我们拿到手里的是【灵动 Mini-G0001开发板】 如下图 我们去官网下载开发板对应资料MM32G0001官网 我们需要下载Mini—G0001开发板的库函数与例程(第一手学习资料)Keil支持包, PCB文件有需要的,可以自行下载。用户指南需要下载&a…...
同为科技(TOWE)关于风力发电雷电防护的解决方案
风能作为一种可再生清洁能源,是国家新能源发展战略的重要组成部分。我国风能开发潜力高达2.510GW以上,近年来风力发电机组逐年增加,截止到2022年,全国风电装机容量约3.5亿千瓦,同比增长16.6%。然而,由于风力…...
gorm 中的事务运用
使用背景 在编写业务代码的过程中,如果涉及到多张表的更新操作,为了确保数据的一致性,我们会在业务代码的过程中加上事务的控制,那么针对go 语言中,如果我们使用gorm框架改如何操作呢? gorm中使用事务的几种方式 方式一(业务层事务)func NewTransaction() *gorm.DB {re…...
maven 新建模块 导入后 按Ctrl 点不进新建模块pom定义
新建的ruoyi-common-mybatisplus 模块,导入一直不正常 画出的模块一直导入不进来 这是提示信息 这是正常的提示信息 加上 <version>3.6.3</version> 后,才一切正常...
idea使用debug无法启动,使用run可以启动
1、将调试断点清除 使用快捷键ctrl shift F8,将勾选的选项去除即可 2、Error running SampleApplication: Command line is too long. Shorten command line for SampleApplication or also for Spring Boot default configuration,报这种错误&#x…...
进程的虚拟地址空间
一、 对于C/C程序员,我们看到的程序中的地址,都不是物理地址,而是操作系统映射的虚拟地址/线性地址,每一个进程都映射了同样结构的虚拟地址空间,让进程以为自己在独享内存资源,下图是以Linux下32位操作系统…...
做web自动化测试遇到Chrome浏览器老是自动更新,怎么办 ? 这里提供两个解决办法 。
web自动化安装驱动安装 进行web自动化时 ,需要提前安装浏览器的驱动 ,尤其是chrome浏览器 。它的更新速度很快 ,是不是更新了新版本 。这就导致我们的驱动也要跟着变化。 1.停止自动更新 那么 ,如何关闭chrome浏览器的自动更新…...
腾讯HR面试
一、如何看待腾讯的愿景 腾讯的愿景是成为“最受尊敬的互联网企业”,这一愿景表明了腾讯的目标是成为一个在互联网领域内具有极高影响力和声誉的企业。 为了实现这一愿景,腾讯坚持以长远的眼光、诚信负责的操守、共同成长的理念来发展公司的事业。这种…...
过滤器(Filter)和拦截器(Interceptor)有什么不同?
过滤器(Filter)和拦截器(Interceptor)是用于处理请求和响应的中间件组件,但它们在实现方式和应用场景上有一些不同。 实现方式: 过滤器是Servlet规范中定义的一种组件,通常以Java类的形式实现。过滤器通过在…...
Spring 注解 @Qualifier 详解
目录 1. 概述 2. 痛点 3. Qualifier 4. Qualifier VS Primary 5. 通过名称来自动注入 1. 概述 今天带你了解一下 Spring 框架中的 Qualifier 注解,它解决了哪些问题,以及如何使用它。我们还将了解它与 Primary 注解的不同之处。更多的技术解析请访…...
实现更低功耗R5F51406BDNE、R5F51406ADFK、R5F51406ADFL、R5F51406AGFN搭载RXv2内核的32位微控制器
一、简介 RX140产品群是RX100系列中处理性能最强、功耗最低的微控制器。可以广泛应用于家用电器、工业控制和楼宇自动化等领域。RX140采用RXv2内核,工作频率最高48MHz,处理性能是32MHz运行的RX130的近两倍。此外,它在运行时的电路为56μA/MH…...
通信系统中ZF,ML,MRC以及MMSE四种信号检测算法误码率matlab对比仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1、ZF(零迫)算法 4.2、ML(最大似然)算法 4.3、MRC(最大比合并)算法 4.4、MMSE(最小均方误差ÿ…...
Redis数据结构之listpack
前言 当数据量较小时,Redis 会优先考虑用 ziplist 来存储 hash、list、zset,这么做可以有效的节省内存空间,因为 ziplist 是一块连续的内存空间,它采用一种紧凑的方式来存储元素。但是它也有缺点,比如查找的时间复杂度…...
VMware 配置记录
VMware 配置笔记 CentOS 7.9 镜像下载 官网太慢,建议在阿里云镜像站去CentOS配置页找标准版下载。 选标准版即可,各版本区别: DVD:标准版,包含常用软件,体积为 4.4 G;Everything:…...
【Java基础面试十四】、 封装的目的是什么,为什么要有封装?
文章底部有个人公众号:热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享? 踩过的坑没必要让别人在再踩,自己复盘也能加深记忆。利己利人、所谓双赢。 面试官: 封装的目的是什么&…...
逻辑回归:给不确定性划界的分类大师
想象你是一名医生。面对患者的检查报告(肿瘤大小、血液指标),你需要做出一个**决定性判断**:恶性还是良性?这种“非黑即白”的抉择,正是**逻辑回归(Logistic Regression)** 的战场&a…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...
二维FDTD算法仿真
二维FDTD算法仿真,并带完全匹配层,输入波形为高斯波、平面波 FDTD_二维/FDTD.zip , 6075 FDTD_二维/FDTD_31.m , 1029 FDTD_二维/FDTD_32.m , 2806 FDTD_二维/FDTD_33.m , 3782 FDTD_二维/FDTD_34.m , 4182 FDTD_二维/FDTD_35.m , 4793...
