当前位置: 首页 > news >正文

用idea工具scala 和 Java开发 spark案例:WordCount

目录

一 环境准备

二 scala代码编写

三 java 代码编写


一 环境准备

        创建一个 maven 工程

        添加下列依赖

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>

        原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改

        

二 scala代码编写

        首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址

         新建一个 scala 的 object,编写代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object WordCountDemo {def main(args: Array[String]): Unit = {val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")val sc : SparkContext = SparkContext.getOrCreate(conf)var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()//    val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
//    val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
//    val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
//    val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)//打印到 console//    result2.glom().collect.foreach(x=>println(x.toList))//保存到 hdfsresult2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")}}

        这里稍微解释一下代码中的一些函数:

        map:转换函数,数据集合中每个元素进行一次我们定义的方法

        flatMap: 与map类似,但是映射为0个或多个

        collect:以数组的形式返回数据集中的所有元素 

        glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

 

        云服务器的朋友可能有的报错

22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]

        出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到

        解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip

        在 idea 中,resource 文件夹中添加文件 hdfs-site.xml

        hdfs-site.xml内容:

<!-- datanode 通信是否使用域名,默认为false,改为true --><property><name>dfs.client.use.datanode.hostname</name><value>true</value><description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.</description></property>

三 java 代码编写

        这里原数据存储在本地,文件名为 input.txt

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.Map;public class WordCount {public static void main(String[] args) {// 创建SparkConf对象SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");// 创建JavaSparkContext对象JavaSparkContext sc = new JavaSparkContext(conf);// 读取文本文件JavaRDD<String> lines = sc.textFile("input.txt");// 计算单词出现次数JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((x, y) -> x + y);Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();// 输出结果for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {System.out.println(entry.getKey() + ": " + entry.getValue());}// 关闭JavaSparkContext对象sc.close();}
}

相关文章:

用idea工具scala 和 Java开发 spark案例:WordCount

目录 一 环境准备 二 scala代码编写 三 java 代码编写 一 环境准备 创建一个 maven 工程 添加下列依赖 <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</vers…...

【git merge/rebase】详解合并代码、解决冲突

目录 1.概述 2.merge 3.rebase 4.merge和rabase的区别 5.解决冲突 1.概述 在实际开发中&#xff0c;一个项目往往是多个人一起协作的&#xff0c;头天下班前大家把代码交到远端仓库&#xff0c;第二天工作的第一件事情都是从服务器上拉最新的代码&#xff0c;保证代码版本…...

nrm,npm源的管理工具

npm手动切换淘宝源 查看当前的仓库 npm config get registry设置成淘宝源 npm config set registry https://registry.npmmirror.com/设置回官方源 npm config set registry https://registry.npmjs.org/手动切换不免不太方便&#xff0c;而且网上很多资料淘宝源还是过期的链接…...

HarmonyOS/OpenHarmony原生应用-ArkTS万能卡片组件Stack

堆叠容器&#xff0c;子组件按照顺序依次入栈&#xff0c;后一个子组件覆盖前一个子组件。该组件从API Version 7开始支持。可以包含子组件。 一、接口 Stack(value?: { alignContent?: Alignment }) 从API version 9开始&#xff0c;该接口支持在ArkTS卡片中使用。 二、…...

腾讯云2核4G服务器一年和三年价格性能测评

腾讯云轻量2核4G5M服务器&#xff1a;CPU内存流量带宽系统盘性能测评&#xff1a;轻量应用服务器2核4G5M带宽&#xff0c;免费500GB月流量&#xff0c;60GB系统盘SSD盘&#xff0c;5M带宽下载速度可达640KB/秒&#xff0c;流量超额按照0.8元每GB的价格支付流量费&#xff0c;轻…...

集线器、交换机、路由器是如何转发包的

集线器、交换机、路由器是如何转发包的 集线器交换机MAC地址表的维护 路由器路由表中的信息路由器的包接收操作查询路由表确定输出端口找不到匹配路由时选择默认路由包的有效期通过分片功能拆分大网络包路由器发送操作中的一些特点 参考文档 集线器 集线器是一层&#xff08;物…...

交通物流模型 | MDRGCN:用于多模式交通客流预测的深度学习模型

城市交通拥堵是造成交通事故的重要原因,也是城市发展的主要障碍。通过学习历史交通流数据,我们可以预测未来一些区域的交通流,这对城市道路规划、交通管理、交通控制等都有重要意义。然而,由于交通网络拓扑结构的复杂性和影响交通流的因素的多样性,交通模式往往是复杂多变…...

保研经历分享(一)

这个系列的文章主要是想记录一下自己大学期间最重要的一件事&#xff08;保研!!&#xff09;的经历、过程&#xff0c;外加一些保研流程介绍、面试经验、院校投递、踩坑经历&#xff0c;主要给学弟学妹们避雷&#xff0c;也做一些借鉴吧~ 这一篇主要是对保研过程的一些介绍&…...

【手写数字识别】数据挖掘实验二

文章目录 Ⅰ、项目任务要求任务描述&#xff1a;主要任务要求(必须完成以下内容但不限于这些内容)&#xff1a; II、实现过程数据集描述实验运行环境描述KNN模型决策树模型朴素贝叶斯模型SVM模型不同方法对MNIST数据集分类识别结果分析(不同方法识别对比率表及结果分析) 完整代…...

什么是云计算?云计算简介

其实“云计算”作为一个名词而言&#xff0c;那是相当成功滴。很多人都有听过。但提及云计算”具体是什么?很多人&#xff0c;知其然&#xff0c;却不知其所以然! 利用软件将这些成千上万不可靠的硬件组织成一个稳定可靠的IT系统&#xff0c;以此支撑其公司的IT基础服务。这家…...

Vue路由进阶--VueRouter声明式导航

Vue路由进阶–VueRouter声明式导航 文章目录 Vue路由进阶--VueRouter声明式导航1、声明式导航1.1、导航链接1.2、高亮类名1.3、跳转传参1.4、动态路由参数可选符 1、声明式导航 1.1、导航链接 需求&#xff1a;实现导航高亮效果 vue-router提供了一个全局组件router-link(取…...

Oracle 云服务即将支持 PostgreSQL!

2023 年 9 月 19 日&#xff0c;Oracle 产品团队发布了一篇文章&#xff0c;宣布 Oracle 云基础架构&#xff08;OCI&#xff09;开始提供 PostgreSQL 服务。目前支持的版本为 PostgreSQL 14.9&#xff0c;提供有限支持&#xff0c;12 月份将会提供正式版本。 众所周知&#x…...

数字孪生项目:突破技术难关,引领未来发展

项目背景 数字孪生技术一直在不断发展&#xff0c;为企业提供了无限的潜力和机会。在这个数字时代&#xff0c;公司需要不断进化&#xff0c;以适应市场的需求和客户的期望。北京智汇云舟一直以“视频孪生”为标签&#xff0c;是数字孪生领域的头部企业&#xff0c;拥有强大的…...

MySQL 如何使用离线模式维护服务器

离线模式 作为 DBA&#xff0c;最常见的任务之一就是批量处理 MySQL 服务的启停或其他一些活动。在停止 MySQL 服务前&#xff0c;我们可能需要检查是否有活动连接&#xff1b;如果有&#xff0c;我们可能需要把它们全部杀死。通常&#xff0c;我们使用 pt-kill 杀死应用连接或…...

期权开户流程合集——期权开户的操作步骤

最详细的期权开户流程介绍是怎样的&#xff0c;下文为大家介绍期权开户流程合集——期权开户的操作步骤的知识点&#xff0c;希望对读者有所帮助&#xff0c;期权开户流程和方式分两种&#xff0c;一种券商&#xff0c;一种期权分仓平台&#xff0c;有啥区别下文揭秘。本文来自…...

mysql改造oracle,以及项目改造

mysql改造oracle&#xff0c;以及springboot项目改造 oracle改造说明 这次的任务是springboot mysql版本改造为oracle版本&#xff0c;mysql5.7&#xff0c;oracle11.2&#xff0c;springboot2.0.2&#xff08;springboot版本无所谓&#xff0c;都差不多&#xff0c;自己记录…...

利用互斥锁实现多个线程写一个文件

代码 #include <stdio.h> #include <pthread.h> #include <string.h> #include <unistd.h>FILE *fp;//线程函数1 void *wrfunc1(void *arg); //线程函数2 void *wrfunc2(void *arg); //线程函数3 void *wrfunc3(void *arg);//静态创建互斥锁 pthread_…...

【m98】视频缓存PacketBuffer 1 : SeqNumUnwrapper int64映射、ForwardDiff

视频缓存PacketBuffer 对rtp包进行接收处理。 rtp序号 相关 【mediasoup】RtpStreamRecv 对rtp 序号的验证 与这里的处理有不同。...

day58:ARMday5,GPIO流水灯实验

汇编指令&#xff1a; .text .global _start _start: 1.设置GPIOE GPIOF寄存器的时钟使能 RCC_MP_AHB4ENSETR[5:4]->1 0x50000a28 LDR R0,0x50000a28 LDR R1,[R0] ORR R1,R1,#(0x3<<4) STR R1,[R0]2.设置PE10、PF10、PE8管脚为输出模式&#xff0c;GPIOE_MODER[21…...

Linux shell编程学习笔记9:字符串运算 和 if语句

Linux Shell 脚本编程和其他编程语言一样&#xff0c;支持算数、关系、布尔、字符串、文件测试等多种运算&#xff0c;同样也需要进行根据条件进行流程控制&#xff0c;提供了if、for、while、until等语句。 上期学习笔记中我们研究了字符串数据的使用&#xff0c;今天我们研…...

【分享】xpath的属性表达式

在XPath中&#xff0c;要选择HTML文档中具有特定类的元素&#xff0c;您通常需要使用属性选择器 [attribute-nameattribute-value] 来选择元素&#xff0c;其中 attribute-name 是属性名称&#xff0c;attribute-value 是要匹配的属性值。对于HTML元素的类选择器&#xff0c;您…...

Oracle Dataguard跨版本数据迁移(11.2.0.4~19.13.0.0)

一、前期准备 按照DG部署步骤修改DG参数、添加standby redo log、配置静态监听、配置tnsnames文件、备端修改参数文件、创建所需目录等配置好部署环境&#xff0c;这里不再赘述&#xff0c;跟正常部署DG无区别。 环境配置好后&#xff0c;进行后面的操作。 二、使用RMAN备份复…...

零基础Linux_14(基础IO_文件)缓冲区+文件系统inode等

目录 1. 缓冲区 1.1 缓冲区的存在 1.2 缓冲区的刷新策略 1.3 模拟C标准库中的文件操作 完整代码及验证&#xff1a; 1.4 重看缓冲区 1.5 stdout和stderr的区别 2. 文件系统 2.1 磁盘的物理结构CHS等 2.2 磁盘的抽象结构LBA等 2.3 文件管理inode等 2.4 对文件的操作…...

Vue中的router路由的介绍(快速入门)

路由的介绍 文章目录 路由的介绍1、VueRouter的介绍2、VueRouter的使用&#xff08;52&#xff09;2.1、5个基础步骤(固定)2.2、两个核心步骤 3、组件存放的目录&#xff08;组件分类&#xff09; 生活中的路由&#xff1a;设备和ip的映射关系&#xff08;路由器&#xff09; V…...

ESP-07S进行TCP 通信测试

一&#xff0c;TCP Server 为 AP 模式&#xff0c;TCP Client 为 Station 模式。 这里电脑pc作为TCP Server&#xff0c;ESP-07S作为TCP Client 。 二&#xff0c;电脑端配置。 1&#xff0c;开启热点。 2&#xff0c;转到“设置”&#xff0c;编辑热点信息。 3&#xff0c;关闭…...

如何找到新媒体矩阵中存在的问题?

随着数字媒体的发展&#xff0c;企业的新媒体矩阵已成为品牌推广和营销的重要手段之一。 然而&#xff0c;很多企业在搭建新媒体矩阵的过程中&#xff0c;往往会忽略一些问题&#xff0c;导致矩阵发展存在潜在风险&#xff0c;影响整个矩阵运营效果。 因此&#xff0c;找到目前…...

MongoDB-基本常用命令

基本常用命令 MongoDB常用命令a) 案例需求b) 数据库操作b.1) 选择和创建数据库b.2) 删除数据库 c) 集合操作c.1) 集合的显示创建c.2) 集合的隐式创建c.3) 集合的删除 d) 文档基本CRUDd.1) 文档的插入(1) 单个文档的插入(2) 批量插入 d.2) 文档的基本查询(1) 查询所有(2) 投影查…...

