当前位置: 首页 > news >正文

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

文章目录

  • Hadoop中MapReduce过程中Shuffle过程实现自定义排序
    • 一、引言
    • 二、实现WritableComparable接口
      • 1、自定义Key类
    • 三、使用Job.setSortComparatorClass方法
      • 2、设置自定义排序器
      • 3、自定义排序器类
    • 四、使用示例
    • 五、总结

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

在这里插入图片描述

一、引言

MapReduce框架中的Shuffle过程是连接Map阶段和Reduce阶段的桥梁,负责将Map任务的输出结果按照key进行分组和排序,并将相同key的数据传递给对应的Reduce任务进行处理。Shuffle过程的性能直接影响到整个MapReduce作业的执行效率。在默认情况下,Hadoop使用TotalOrderPartitioner进行排序,但有时我们需要根据特定的业务逻辑进行自定义排序。本文将介绍两种方法来实现自定义排序:实现WritableComparable接口和使用Job.setSortComparatorClass方法。下面是详细的步骤和代码示例。

二、实现WritableComparable接口

1、自定义Key类

首先,我们需要定义一个类并实现WritableComparable接口,该接口要求实现compareTo方法,用于定义排序逻辑。

package mr;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Employee implements WritableComparable<Employee> {private int empno;private String ename;private String job;private int mgr;private String hiredate;private int sal;private int comm;private int deptno;@Overridepublic String toString(){return "Employee[empno="+empno+",ename="+ename+",sal="+sal+",deptno="+deptno+"]";}@Overridepublic int compareTo(Employee o) {// 多个列的排序:select * from emp order by deptno,sal;// 首先按照deptno排序if(this.deptno > o.getDeptno()){return 1;}else if(this.deptno < o.getDeptno()){return -1;}// 如果deptno相等,按照sal排序if(this.sal >= o.getSal()){return 1;}else{return -1;}}@Overridepublic void write(DataOutput output) throws IOException {// 序列化output.writeInt(this.empno);output.writeUTF(this.ename);output.writeUTF(this.job);output.writeInt(this.mgr);output.writeUTF(this.hiredate);output.writeInt(this.sal);output.writeInt(this.comm);output.writeInt(this.deptno);}@Overridepublic void readFields(DataInput input) throws IOException {// 反序列化this.empno = input.readInt();this.ename = input.readUTF();this.job = input.readUTF();this.mgr = input.readInt();this.hiredate = input.readUTF();this.sal = input.readInt();this.comm = input.readInt();this.deptno = input.readInt();}
}

三、使用Job.setSortComparatorClass方法

2、设置自定义排序器

除了实现WritableComparable接口外,我们还可以使用Job.setSortComparatorClass方法来设置自定义排序器。这种方法允许我们在不修改Key类的情况下实现自定义排序。

package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CustomSort {public static class Map extends Mapper<Object, Text, Employee, IntWritable> {private static Employee emp = new Employee();private static IntWritable one = new IntWritable(1);@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] line = value.toString().split("\t");emp.setEmpno(Integer.parseInt(line[0]));emp.setEname(line[1]);emp.setJob(line[2]);emp.setMgr(Integer.parseInt(line[3]));emp.setHiredate(line[4]);emp.setSal(Integer.parseInt(line[5]));emp.setComm(Integer.parseInt(line[6]));emp.setDeptno(Integer.parseInt(line[7]));context.write(emp, one);}}public static class Reduce extends Reducer<Employee, IntWritable, Employee, IntWritable> {@Overrideprotected void reduce(Employee key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable val : values) {context.write(key, val);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "CustomSort");job.setJarByClass(CustomSort.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setOutputKeyClass(Employee.class);job.setOutputValueClass(IntWritable.class);// 设置自定义排序器job.setSortComparatorClass(EmployeeComparator.class);Path in = new Path("hdfs://localhost:9000/mr/in/customsort");Path out = new Path("hdfs://localhost:9000/mr/out/customsort");FileInputFormat.addInputPath(job, in);FileOutputFormat.setOutputPath(job, out);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

3、自定义排序器类

package mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class EmployeeComparator extends WritableComparator {protected EmployeeComparator() {super(Employee.class, true);}@Overridepublic int compare(WritableComparable w1, WritableComparable w2) {Employee e1 = (Employee) w1;Employee e2 = (Employee) w2;// 首先按照deptno排序int deptCompare = Integer.compare(e1.getDeptno(), e2.getDeptno());if (deptCompare != 0) {return deptCompare;}// 如果deptno相等,按照sal排序return Integer.compare(e1.getSal(), e2.getSal());}
}

四、使用示例

下面是一个简单的MapReduce示例,展示了Shuffle过程在实际应用中的使用。这个示例中,我们使用了自定义的Employee类作为Key,并设置了自定义的排序器EmployeeComparator

五、总结

通过实现WritableComparable接口和使用Job.setSortComparatorClass方法,我们可以在Hadoop MapReduce过程中实现自定义排序。这两种方法提供了灵活的排序机制,允许我们根据不同的业务需求对数据进行排序处理,从而提高数据处理的效率和准确性。


版权声明:本博客内容为原创,转载请保留原文链接及作者信息。

参考文章

  • Hadoop之mapreduce数据排序案例(详细代码)
  • Java Job.setSortComparatorClass方法代码示例

相关文章:

Hadoop中MapReduce过程中Shuffle过程实现自定义排序

文章目录 Hadoop中MapReduce过程中Shuffle过程实现自定义排序一、引言二、实现WritableComparable接口1、自定义Key类 三、使用Job.setSortComparatorClass方法2、设置自定义排序器3、自定义排序器类 四、使用示例五、总结 Hadoop中MapReduce过程中Shuffle过程实现自定义排序 一…...

数位dp-acwing

题目&#xff1a;Windy数 1083. Windy数 - AcWing题库 分析 不能有前导0&#xff0c;初始化的时候需要有前导0&#xff0c;因为除了最高位数其他位数可以。 windy &#xff1a; 2 5 1 类似这样的数 第二位与第一位相差3 > 2 分类讨论 &#xff1a; 1. 位数跟 n 同位数 的…...

智慧园区小程序开发制作功能介绍

智慧园区小程序开发制作功能介绍 智慧园区小程序系统作为一款面向园区企业的一站式线上服务平台&#xff0c;可为企业提供数智化的园区办公服务。智慧园区小程序功能介绍 1、园区公告、政策信息查看足不出户掌握最新动态&#xff0c;“园区公告、政策信息”等信息。首页点击对应…...

STM32高级 物联网之Wi-Fi通讯

Wi-Fi基础知识 Wi-Fi由来 Wi-Fi,又称“无线网路”,是Wi-Fi联盟的商标,一个基于IEEE 802.11标准的无线局域网技术。“Wi-Fi”常写作“WiFi”或“Wifi”,但是这些写法并没有被Wi-Fi联盟认可。 Wi-Fi这个术语经常被误以为是指无线保真(Wireless Fidelity),类似历史悠久的…...

LLM预训练recipe — 摘要版

文章核心主题&#xff1a; 本文深入探讨了从零开始进行大型语言模型&#xff08;LLM&#xff09;预训练&#xff08;pretrain&#xff09;的各个环节&#xff0c;侧重方法论和实践细节&#xff0c;旨在普及预训练过程中的关键步骤、常见问题及避坑技巧&#xff0c;而非技术原理…...

波动理论、传输线和S参数网络

波动理论、传输线和S参数网络 传输线 求解传输线方程 对于传输线模型&#xff0c;我们通常用 R L G C RLGC RLGC 来表示&#xff1a; 其中 R R R 可以表示导体损耗&#xff0c;由于电子流经非理想导体而产生的能量损耗。 G G G 表示介质损耗&#xff0c;由于非理想电介质…...

nginx-1.23.2版本RPM包发布

nginx-1.23.2-0.x86_64.rpm用于CentOS7系统的安装&#xff0c;安装路径与编译安装是同一个路径。安装方法&#xff1a; 将nginx-1.23.2-0.x86_64.rpm上传至目标服务器&#xff0c;执行rpm -ivh nginx-1.23.2-0.x86_64.rpm命令进行安装。 卸载方法&#xff1a; 卸载前先将nginx服…...

如何用WPS AI提高工作效率

对于每位职场人而言&#xff0c;与Word、Excel和PPT打交道几乎成为日常工作中不可或缺的一部分。在办公软件的选择上&#xff0c;国外以Office为代表&#xff0c;而在国内&#xff0c;WPS则是不可忽视的一大选择。当年一代天才程序员求伯君创造了WPS&#xff0c;后面雷军把它装…...

LabVIEW应用在工业车间

LabVIEW作为一种图形化编程语言&#xff0c;以其强大的数据采集和硬件集成功能广泛应用于工业自动化领域。在工业车间中&#xff0c;LabVIEW不仅能够实现快速开发&#xff0c;还能通过灵活的硬件接口和直观的用户界面提升生产效率和设备管理水平。尽管其高成本和初期学习门槛可…...

Elasticsearch:normalizer

一、概述 ‌Elastic normalizer‌是Elasticsearch中用于处理keyword类型字段的一种工具&#xff0c;主要用于对字段进行规范化处理&#xff0c;确保在索引和查询时保持一致性。 Normalizer与analyzer类似&#xff0c;都是对字段进行处理&#xff0c;但normalizer不会对字段进…...

动态规划子序列问题系列一>等差序列划分II

题目&#xff1a; 解析&#xff1a; 1.状态表示&#xff1a; 2.状态转移方程&#xff1a; 这里注意有个优化 3.初始化&#xff1a; 4.填表顺序&#xff1a; 5.返回值&#xff1a; 返回dp表总和 代码&#xff1a; public int numberOfArithmeticSlices(int[] nums) {in…...

48页PPT|2024智慧仓储解决方案解读

本文概述了智慧物流仓储建设方案的行业洞察、业务蓝图及建设方案。首先&#xff0c;从政策层面分析了2012年至2020年间国家发布的促进仓储业、物流业转型升级的政策&#xff0c;这些政策强调了自动化、标准化、信息化水平的提升&#xff0c;以及智能化立体仓库的建设&#xff0…...

低代码开源项目Joget的研究——Joget8社区版安装部署

大纲 环境准备安装必要软件配置Java配置JAVA_HOME配置Java软链安装三方库 获取源码配置MySql数据库创建用户创建数据库导入初始数据 配置数据库连接配置sessionFactory&#xff08;非必须&#xff0c;如果后续保存再配置&#xff09;编译下载tomcat启动下载aspectjweaver移动jw…...

upload-labs关卡记录15

图片马&#xff0c;这里就可以看到任务和注意事项&#xff1a; 使用一个正常图片&#xff0c;然后拼接一个一句话木马即可实现。这里就用命令窗口进行实现&#xff1a; copy 111.png/b shell.php/a shell.png 注意这里的命令窗口要在存在图片和一句话木马的目录下打开&#…...

1.使用 Couchbase 数仓和 Temporal(一个分布式任务调度和编排框架)实现每 5 分钟的增量任务

在使用 Couchbase 数仓和 Temporal&#xff08;一个分布式任务调度和编排框架&#xff09;实现每 5 分钟的增量任务时&#xff0c;可以按照以下步骤实现&#xff0c;同时需要注意关键点。 实现方案 1. 数据层设计&#xff08;Couchbase 增量存储与标记&#xff09; 在 Couchb…...

matrix-breakout-2-morpheus

将这一关的镜像导入虚拟机&#xff0c;出现以下页面表示导入成功 以root身份打开kali终端&#xff0c;输入以下命令&#xff0c;查看靶机ip arp-scan -l 根据得到的靶机ip&#xff0c;浏览器访问进入环境 我们从当前页面没有得到有用的信息&#xff0c;尝试扫描后台 发现有一个…...

农历节日倒计时:基于Python的公历与农历日期转换及节日查询小程序

农历节日倒计时&#xff1a;基于Python的公历与农历日期转换及节日查询小程序 摘要 又是一年春节即将到来&#xff0c;突然想基于Python编写一个农历节日的倒计时小程序。该程序能够根据用户输入的农历节日名称&#xff0c;计算出距离该节日还有多少天。通过使用lunardate库进…...

【RabbitMQ的死信队列】

死信队列 什么是死信队列死信队列的配置方式死信消息结构 什么是死信队列 消息被消费者确认拒绝。消费者把requeue参数设置为true(false)&#xff0c;并且在消费后&#xff0c;向RabbitMQ返回拒绝。channel.basicReject或者channel.basicNack。消息达到预设的TTL时限还一直没有…...

掌握软件工程基础:知识点全面解析【chap02】

chap02 软件项目管理 1.代码行度量与功能点度量的比较 1.规模度量 是一种直接度量方法。 代码行数 LOC或KLOC 生产率 P1L/E 其中 L 软件项目代码行数 E 软件项目工作量&#xff08;人月 PM&#xff09; P1 软件项目生产率&#xff08;LOC/PM&#xff09; 代码出错…...

公路边坡安全监测中智能化+定制化+全面守护的应用方案

面对公路边坡的安全挑战&#xff0c;我们如何精准施策&#xff0c;有效应对风险&#xff1f;特别是在强降雨等极端天气下&#xff0c;如何防范滑坡、崩塌、路面塌陷等灾害&#xff0c;确保行车安全&#xff1f;国信华源公路边坡安全监测解决方案&#xff0c;以智能化、定制化为…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题

分区配置 (ptab.json) img 属性介绍&#xff1a; img 属性指定分区存放的 image 名称&#xff0c;指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件&#xff0c;则以 proj_name:binary_name 格式指定文件名&#xff0c; proj_name 为工程 名&…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

离线语音识别方案分析

随着人工智能技术的不断发展&#xff0c;语音识别技术也得到了广泛的应用&#xff0c;从智能家居到车载系统&#xff0c;语音识别正在改变我们与设备的交互方式。尤其是离线语音识别&#xff0c;由于其在没有网络连接的情况下仍然能提供稳定、准确的语音处理能力&#xff0c;广…...

若依登录用户名和密码加密

/*** 获取公钥&#xff1a;前端用来密码加密* return*/GetMapping("/getPublicKey")public RSAUtil.RSAKeyPair getPublicKey() {return RSAUtil.rsaKeyPair();}新建RSAUti.Java package com.ruoyi.common.utils;import org.apache.commons.codec.binary.Base64; im…...

JDK 17 序列化是怎么回事

如何序列化&#xff1f;其实很简单&#xff0c;就是根据每个类型&#xff0c;用工厂类调用。逐个完成。 没什么漂亮的代码&#xff0c;只有有效、稳定的代码。 代码中调用toJson toJson 代码 mapper.writeValueAsString ObjectMapper DefaultSerializerProvider 一堆实…...