用idea工具scala 和 Java开发 spark案例:WordCount
目录
一 环境准备
二 scala代码编写
三 java 代码编写
一 环境准备
创建一个 maven 工程
添加下列依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
原本就下载过这些依赖的没必要再下一遍,可以用之前的,比如 json,mysql,mysq 这里版本是 mysql 5 ,不一样的注意修改
二 scala代码编写
首先准备好数据,即一个 txt 文本里面加一些单词,可以放在 hdfs 或本地或其它地方,读取的时候注意改代码,这里是读取 hdfs 上的 txt 文本,注意改成自己的地址
新建一个 scala 的 object,编写代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}object WordCountDemo {def main(args: Array[String]): Unit = {val conf : SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")val sc : SparkContext = SparkContext.getOrCreate(conf)var spark : SparkSession = SparkSession.builder().config(conf).getOrCreate()// val rdd1: RDD[String] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt")
// val rdd2: RDD[String] = rdd1.flatMap(x => x.split(" "))
// val rdd3: RDD[(String, Int)] = rdd2.map(x => (x, 1))
// val result: RDD[(String, Int)] = rdd3.reduceByKey(_ + _)val result2: RDD[(String, Int)] = sc.textFile("hdfs://101.200.63.3:9000/kb23/tmp/*.txt").flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)//打印到 console// result2.glom().collect.foreach(x=>println(x.toList))//保存到 hdfsresult2.saveAsTextFile("hdfs://101.200.63.3:9000/kb23/sparkoutput/wordcount")}}
这里稍微解释一下代码中的一些函数:
map:转换函数,数据集合中每个元素进行一次我们定义的方法
flatMap: 与map类似,但是映射为0个或多个
collect:以数组的形式返回数据集中的所有元素
glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。
云服务器的朋友可能有的报错
22/05/0305:48:53 WARN DFSClient: Failed to connect to /10.0.24.10:9866 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.0.24.10:9866]
出现这种错误看字面意思就很容易明白,这是本地与 datanode 通信时,namenode 给的是 datanode 的内网 ip,所以本地找不到
解决方法也很简单,设置一下让 namenode 传过来的是服务器名而不是 ip
在 idea 中,resource 文件夹中添加文件 hdfs-site.xml
hdfs-site.xml内容:
<!-- datanode 通信是否使用域名,默认为false,改为true --><property><name>dfs.client.use.datanode.hostname</name><value>true</value><description>Whether datanodes should use datanode hostnames whenconnecting to other datanodes for data transfer.</description></property>
三 java 代码编写
这里原数据存储在本地,文件名为 input.txt
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.Map;public class WordCount {public static void main(String[] args) {// 创建SparkConf对象SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");// 创建JavaSparkContext对象JavaSparkContext sc = new JavaSparkContext(conf);// 读取文本文件JavaRDD<String> lines = sc.textFile("input.txt");// 计算单词出现次数JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaRDD<String> filteredWords = words.filter(word -> !word.isEmpty());JavaPairRDD<String, Integer> wordCounts = filteredWords.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((x, y) -> x + y);Map<String, Integer> wordCountsMap = wordCounts.collectAsMap();// 输出结果for (Map.Entry<String, Integer> entry : wordCountsMap.entrySet()) {System.out.println(entry.getKey() + ": " + entry.getValue());}// 关闭JavaSparkContext对象sc.close();}
}
相关文章:
用idea工具scala 和 Java开发 spark案例:WordCount
目录 一 环境准备 二 scala代码编写 三 java 代码编写 一 环境准备 创建一个 maven 工程 添加下列依赖 <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</vers…...

【git merge/rebase】详解合并代码、解决冲突
目录 1.概述 2.merge 3.rebase 4.merge和rabase的区别 5.解决冲突 1.概述 在实际开发中,一个项目往往是多个人一起协作的,头天下班前大家把代码交到远端仓库,第二天工作的第一件事情都是从服务器上拉最新的代码,保证代码版本…...
nrm,npm源的管理工具
npm手动切换淘宝源 查看当前的仓库 npm config get registry设置成淘宝源 npm config set registry https://registry.npmmirror.com/设置回官方源 npm config set registry https://registry.npmjs.org/手动切换不免不太方便,而且网上很多资料淘宝源还是过期的链接…...

HarmonyOS/OpenHarmony原生应用-ArkTS万能卡片组件Stack
堆叠容器,子组件按照顺序依次入栈,后一个子组件覆盖前一个子组件。该组件从API Version 7开始支持。可以包含子组件。 一、接口 Stack(value?: { alignContent?: Alignment }) 从API version 9开始,该接口支持在ArkTS卡片中使用。 二、…...

腾讯云2核4G服务器一年和三年价格性能测评
腾讯云轻量2核4G5M服务器:CPU内存流量带宽系统盘性能测评:轻量应用服务器2核4G5M带宽,免费500GB月流量,60GB系统盘SSD盘,5M带宽下载速度可达640KB/秒,流量超额按照0.8元每GB的价格支付流量费,轻…...

集线器、交换机、路由器是如何转发包的
集线器、交换机、路由器是如何转发包的 集线器交换机MAC地址表的维护 路由器路由表中的信息路由器的包接收操作查询路由表确定输出端口找不到匹配路由时选择默认路由包的有效期通过分片功能拆分大网络包路由器发送操作中的一些特点 参考文档 集线器 集线器是一层(物…...

交通物流模型 | MDRGCN:用于多模式交通客流预测的深度学习模型
城市交通拥堵是造成交通事故的重要原因,也是城市发展的主要障碍。通过学习历史交通流数据,我们可以预测未来一些区域的交通流,这对城市道路规划、交通管理、交通控制等都有重要意义。然而,由于交通网络拓扑结构的复杂性和影响交通流的因素的多样性,交通模式往往是复杂多变…...

保研经历分享(一)
这个系列的文章主要是想记录一下自己大学期间最重要的一件事(保研!!)的经历、过程,外加一些保研流程介绍、面试经验、院校投递、踩坑经历,主要给学弟学妹们避雷,也做一些借鉴吧~ 这一篇主要是对保研过程的一些介绍&…...

【手写数字识别】数据挖掘实验二
文章目录 Ⅰ、项目任务要求任务描述:主要任务要求(必须完成以下内容但不限于这些内容): II、实现过程数据集描述实验运行环境描述KNN模型决策树模型朴素贝叶斯模型SVM模型不同方法对MNIST数据集分类识别结果分析(不同方法识别对比率表及结果分析) 完整代…...

什么是云计算?云计算简介
其实“云计算”作为一个名词而言,那是相当成功滴。很多人都有听过。但提及云计算”具体是什么?很多人,知其然,却不知其所以然! 利用软件将这些成千上万不可靠的硬件组织成一个稳定可靠的IT系统,以此支撑其公司的IT基础服务。这家…...

Vue路由进阶--VueRouter声明式导航
Vue路由进阶–VueRouter声明式导航 文章目录 Vue路由进阶--VueRouter声明式导航1、声明式导航1.1、导航链接1.2、高亮类名1.3、跳转传参1.4、动态路由参数可选符 1、声明式导航 1.1、导航链接 需求:实现导航高亮效果 vue-router提供了一个全局组件router-link(取…...

Oracle 云服务即将支持 PostgreSQL!
2023 年 9 月 19 日,Oracle 产品团队发布了一篇文章,宣布 Oracle 云基础架构(OCI)开始提供 PostgreSQL 服务。目前支持的版本为 PostgreSQL 14.9,提供有限支持,12 月份将会提供正式版本。 众所周知&#x…...

数字孪生项目:突破技术难关,引领未来发展
项目背景 数字孪生技术一直在不断发展,为企业提供了无限的潜力和机会。在这个数字时代,公司需要不断进化,以适应市场的需求和客户的期望。北京智汇云舟一直以“视频孪生”为标签,是数字孪生领域的头部企业,拥有强大的…...
MySQL 如何使用离线模式维护服务器
离线模式 作为 DBA,最常见的任务之一就是批量处理 MySQL 服务的启停或其他一些活动。在停止 MySQL 服务前,我们可能需要检查是否有活动连接;如果有,我们可能需要把它们全部杀死。通常,我们使用 pt-kill 杀死应用连接或…...

期权开户流程合集——期权开户的操作步骤
最详细的期权开户流程介绍是怎样的,下文为大家介绍期权开户流程合集——期权开户的操作步骤的知识点,希望对读者有所帮助,期权开户流程和方式分两种,一种券商,一种期权分仓平台,有啥区别下文揭秘。本文来自…...
mysql改造oracle,以及项目改造
mysql改造oracle,以及springboot项目改造 oracle改造说明 这次的任务是springboot mysql版本改造为oracle版本,mysql5.7,oracle11.2,springboot2.0.2(springboot版本无所谓,都差不多,自己记录…...
利用互斥锁实现多个线程写一个文件
代码 #include <stdio.h> #include <pthread.h> #include <string.h> #include <unistd.h>FILE *fp;//线程函数1 void *wrfunc1(void *arg); //线程函数2 void *wrfunc2(void *arg); //线程函数3 void *wrfunc3(void *arg);//静态创建互斥锁 pthread_…...
【m98】视频缓存PacketBuffer 1 : SeqNumUnwrapper int64映射、ForwardDiff
视频缓存PacketBuffer 对rtp包进行接收处理。 rtp序号 相关 【mediasoup】RtpStreamRecv 对rtp 序号的验证 与这里的处理有不同。...

day58:ARMday5,GPIO流水灯实验
汇编指令: .text .global _start _start: 1.设置GPIOE GPIOF寄存器的时钟使能 RCC_MP_AHB4ENSETR[5:4]->1 0x50000a28 LDR R0,0x50000a28 LDR R1,[R0] ORR R1,R1,#(0x3<<4) STR R1,[R0]2.设置PE10、PF10、PE8管脚为输出模式,GPIOE_MODER[21…...

Linux shell编程学习笔记9:字符串运算 和 if语句
Linux Shell 脚本编程和其他编程语言一样,支持算数、关系、布尔、字符串、文件测试等多种运算,同样也需要进行根据条件进行流程控制,提供了if、for、while、until等语句。 上期学习笔记中我们研究了字符串数据的使用,今天我们研…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

SpringCloudGateway 自定义局部过滤器
场景: 将所有请求转化为同一路径请求(方便穿网配置)在请求头内标识原来路径,然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...

接口自动化测试:HttpRunner基础
相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具,支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议,涵盖接口测试、性能测试、数字体验监测等测试类型…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...