用idea工具scala 和 Java开发 spark案例:WordCount
目录
一 环境准备
二 scala代码编写
三 java 代码编写
一 环境准备
创建一个 maven 工程
添加下列依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改
二 scala代码编写
首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址
新建一个 scala 的 object,编写代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object WordCountDemo {def main(args: Array[String]): Unit = {val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")val sc : SparkContext = SparkContext.getOrCreate(conf)var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()// val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
// val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
// val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
// val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)//打印到 console// result2.glom().collect.foreach(x=>println(x.toList))//保存到 hdfsresult2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")}}
这里稍微解释一下代码中的一些函数:
map:转换函数,数据集合中每个元素进行一次我们定义的方法
flatMap: 与map类似,但是映射为0个或多个
collect:以数组的形式返回数据集中的所有元素
glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
云服务器的朋友可能有的报错
22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到
解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip
在 idea 中,resource 文件夹中添加文件 hdfs-site.xml
hdfs-site.xml内容:
<!-- datanode 通信是否使用域名,默认为false,改为true --><property><name>dfs.client.use.datanode.hostname</name><value>true</value><description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.</description></property>
三 java 代码编写
这里原数据存储在本地,文件名为 input.txt
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.Map;public class WordCount {public static void main(String[] args) {// 创建SparkConf对象SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");// 创建JavaSparkContext对象JavaSparkContext sc = new JavaSparkContext(conf);// 读取文本文件JavaRDD<String> lines = sc.textFile("input.txt");// 计算单词出现次数JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((x, y) -> x + y);Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();// 输出结果for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {System.out.println(entry.getKey() + ": " + entry.getValue());}// 关闭JavaSparkContext对象sc.close();}
}
相关文章:
用idea工具scala 和 Java开发 spark案例:WordCount
目录 一 环境准备 二 scala代码编写 三 java 代码编写 一 环境准备 创建一个 maven 工程 添加下列依赖 <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</vers…...
【git merge/rebase】详解合并代码、解决冲突
目录 1.概述 2.merge 3.rebase 4.merge和rabase的区别 5.解决冲突 1.概述 在实际开发中,一个项目往往是多个人一起协作的,头天下班前大家把代码交到远端仓库,第二天工作的第一件事情都是从服务器上拉最新的代码,保证代码版本…...
nrm,npm源的管理工具
npm手动切换淘宝源 查看当前的仓库 npm config get registry设置成淘宝源 npm config set registry https://registry.npmmirror.com/设置回官方源 npm config set registry https://registry.npmjs.org/手动切换不免不太方便,而且网上很多资料淘宝源还是过期的链接…...
HarmonyOS/OpenHarmony原生应用-ArkTS万能卡片组件Stack
堆叠容器,子组件按照顺序依次入栈,后一个子组件覆盖前一个子组件。该组件从API Version 7开始支持。可以包含子组件。 一、接口 Stack(value?: { alignContent?: Alignment }) 从API version 9开始,该接口支持在ArkTS卡片中使用。 二、…...
腾讯云2核4G服务器一年和三年价格性能测评
腾讯云轻量2核4G5M服务器:CPU内存流量带宽系统盘性能测评:轻量应用服务器2核4G5M带宽,免费500GB月流量,60GB系统盘SSD盘,5M带宽下载速度可达640KB/秒,流量超额按照0.8元每GB的价格支付流量费,轻…...
集线器、交换机、路由器是如何转发包的
集线器、交换机、路由器是如何转发包的 集线器交换机MAC地址表的维护 路由器路由表中的信息路由器的包接收操作查询路由表确定输出端口找不到匹配路由时选择默认路由包的有效期通过分片功能拆分大网络包路由器发送操作中的一些特点 参考文档 集线器 集线器是一层(物…...
交通物流模型 | MDRGCN:用于多模式交通客流预测的深度学习模型
城市交通拥堵是造成交通事故的重要原因,也是城市发展的主要障碍。通过学习历史交通流数据,我们可以预测未来一些区域的交通流,这对城市道路规划、交通管理、交通控制等都有重要意义。然而,由于交通网络拓扑结构的复杂性和影响交通流的因素的多样性,交通模式往往是复杂多变…...
保研经历分享(一)
这个系列的文章主要是想记录一下自己大学期间最重要的一件事(保研!!)的经历、过程,外加一些保研流程介绍、面试经验、院校投递、踩坑经历,主要给学弟学妹们避雷,也做一些借鉴吧~ 这一篇主要是对保研过程的一些介绍&…...
【手写数字识别】数据挖掘实验二
文章目录 Ⅰ、项目任务要求任务描述:主要任务要求(必须完成以下内容但不限于这些内容): II、实现过程数据集描述实验运行环境描述KNN模型决策树模型朴素贝叶斯模型SVM模型不同方法对MNIST数据集分类识别结果分析(不同方法识别对比率表及结果分析) 完整代…...
什么是云计算?云计算简介
其实“云计算”作为一个名词而言,那是相当成功滴。很多人都有听过。但提及云计算”具体是什么?很多人,知其然,却不知其所以然! 利用软件将这些成千上万不可靠的硬件组织成一个稳定可靠的IT系统,以此支撑其公司的IT基础服务。这家…...
Vue路由进阶--VueRouter声明式导航
Vue路由进阶–VueRouter声明式导航 文章目录 Vue路由进阶--VueRouter声明式导航1、声明式导航1.1、导航链接1.2、高亮类名1.3、跳转传参1.4、动态路由参数可选符 1、声明式导航 1.1、导航链接 需求:实现导航高亮效果 vue-router提供了一个全局组件router-link(取…...
Oracle 云服务即将支持 PostgreSQL!
2023 年 9 月 19 日,Oracle 产品团队发布了一篇文章,宣布 Oracle 云基础架构(OCI)开始提供 PostgreSQL 服务。目前支持的版本为 PostgreSQL 14.9,提供有限支持,12 月份将会提供正式版本。 众所周知&#x…...
数字孪生项目:突破技术难关,引领未来发展
项目背景 数字孪生技术一直在不断发展,为企业提供了无限的潜力和机会。在这个数字时代,公司需要不断进化,以适应市场的需求和客户的期望。北京智汇云舟一直以“视频孪生”为标签,是数字孪生领域的头部企业,拥有强大的…...
MySQL 如何使用离线模式维护服务器
离线模式 作为 DBA,最常见的任务之一就是批量处理 MySQL 服务的启停或其他一些活动。在停止 MySQL 服务前,我们可能需要检查是否有活动连接;如果有,我们可能需要把它们全部杀死。通常,我们使用 pt-kill 杀死应用连接或…...
期权开户流程合集——期权开户的操作步骤
最详细的期权开户流程介绍是怎样的,下文为大家介绍期权开户流程合集——期权开户的操作步骤的知识点,希望对读者有所帮助,期权开户流程和方式分两种,一种券商,一种期权分仓平台,有啥区别下文揭秘。本文来自…...
mysql改造oracle,以及项目改造
mysql改造oracle,以及springboot项目改造 oracle改造说明 这次的任务是springboot mysql版本改造为oracle版本,mysql5.7,oracle11.2,springboot2.0.2(springboot版本无所谓,都差不多,自己记录…...
利用互斥锁实现多个线程写一个文件
代码 #include <stdio.h> #include <pthread.h> #include <string.h> #include <unistd.h>FILE *fp;//线程函数1 void *wrfunc1(void *arg); //线程函数2 void *wrfunc2(void *arg); //线程函数3 void *wrfunc3(void *arg);//静态创建互斥锁 pthread_…...
【m98】视频缓存PacketBuffer 1 : SeqNumUnwrapper int64映射、ForwardDiff
视频缓存PacketBuffer 对rtp包进行接收处理。 rtp序号 相关 【mediasoup】RtpStreamRecv 对rtp 序号的验证 与这里的处理有不同。...
day58:ARMday5,GPIO流水灯实验
汇编指令: .text .global _start _start: 1.设置GPIOE GPIOF寄存器的时钟使能 RCC_MP_AHB4ENSETR[5:4]->1 0x50000a28 LDR R0,0x50000a28 LDR R1,[R0] ORR R1,R1,#(0x3<<4) STR R1,[R0]2.设置PE10、PF10、PE8管脚为输出模式,GPIOE_MODER[21…...
Linux shell编程学习笔记9:字符串运算 和 if语句
Linux Shell 脚本编程和其他编程语言一样,支持算数、关系、布尔、字符串、文件测试等多种运算,同样也需要进行根据条件进行流程控制,提供了if、for、while、until等语句。 上期学习笔记中我们研究了字符串数据的使用,今天我们研…...
告别效率黑洞:AOSP构建降本增效实战!更有最新技术报告免费领!
近年来,AI模型训练与大型软件构建的复杂度持续攀升,企业级操作系统的多分支、多产品构建正成为工程团队的“效率黑洞”。在 Android 平台,AOSP 构建尤为突出:全量构建耗时长、增量改动触发大规模重建、CI 队列冗长、资源消耗高等问…...
FastAPI + SQLite:从基础CRUD到安全并发的实战指南
核心摘要本文将带你超越FastAPI SQLite的基础CRUD搭建,聚焦于安全防护(认证、授权、输入验证)与并发处理(数据库连接池、异步优化)两大实战痛点。你会获得一套可直接复用的项目骨架,并理解其背后的设计逻辑…...
从原理到实践:深入理解Shellcode免杀技术及其对抗策略
Shellcode免杀技术的深度解析与对抗策略演进 在网络安全攻防对抗的永恒博弈中,Shellcode免杀技术始终占据着特殊地位。不同于传统的恶意软件检测规避,Shellcode免杀更注重代码层面的"隐形"能力,其核心在于让关键载荷在内存中执行时…...
管道应力理论(应用)
本文仅对管道应力涉及的理论知识(偏向于应用)进行简单介绍。管道应力:对管道应力校核是为了防止管壁内应力过大对管道造成破坏,不同的荷载引起不同类型的应力,在实际工程应用中,一般分为三种:一…...
Android TTS开发避坑指南:为什么你的Google语音引擎播不出中文?从初始化到语音包管理的完整解决方案
Android TTS开发实战:解决Google语音引擎中文播报的7个关键问题 在移动应用开发中,文字转语音(TTS)功能正变得越来越重要。从无障碍辅助功能到语音导航、有声阅读,TTS技术为应用增添了更丰富的交互维度。然而,许多Android开发者在…...
Jetson Nano实战:FFmpeg与Nginx的RTMP推流配置全解析
1. Jetson Nano与RTMP推流基础认知 第一次接触Jetson Nano做视频推流时,我对着这块信用卡大小的开发板研究了整整三天。这块搭载了128核NVIDIA Maxwell GPU的小家伙,其实是个隐藏的视频处理高手。RTMP协议就像快递公司的"当日达"服务ÿ…...
intv_ai_mk11实测效果:在24GB显存限制下保持128~512 token长文本生成质量
intv_ai_mk11实测效果:在24GB显存限制下保持128~512 token长文本生成质量 1. 模型效果惊艳展示 intv_ai_mk11作为一款基于Llama架构的中等规模文本生成模型,在24GB显存环境下展现出了令人印象深刻的长文本生成能力。不同于常规模型在显存限制下容易出现…...
Linux文件搜索工具FSearch:从卡顿到闪电的搜索体验革新
Linux文件搜索工具FSearch:从卡顿到闪电的搜索体验革新 【免费下载链接】fsearch A fast file search utility for Unix-like systems based on GTK3 项目地址: https://gitcode.com/gh_mirrors/fs/fsearch 在Linux系统中,文件搜索往往是一场与时…...
C++ 安全子集:探讨在关键任务系统中限制部分 C++ 特性(如 RTTI)的必要性
尊敬的各位专家、各位同仁,大家好。今天,我们齐聚一堂,共同探讨一个在软件工程领域,尤其是在关键任务系统(Critical Mission Systems)开发中至关重要的话题:C 安全子集——在严苛环境下限制部分…...
ruoyi-vue-pro源码部署实战:如何选择稳定版本并快速搭建开发环境
RuoYi-Vue-Pro 稳定版部署指南:从版本选择到开发环境搭建全解析 第一次接触 RuoYi-Vue-Pro 这个 Java 快速开发框架时,我像大多数开发者一样直接克隆了 master 分支,结果编译阶段就遭遇了各种依赖冲突和接口报错。后来才发现,这个…...
