当前位置: 首页 > news >正文

Hadoop3教程(十四):MapReduce中的排序

文章目录

  • (99)WritableComparable排序
    • 什么是排序
    • 什么时候需要排序
    • 排序有哪些分类
    • 如何实现自定义排序
  • (100)全排序案例
    • 案例需求
    • 思路分析
    • 实际代码
  • (101)二次排序案例
  • (102) 区内排序案例
  • 参考文献

(99)WritableComparable排序

什么是排序

排序是MR中最重要的操作之一,也是面试中可能被问到的重点。

MapTask和ReduceTask中都会对数据按照KEY来排序,主要是为了效率,排完序之后,相同key值的数据会被放在一起,更方便下一步(如Reducer())的汇总处理。

默认排序是按照字典顺序(字母由小到大,或者是数字由小到大)排序,且实现该排序的方法是快速排序

什么时候需要排序

MR的过程中,什么时候用到了排序呢?

Map阶段:

  • 环形缓冲区溢写到磁盘之前,会将每个分区内数据分别进行一个快排,这个排序是在内存中完成的;(对key的索引,按照字典顺序排列)
  • 环形缓冲区多轮溢写完毕后,会形成一堆文件,这时候会对这些文件做merge归并排序,我理解是单个MapTask最终会汇总形成一个文件;

Reduce阶段:

  • ReduceTask会主动拉取MapTask们的输出文件,理论上是会优先保存到内存里,但是往往内存里放不下,所以多数情况下会直接溢写到磁盘,于是我们会得到多个文件。当文件数量超过阈值,之后需要做归并排序,合并成一个大文件。如果是内存中的数据超过阈值,则会进行一次合并后将数据溢写到磁盘。当所有数据拷贝完后,ReduceTask会统一对内存和磁盘上的所有数据进行一次归并排序
  • 文件合并后其实还可以进行一个分组排序,过于复杂,这里就不介绍了。

排序有哪些分类

MR里的排序还有部分排序全排序辅助排序二次排序的不同说法,注意,它们之间不是像那种传统的排序算法之间的区别,只是当排序在不同场景的时候,分别起了个名字。

MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部是有序的,这就是部分排序

最终输出结果只有一个文件,且文件内部有序。这就是全排序

全排序的实现方式是只设置一个ReduceTask。但是这种方式在处理大型文件时效率很低很低,因为一台机器处理全部数据,完全没有利用MR所提供的并行架构的优势,生产环境上完全不适用。

所以生产环境里,常用的还是部分排序。

辅助排序,就是GroupingComparator分组。

这个似乎是可选的,是在Reduce阶段,Reducer在从Map阶段主动拉取完数据后,会对所有文件做一次归并排序。做完归并排序之后,理论上就可以进行辅助排序。

辅助排序有啥用呢,就是当接收到的Key是个bean对象时,辅助排序可以让一个或者几个字段相同的key(全部字段不相同)进入同一个Reduce(),所以也起名叫做分组排序。

二次排序比较简单,在自定义排序过程中,如果compareTo中的判断条件为两个,那它就是二次排序。

如何实现自定义排序

说到这里,那 如何实现自定义排序 呢?

如果是bean对象作为key传输,那需要实现WritableComparable接口,重写compareTo方法,就可以实现自定义排序。

@Override
public int compareTo(FlowBean bean) {int result;// 按照总流量大小,倒序排列if (this.sumFlow > bean.getSumFlow()) {result = -1;}else if (this.sumFlow < bean.getSumFlow()) {result = 1;}else {result = 0;}return result;
}

(100)全排序案例

案例需求

之前我们做过一个案例,输入文件有一个,里面放的是每个手机号的上行流量和下行流量,输出同样是一个文件,里面放的除了手机号的上行流量和下行流量之外,还多了一行总流量。

这时候我们提一个新需求,就是我不止要这个输出文件,我还要这个文件里的内容,按照总流量降序排列。

思路分析

MapReduce里,只能对Key进行排序。在先前的需求里,我们是用手机号作为key,上行流量、下行流量和总流量组成一个bean,作为value,这样的安排显然不适合新需求。

因此我们需要改变一下,将上行流量、下行流量和总流量组成的bean作为key,而将手机号作为value,如此来排序。

所以第一步,我们需要对我们自定义的FlowBean对象声明WritableComparable接口,并重写CompareTo方法,这一步的目的是使得FlowBean可进行算数比较,从而允许排序:

