Hadoop中MapReduce过程中Shuffle过程实现自定义排序
文章目录
- Hadoop中MapReduce过程中Shuffle过程实现自定义排序
- 一、引言
- 二、实现WritableComparable接口
- 1、自定义Key类
- 三、使用Job.setSortComparatorClass方法
- 2、设置自定义排序器
- 3、自定义排序器类
- 四、使用示例
- 五、总结
Hadoop中MapReduce过程中Shuffle过程实现自定义排序

一、引言
MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。
二、实现WritableComparable接口
1、自定义Key类
首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。
package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Employee implements WritableComparable<Employee> {private int empno;private String ename;private String job;private int mgr;private String hiredate;private int sal;private int comm;private int deptno;@Overridepublic String toString(){return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";}@Overridepublic int compareTo(Employee o) {// 多个列的排序:select * from emp order by deptno,sal;// 首先按照deptno排序if(this.deptno > o.getDeptno()){return 1;}else if(this.deptno < o.getDeptno()){return -1;}// 如果deptno相等,按照sal排序if(this.sal >= o.getSal()){return 1;}else{return -1;}}@Overridepublic void write(DataOutput output) throws IOException {// 序列化output.writeInt(this.empno);output.writeUTF(this.ename);output.writeUTF(this.job);output.writeInt(this.mgr);output.writeUTF(this.hiredate);output.writeInt(this.sal);output.writeInt(this.comm);output.writeInt(this.deptno);}@Overridepublic void readFields(DataInput input) throws IOException {// 反序列化this.empno = input.readInt();this.ename = input.readUTF();this.job = input.readUTF();this.mgr = input.readInt();this.hiredate = input.readUTF();this.sal = input.readInt();this.comm = input.readInt();this.deptno = input.readInt();}
}
三、使用Job.setSortComparatorClass方法
2、设置自定义排序器
除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。
package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomSort {public static class Map extends Mapper<Object, Text, Employee, IntWritable> {private static Employee emp = new Employee();private static IntWritable one = new IntWritable(1);@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] line = value.toString().split("\t");emp.setEmpno(Integer.parseInt(line[0]));emp.setEname(line[1]);emp.setJob(line[2]);emp.setMgr(Integer.parseInt(line[3]));emp.setHiredate(line[4]);emp.setSal(Integer.parseInt(line[5]));emp.setComm(Integer.parseInt(line[6]));emp.setDeptno(Integer.parseInt(line[7]));context.write(emp, one);}}public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {@Overrideprotected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(key, val);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "CustomSort");job.setJarByClass(CustomSort.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Employee.class);job.setOutputValueClass(IntWritable.class);// 设置自定义排序器job.setSortComparatorClass(EmployeeComparator.class);Path in = new Path("hdfs://localhost:9000/mr/in/customsort");Path out = new Path("hdfs://localhost:9000/mr/out/customsort");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
3、自定义排序器类
package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class EmployeeComparator extends WritableComparator {protected EmployeeComparator() {super(Employee.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2) {Employee e1 = (Employee) w1;Employee e2 = (Employee) w2;// 首先按照deptno排序int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());if (deptCompare != 0) {return deptCompare;}// 如果deptno相等,按照sal排序return Integer.compare(e1.getSal(), e2.getSal());}
}
四、使用示例
下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator。
五、总结
通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。
版权声明:本博客内容为原创,转载请保留原文链接及作者信息。
参考文章:
- Hadoop之mapreduce数据排序案例(详细代码)
- Java Job.setSortComparatorClass方法代码示例
相关文章:
Hadoop中MapReduce过程中Shuffle过程实现自定义排序
文章目录 Hadoop中MapReduce过程中Shuffle过程实现自定义排序一、引言二、实现WritableComparable接口1、自定义Key类 三、使用Job.setSortComparatorClass方法2、设置自定义排序器3、自定义排序器类 四、使用示例五、总结 Hadoop中MapReduce过程中Shuffle过程实现自定义排序 一…...
数位dp-acwing
题目:Windy数 1083. Windy数 - AcWing题库 分析 不能有前导0,初始化的时候需要有前导0,因为除了最高位数其他位数可以。 windy : 2 5 1 类似这样的数 第二位与第一位相差3 > 2 分类讨论 : 1. 位数跟 n 同位数 的…...
智慧园区小程序开发制作功能介绍
智慧园区小程序开发制作功能介绍 智慧园区小程序系统作为一款面向园区企业的一站式线上服务平台,可为企业提供数智化的园区办公服务。智慧园区小程序功能介绍 1、园区公告、政策信息查看足不出户掌握最新动态,“园区公告、政策信息”等信息。首页点击对应…...
STM32高级 物联网之Wi-Fi通讯
Wi-Fi基础知识 Wi-Fi由来 Wi-Fi,又称“无线网路”,是Wi-Fi联盟的商标,一个基于IEEE 802.11标准的无线局域网技术。“Wi-Fi”常写作“WiFi”或“Wifi”,但是这些写法并没有被Wi-Fi联盟认可。 Wi-Fi这个术语经常被误以为是指无线保真(Wireless Fidelity),类似历史悠久的…...
LLM预训练recipe — 摘要版
文章核心主题: 本文深入探讨了从零开始进行大型语言模型(LLM)预训练(pretrain)的各个环节,侧重方法论和实践细节,旨在普及预训练过程中的关键步骤、常见问题及避坑技巧,而非技术原理…...
波动理论、传输线和S参数网络
波动理论、传输线和S参数网络 传输线 求解传输线方程 对于传输线模型,我们通常用 R L G C RLGC RLGC 来表示: 其中 R R R 可以表示导体损耗,由于电子流经非理想导体而产生的能量损耗。 G G G 表示介质损耗,由于非理想电介质…...
nginx-1.23.2版本RPM包发布
nginx-1.23.2-0.x86_64.rpm用于CentOS7系统的安装,安装路径与编译安装是同一个路径。安装方法: 将nginx-1.23.2-0.x86_64.rpm上传至目标服务器,执行rpm -ivh nginx-1.23.2-0.x86_64.rpm命令进行安装。 卸载方法: 卸载前先将nginx服…...
如何用WPS AI提高工作效率
对于每位职场人而言,与Word、Excel和PPT打交道几乎成为日常工作中不可或缺的一部分。在办公软件的选择上,国外以Office为代表,而在国内,WPS则是不可忽视的一大选择。当年一代天才程序员求伯君创造了WPS,后面雷军把它装…...
LabVIEW应用在工业车间
LabVIEW作为一种图形化编程语言,以其强大的数据采集和硬件集成功能广泛应用于工业自动化领域。在工业车间中,LabVIEW不仅能够实现快速开发,还能通过灵活的硬件接口和直观的用户界面提升生产效率和设备管理水平。尽管其高成本和初期学习门槛可…...
Elasticsearch:normalizer
一、概述 Elastic normalizer是Elasticsearch中用于处理keyword类型字段的一种工具,主要用于对字段进行规范化处理,确保在索引和查询时保持一致性。 Normalizer与analyzer类似,都是对字段进行处理,但normalizer不会对字段进…...
动态规划子序列问题系列一>等差序列划分II
题目: 解析: 1.状态表示: 2.状态转移方程: 这里注意有个优化 3.初始化: 4.填表顺序: 5.返回值: 返回dp表总和 代码: public int numberOfArithmeticSlices(int[] nums) {in…...
48页PPT|2024智慧仓储解决方案解读
本文概述了智慧物流仓储建设方案的行业洞察、业务蓝图及建设方案。首先,从政策层面分析了2012年至2020年间国家发布的促进仓储业、物流业转型升级的政策,这些政策强调了自动化、标准化、信息化水平的提升,以及智能化立体仓库的建设࿰…...
低代码开源项目Joget的研究——Joget8社区版安装部署
大纲 环境准备安装必要软件配置Java配置JAVA_HOME配置Java软链安装三方库 获取源码配置MySql数据库创建用户创建数据库导入初始数据 配置数据库连接配置sessionFactory(非必须,如果后续保存再配置)编译下载tomcat启动下载aspectjweaver移动jw…...
upload-labs关卡记录15
图片马,这里就可以看到任务和注意事项: 使用一个正常图片,然后拼接一个一句话木马即可实现。这里就用命令窗口进行实现: copy 111.png/b shell.php/a shell.png 注意这里的命令窗口要在存在图片和一句话木马的目录下打开&#…...
1.使用 Couchbase 数仓和 Temporal(一个分布式任务调度和编排框架)实现每 5 分钟的增量任务
在使用 Couchbase 数仓和 Temporal(一个分布式任务调度和编排框架)实现每 5 分钟的增量任务时,可以按照以下步骤实现,同时需要注意关键点。 实现方案 1. 数据层设计(Couchbase 增量存储与标记) 在 Couchb…...
matrix-breakout-2-morpheus
将这一关的镜像导入虚拟机,出现以下页面表示导入成功 以root身份打开kali终端,输入以下命令,查看靶机ip arp-scan -l 根据得到的靶机ip,浏览器访问进入环境 我们从当前页面没有得到有用的信息,尝试扫描后台 发现有一个…...
农历节日倒计时:基于Python的公历与农历日期转换及节日查询小程序
农历节日倒计时:基于Python的公历与农历日期转换及节日查询小程序 摘要 又是一年春节即将到来,突然想基于Python编写一个农历节日的倒计时小程序。该程序能够根据用户输入的农历节日名称,计算出距离该节日还有多少天。通过使用lunardate库进…...
【RabbitMQ的死信队列】
死信队列 什么是死信队列死信队列的配置方式死信消息结构 什么是死信队列 消息被消费者确认拒绝。消费者把requeue参数设置为true(false),并且在消费后,向RabbitMQ返回拒绝。channel.basicReject或者channel.basicNack。消息达到预设的TTL时限还一直没有…...
掌握软件工程基础:知识点全面解析【chap02】
chap02 软件项目管理 1.代码行度量与功能点度量的比较 1.规模度量 是一种直接度量方法。 代码行数 LOC或KLOC 生产率 P1L/E 其中 L 软件项目代码行数 E 软件项目工作量(人月 PM) P1 软件项目生产率(LOC/PM) 代码出错…...
公路边坡安全监测中智能化+定制化+全面守护的应用方案
面对公路边坡的安全挑战,我们如何精准施策,有效应对风险?特别是在强降雨等极端天气下,如何防范滑坡、崩塌、路面塌陷等灾害,确保行车安全?国信华源公路边坡安全监测解决方案,以智能化、定制化为…...
Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...
ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...
rknn toolkit2搭建和推理
安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 ,不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源(最常用) conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...
Pydantic + Function Calling的结合
1、Pydantic Pydantic 是一个 Python 库,用于数据验证和设置管理,通过 Python 类型注解强制执行数据类型。它广泛用于 API 开发(如 FastAPI)、配置管理和数据解析,核心功能包括: 数据验证:通过…...
解析两阶段提交与三阶段提交的核心差异及MySQL实现方案
引言 在分布式系统的事务处理中,如何保障跨节点数据操作的一致性始终是核心挑战。经典的两阶段提交协议(2PC)通过准备阶段与提交阶段的协调机制,以同步决策模式确保事务原子性。其改进版本三阶段提交协议(3PC…...