Linux 常用systemctl service 脚本

文章目录 1. jar 包部署 service 脚本2. nginx 服务安装 脚本3.artemis 服务安装脚本 1. jar 包部署 service 脚本 默认jdk 执行&#xff1a; [Service] Typesimple Userroot WorkingDirectory/opt/app/webserver ExecStart/usr/bin/java -Xms512m -Xss256k -jar /opt/app/we…...

flask-sqlalchemy实现读写分离完整版

1. 依赖版本: alembic==1.6.5 click==8.0.1 colorama==0.4.4 Flask==1.1.2 Flask-Migrate==2.7.0 Flask-Script==2.0.6 Flask-SQLAlchemy==2.4.4 greenlet==1.1.0 itsdangerous==2.0.1 Jinja2==3.0.1 Mako==1.1.4 MarkupSafe==2.0.1 protobuf==3.17.3 PyMySQL==1.0.2 python-…...

windows下在cmd和git bash中执行bash download.sh失败

cmd报错信息&#xff1a; 解决办法&#xff1a; win64-wget-1.21.4 安装软件wget&#xff0c;如下这是64位的包&#xff0c;解压后&#xff0c;下面有个wget.exe&#xff0c;拷贝到C:\Windows\System32、 然后打开cmd&#xff0c;执行wget -V 如上&#xff0c;有版本信息就O…...