@Override
public int CompareTo(FlowBean o){// 按照总流量,降序排列return this.sumFlow > o.getSumFlow()?-1:1;
}

注意这里,因为Hadoop里默认的字典排序是从小到大排序,如果想实现案例里由大到小的排序,那么当大于的时候,就要返回-1,从而将大的值排在前面。

其次,Mapper类里:

context.write(bean, 手机号)

bean成了key,手机号成了value。

最后,Reduce类里,需要循环输出,避免出现总流量相同的情况。

for (Text text: values){context.write(text, key);	// 注意顺序,原先的key放在value位置
}

2023-7-19 11:16:04 这里没懂。。。

哦哦明白了,什么样的数据会进一个Reducer呢,当然是key 值相同的会进同一个,又因为我们之前compareTo的时候用的是总流量,所以最后是总流量相同的记录会送进同一个Reducer,然后汇总成一条记录做输出,毕竟reducer就是用来做汇总的。

但"汇总成一条记录"这并不是我们想要的,我们需要的是把这些数据原模原样输出来。这就是为什么我们在Reducer的reduce()里面,要加上循环输出的原因。

实际代码

贴一下教程里的代码实现:

首先是FlowBean对象,需要声明WritableComparable接口,并重写CompareTo()

package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class FlowBean implements WritableComparable<FlowBean> {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//提供无参构造public FlowBean() {}//生成三个属性的getter和setter方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//实现序列化和反序列化方法,注意顺序一定要一致@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(this.upFlow);out.writeLong(this.downFlow);out.writeLong(this.sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}//重写ToString,最后要输出FlowBean@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {//按照总流量比较,倒序排列if(this.sumFlow > o.sumFlow){return -1;}else if(this.sumFlow < o.sumFlow){return 1;}else {return 0;}}
}

然后编写Mapper类:

package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {private FlowBean outK = new FlowBean();private Text outV = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1 获取一行数据String line = value.toString();//2 按照"\t",切割数据String[] split = line.split("\t");//3 封装outK outVoutK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();outV.set(split[0]);//4 写出outK outVcontext.write(outK,outV);}
}

然后编写Reducer类:

package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {@Overrideprotected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {//遍历values集合,循环写出,避免总流量相同的情况for (Text value : values) {//调换KV位置,反向写出context.write(value,key);}}
}

最后编写驱动类:

package com.atguigu.mapreduce.writablecompable;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//4 设置Map端输出数据的KV类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//5 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

完成,仅做了解即可。

(101)二次排序案例

二次排序的概念很简单,其实之前提过了,就是在自定义排序的时候,判断条件有两个。

比如说,原先我对一堆人排序,是按照身高从高到低排,但是身高一样的就没法排序了,这时候我可以再加入一个判断条件,比如说如果身高一样的话,就按体重排序。

具体就是修改FlowBean的CompareTo方法,在第一条件相等的时候,添加第二判定条件。

public int compareTo(FlowBean o) {//按照总流量比较,倒序排列if(this.sumFlow > o.sumFlow){return -1;}else if(this.sumFlow < o.sumFlow){return 1;}else {if (this.upFlow > o.upFlow){return 1;} else if (this.upFlow < o.upFlow){return -1;}else {return 0;}}
}

如果有需要的话,还可以继续加第三判定条件。

(102) 区内排序案例

还是之前的手机号案例,之前我们想要的是,只有一个文件,然后文件内所有数据按照总流量降序排列。

现在我们提出一个新要求,按照前3位来分区输出,比如说136的在一个文件里,137的在一个文件里,以此类推。而且每个文件内部,还需要按照总流量降序排列。

本质上就是之前说的分区 + 排序,这两部分的结合。需要额外定义好Partitioner类。

贴一下教程里的代码示例,其实只需要在上一小节的基础上补充自定义分区类即可:

首先自定义好分区类:

package com.atguigu.mapreduce.partitionercompable;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {@Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions) {//获取手机号前三位String phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量partition,根据prePhone设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号partitionreturn partition;}
}

然后在驱动类里注册好分区器:

// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner2.class);// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);

其他跟上一小节保持一致即可。

参考文献

  1. 【尚硅谷大数据Hadoop教程,hadoop3.x搭建到集群调优,百万播放】

相关文章:

Hadoop3教程(十四):MapReduce中的排序

文章目录 &#xff08;99&#xff09;WritableComparable排序什么是排序什么时候需要排序排序有哪些分类如何实现自定义排序 &#xff08;100&#xff09;全排序案例案例需求思路分析实际代码 &#xff08;101&#xff09;二次排序案例&#xff08;102&#xff09; 区内排序案例…...

测试需要写测试用例吗?

如何理解软件的质量 我们都知道&#xff0c;一个软件从无到有要经过需求设计、编码实现、测试验证、部署发布这四个主要环节。 需求来源于用户反馈、市场调研或者商业判断。意指在市场行为中&#xff0c;部分人群存在某些诉求或痛点&#xff0c;只要想办法满足这些人群的诉求…...

Qt 视口和窗口的区别

视口和窗口 绘图设备的物理坐标是基本的坐标系&#xff0c;通过QPainter的平移、旋转等变换可以得到更容易操作的逻辑坐标 为了实现更方便的坐标&#xff0c;QPainter还提供了视口(Viewport)和窗口(Window)坐标系&#xff0c;通过QPainter内部的坐标变换矩阵自动转换为绘图设…...

使用Git将GitHub仓库下载到本地

前记&#xff1a; git svn sourcetree gitee github gitlab gitblit gitbucket gitolite gogs 版本控制 | 仓库管理 ---- 系列工程笔记. Platform&#xff1a;Windows 10 Git version&#xff1a;git version 2.32.0.windows.1 Function&#xff1a;使用Git将GitHub仓库下载…...

前端需要了解的浏览器缓存知识

文章目录 前言为什么需要缓存&#xff1f;DNS缓存缓存读写顺序缓存位置memory cache&#xff08;浏览器本地缓存&#xff09;disk cache&#xff08;硬盘缓存&#xff09;重点&#xff01;&#xff01;&#xff01; 缓存策略 - 强缓存和协商缓存1&#xff09;强缓存ExpiresCach…...

自动驾驶:控制算法概述

自动驾驶&#xff1a;控制算法概述 常见控制算法PID算法LQR算法MPC算法 自动驾驶控制算法横向控制纵向控制 参考文献 常见控制算法 PID算法 PID&#xff08;Proportional-Integral-Derivative&#xff09;控制是一种经典的反馈控制算法&#xff0c;通常用于稳定性和响应速度要…...

【Mysql】Mysql的字符集和比较规则(三)

字符集和比较规则简介 字符集简介 我们知道在计算机中只能以二进制的方式对数据进行存储&#xff0c;那么他们之间是怎样对应并进行转换的&#xff1f;我们需要了解两个概念&#xff1a; 字符范围&#xff1a;我们可以将哪些字符转换成二进制数据&#xff0c;也就是规定好字…...

【SpringCloud-11】SCA-sentinel

sentinel是一个流量控制、熔断降级的组件&#xff0c;可以替换第一代中的hystrix。 hystrix用起来没有那么方便&#xff1a; 1、要在调用方引入hystrix&#xff0c;没有ui界面进行配置&#xff0c;需要在代码中进行配置&#xff0c;侵入了业务代码。 2、还要自己搭建监控平台…...

设计模式:简单工厂模式(C#、JAVA、JavaScript、C++、Python、Go、PHP):

简介&#xff1a; 简单工厂模式&#xff0c;它提供了一个用于创建对象的接口&#xff0c;但具体创建的对象类型可以在运行时决定。这种模式通常用于创建具有共同接口的对象&#xff0c;并且可以根据客户端代码中的参数或配置来选择要创建的具体对象类型。 在简单工厂模式中&am…...

浅谈智能照明控制系统在智慧建筑中的应用

贾丽丽 安科瑞电气股份有限公司 上海嘉定 201801 摘要&#xff1a;新时期&#xff0c;建筑行业发展迅速&#xff0c;在信息化背景下&#xff0c;建筑功能逐渐拓展&#xff0c;呈现了智能化的发展态势。智能建筑更加安全、节能、环保&#xff0c;也符合绿色建筑理念。在建筑智…...

lower_bound()以及upper_bound()

lower_bound&#xff08;&#xff09;&#xff1a; lower_bound()的返回值是第一个大于等于 target 的值的地址&#xff0c;用这个地址减去first&#xff0c;得到的就是第一个大于等于target的值的下标。 在数组中&#xff1a; int poslower_bound(a,an,target)-a;\\n为数组…...

unity(WebGL) 截图拼接并保存本地,下载PDF

截图参考&#xff1a;Unity3D 局部截图、全屏截图、带UI截图三种方法_unity 截图_野区捕龙为宠的博客-CSDN博客 文档下载&#xff1a; Unity WebGL 生成doc保存到本地电脑_unity webgl 保存文件_野区捕龙为宠的博客-CSDN博客 中文输入&#xff1a;Unity WebGL中文输入 支持输…...

加速企业云计算部署:应对新时代的挑战

随着科技的飞速发展&#xff0c;企业面临着诸多挑战。在这个高度互联的世界中&#xff0c;企业的成功与否常常取决于其能否快速、有效地响应市场的变化。云计算作为一种新兴的技术趋势&#xff0c;为企业提供了实现这一目标的可能。通过加速企业云计算部署&#xff0c;企业可以…...

ubuntu 18.04 LTS交叉编译opencv 3.4.16并编译工程[全记录]

一、下载并解压opencv 3.4.16源码 https://opencv.org/releases/ 放到home路径下的Exe文件夹&#xff08;专门放用户安装的软件&#xff09;中&#xff0c;其中build是后期自建的 为了版本控制&#xff0c;保留了3.4.16&#xff0c;并增加了-gcc-arm 二、安装cmake和cmake-g…...

禁用和开启笔记本电脑的键盘功能,最快的方式

笔记本键盘通常较小&#xff0c;按键很不方便&#xff0c;当我们外接了键盘时就不需要再使用自带的键盘了&#xff0c;而且午睡的时候&#xff0c;总是担心碰到笔记本的键盘&#xff0c;可能会删掉我们的代码什么的&#xff0c;所以就想着怎么禁用掉&#xff0c;下面是操作步骤…...

【单片机基础】使用51单片机制作函数信号发生器(DAC0832使用仿真)

文章目录 &#xff08;1&#xff09;DA转换&#xff08;2&#xff09;DAC0832简介&#xff08;3&#xff09;电路设计&#xff08;4&#xff09;参考例程&#xff08;5&#xff09;参考文献 &#xff08;1&#xff09;DA转换 单片机作为一个数字电路系统&#xff0c;当需要采集…...

springcloud组件

https://www.bilibili.com/video/BV1QX4y1t7v5?p32&vd_source297c866c71fa77b161812ad631ea2c25 eureka : 主要是收集服务的注册信息。 如果有了eureka启动了。内部之前的调用其实就可以用服务名了&#xff0c; 本来是要是用ip端口来访问的&#xff0c;只要eureka启来了…...

手机爬虫用Appium详细教程:利用Python控制移动App进行自动化抓取数据

Appium是一个强大的跨平台工具&#xff0c;它可以让你使用Python来控制移动App进行自动化操作&#xff0c;从而实现数据的抓取和处理。今天&#xff0c;我将与大家分享一份关于使用Appium进行手机爬虫的详细教程&#xff0c;让我们一起来探索Appium的功能和操作&#xff0c;为手…...

deb包构建详解

deb包构建详解 一、deb包构建流程二、deb包构建描述文件详解2.1 control文件2.2 postinst 文件 (post-installation script)2.3 postrm 文件 (post-removal script)2.4 prerm 文件 (pre-removal script)2.5 preinst 文件 (pre-installation script)2.6 rules 文件2.7 changelog…...

【Spring Cloud】网关Gateway的请求过滤工厂RequestRateLimiterGatewayFilterFactory

概念 关于微服务网关Gateway中有几十种过滤工厂&#xff0c;这一篇博文记录的是关于请求限流过滤工厂&#xff0c;也就是标题中的RequestRateLimiterGatewayFilterFactory。这个路由过滤工厂是用来判断当前请求是否应该被处理&#xff0c;如果不会被处理就会返回HTTP状态码为42…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

Python爬虫(二):爬虫完整流程

爬虫完整流程详解&#xff08;7大核心步骤实战技巧&#xff09; 一、爬虫完整工作流程 以下是爬虫开发的完整流程&#xff0c;我将结合具体技术点和实战经验展开说明&#xff1a; 1. 目标分析与前期准备 网站技术分析&#xff1a; 使用浏览器开发者工具&#xff08;F12&…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...