MapReduce编程模型
MapReduce编程模型
- 理解MapReduce编程模型
- 独立完成一个MapReduce程序并运行成功
- 了解MapReduce工程流程
- 掌握并描述出shuffle全过程(面试)
- 独立编写课堂及作业中的MR程序
- 理解并解决数据倾斜
1. MapReduce编程模型
-
Hadoop架构图
Hadoop由HDFS分布式存储、MapReduce分布式计算、Yarn资源调度三部分组成

-
MapReduce是采用一种分而治之的思想设计出来的分布式计算框架
-
MapReduce由两个阶段组成:
- Map阶段(切分成一个个小的任务)
- Reduce阶段(汇总小任务的结果)
-
那什么是分而治之呢?
- 比如一复杂、计算量大、耗时长的的任务,暂且称为“大任务”;
- 此时使用单台服务器无法计算或较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行
- 最终再汇总每个小任务的结果

1.1 Map阶段
- map阶段有一个关键的map()函数;
- 此函数的输入是键值对
- 输出是一系列键值对,输出写入本地磁盘。
1.2 Reduce阶段
-
reduce阶段有一个关键的函数reduce()函数
-
此函数的输入也是键值对(即map的输出(kv对))
-
输出也是一系列键值对,结果最终写入HDFS
1.3 Map&Reduce

2. MapReduce编程示例
- 以MapReduce的词频统计为例:统计一批英文文章当中,每个单词出现的总次数
2.1 MapReduce原理图

- Map阶段
- 假设MR的输入文件“Gone With The Wind”有三个block;block1、block2、block3
- MR编程时,每个block对应一个分片split
- 每一个split对应一个map任务(map task)
- 如图共3个map任务(map1、map2、map3);这3个任务的逻辑一样,所以以第一个map任务(map1)为例分析
- map1读取block1的数据;一次读取block1的一行数据;
- 产生键值对(key/value),作为map()的参数传入,调用map();
- 假设当前所读行是第一行
- 将当前所读行的行首相对于当前block开始处的字节偏移量作为key(0)
- 当前行的内容作为value(Dear Bear River)
- map()内
- (按需求,写业务代码),将value当前行内容按空格切分,得到三个单词Dear | Bear | River
- 将每个单词变成键值对,输出出去(Dear, 1) | (Bear, 1) | (River, 1);最终结果写入map任务所在节点的本地磁盘中(内里还有细节,讲到shuffle时,再细细展开)
- block的第一行的数据被处理完后,接着处理第二行;逻辑同上
- 当map任务将当前block中所有的数据全部处理完后,此map任务即运行结束
- 其它的每一个map任务都是如上逻辑,不再赘述
- Reduce阶段
- reduce任务(reduce task)的个数由自己写的程序编程指定,main()内的job.setNumReduceTasks(4)指定reduce任务是4个(reduce1、reduce2、reduce3、reduce4)
- 每一个reduce任务的逻辑一样,所以以第一个reduce任务(reduce1)为例分析
- map1任务完成后,reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据,通过网络获取到reduce1端(拷贝阶段)
- 同样也如此连接到map2、map3获取结果
- 最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一组;
- 4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入reduce()
- 在reduce()内部,计算Dear的总数为4,并将(Dear, 4)作为键值对输出
- 每个reduce任务最终输出文件(内里还有细节,讲到shuffle时,再细细展开),文件写入到HDFS
2.2 MR中key的作用
-
MapReduce编程中,key有特殊的作用
-
①数据中,若要针对某个值进行分组、聚合时,需将此值作为MR中的reduce的输入的key
-
如当前的词频统计例子,按单词进行分组,每组中对出现次数做聚合(计算总和);所以需要将每个单词作为reduce输入的key,MapReduce框架自动按照单词分组,进而求出每组即每个单词的总次数

-
②另外,key还具有可排序的特性,因为MR中的key类需要实现WritableComparable接口;而此接口又继承Comparable接口(可查看源码)
-
MR编程时,要充分利用以上两点;结合实际业务需求,设置合适的key


-
2.3 创建MAVEN工程
所有编程操作,在hadoop集群某节点的IDEA中完成
- 使用IDEA创建maven工程
- pom文件参考提供的pom.xml,主要用到的dependencies有
<properties><cdh.version>2.6.0-cdh5.14.2</cdh.version></properties><repositories><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository></repositories><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0-mr1-cdh5.14.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${cdh.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${cdh.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${cdh.version}</version></dependency><!-- https://mvnrepository.com/artifact/junit/junit --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version><scope>compile</scope></dependency></dependencies>
2.4 MR参考代码
-
创建包com.kaikeba.hadoop.wordcount
-
在包中创建自定义mapper类、自定义reducer类、包含main类
2.4.1 Mapper代码
package com.kaikeba.hadoop.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 类Mapper<LongWritable, Text, Text, IntWritable>的四个泛型分别表示* map方法的输入的键的类型kin、值的类型vin;输出的键的类型kout、输出的值的类型vout* kin指的是当前所读行行首相对于split分片开头的字节偏移量,所以是long类型,对应序列化类型LongWritable* vin指的是当前所读行,类型是String,对应序列化类型Text* kout根据需求,输出键指的是单词,类型是String,对应序列化类型是Text* vout根据需求,输出值指的是单词的个数,1,类型是int,对应序列化类型是IntWritable**/
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {/*** 处理分片split中的每一行的数据;针对每行数据,会调用一次map方法* 在一次map方法调用时,从一行数据中,获得一个个单词word,再将每个单词word变成键值对形式(word, 1)输出出去* 输出的值最终写到本地磁盘中* @param key 当前所读行行首相对于split分片开头的字节偏移量* @param value 当前所读行* @param context* @throws IOException* @throws InterruptedException*/public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//当前行的示例数据(单词间空格分割):Dear Bear River//取得当前行的数据String line = value.toString();//按照\t进行分割,得到当前行所有单词String[] words = line.split("\t");for (String word : words) {//将每个单词word变成键值对形式(word, 1)输出出去//同样,输出前,要将kout, vout包装成对应的可序列化类型,如String对应Text,int对应IntWritablecontext.write(new Text(word), new IntWritable(1));}}
}
2.4.2 Reducer代码
package com.kaikeba.hadoop.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/**** Reducer<Text, IntWritable, Text, IntWritable>的四个泛型分别表示* reduce方法的输入的键的类型kin、输入值的类型vin;输出的键的类型kout、输出的值的类型vout* 注意:因为map的输出作为reduce的输入,所以此处的kin、vin类型分别与map的输出的键类型、值类型相同* kout根据需求,输出键指的是单词,类型是String,对应序列化类型是Text* vout根据需求,输出值指的是每个单词的总个数,类型是int,对应序列化类型是IntWritable**/
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {/**** key相同的一组kv对,会调用一次reduce方法* 如reduce task汇聚了众多的键值对,有key是hello的键值对,也有key是spark的键值对,如下* (hello, 1)* (hello, 1)* (hello, 1)* (hello, 1)* ...* (spark, 1)* (spark, 1)* (spark, 1)** 其中,key是hello的键值对被分成一组;merge成[hello, Iterable(1,1,1,1)],调用一次reduce方法* 同样,key是spark的键值对被分成一组;merge成[spark, Iterable(1,1,1)],再调用一次reduce方法** @param key 当前组的key* @param values 当前组中,所有value组成的可迭代集和* @param context reduce上下文环境对象* @throws IOException* @throws InterruptedException*/public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {//定义变量,用于累计当前单词出现的次数int sum = 0;for (IntWritable count : values) {//从count中获得值,累加到sum中sum += count.get();}//将单词、单词次数,分别作为键值对,输出context.write(key, new IntWritable(sum));// 输出最终结果};
}
2.4.3 Main程序入口
package com.kaikeba.hadoop.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;/**** MapReduce程序入口* 注意:* 导包时,不要导错了;* 另外,map\reduce相关的类,使用mapreduce包下的,是新API,如org.apache.hadoop.mapreduce.Job;;*/
public class WordCountMain {//若在IDEA中本地执行MR程序,需要将mapred-site.xml中的mapreduce.framework.name值修改成local//参数 c:/test/README.txt c:/test/wcpublic static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {//判断一下,输入参数是否是两个,分别表示输入路径、输出路径if (args.length != 2 || args == null) {System.out.println("please input Path!");System.exit(0);}Configuration configuration = new Configuration();//configuration.set("mapreduce.framework.name","local");//告诉程序,要运行的jar包在哪//configuration.set("mapreduce.job.jar","/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");//调用getInstance方法,生成job实例Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());//设置job的jar包,如果参数指定的类包含在一个jar包中,则此jar包作为job的jar包; 参数class跟主类在一个工程即可;一般设置成主类
// job.setJarByClass(WordCountMain.class);job.setJarByClass(WordCountMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat,输出格式是TextOutputFormat;所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(WordCountMap.class);//设置map combine类,减少网路传出量job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(WordCountReduce.class);//注意:如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型//注意:此处设置的map输出的key/value类型,一定要与自定义map类输出的kv对类型一致;否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);//设置reduce task最终输出key/value的类型//注意:此处设置的reduce输出的key/value类型,一定要与自定义reduce类输出的kv对类型一致;否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 提交作业job.waitForCompletion(true);}
}
程序运行有两种方式,分别是windows本地运行、集群运行,依次演示
2.5 本地运行
在windows的IDEA中运行
2.5.1 初次运行WordCountMain,先设置main方法参数,根据图示操作即可


-
弹出窗口中,设置包含main方法的类
-
设置main方法在参数
注意:两个参数间有一个英文空格,表示两个参数

c:/test/README.txt c:/test/wc

2.5.3 本地运行程序
- 在WordCountMain代码上,点击鼠标右键,运行程序

2.5.4 查看结果
①成功标识文件
②结果文件

2.6 集群运行
- 用maven插件打jar包;①点击Maven,②双击package打包

-
控制台打印结果;①表示打包成功;②是生成的jar所在路径

-
将jar包上传到node01用户主目录/home/hadoop下
-
用hadoop jar命令运行mr程序
[hadoop@node01 ~]$ cd
[hadoop@node01 ~]$ hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar com.kaikeba.hadoop.wordcount.WordCountMain /README.txt /wordcount01
说明:
com.kaikeba.hadoop-1.0-SNAPSHOT.jar是jar包名
com.kaikeba.hadoop.wordcount.WordCountMain是包含main方法的类的全限定名
/NOTICE.txt和/wordcount是main方法的两个参数,表示输入路径、输出路径
- 确认结果
[hadoop@node01 ~]$ hadoop fs -ls /wordcount01

2.7 总结
- MR分为两个阶段:map阶段、reduce阶段
- MR输入的文件有几个block,就会生成几个map任务
- MR的reduce任务的个数,由程序中编程指定:job.setNumReduceTasks(4)
- map任务
- map任务中map()一次读取block的一行数据,以kv对的形式输入map()
- map()的输出作为reduce()的输入
- reduce任务
- reduce任务通过网络将各执行完成的map任务输出结果中,属于自己的数据取过来
- key相同的键值对作为一组,调用一次reduce()
- reduce任务生成一个结果文件
- 文件写入HDFS
3. WEB UI查看结果
3.1 Yarn
node01是resourcemanager所在节点主机名,根据自己的实际情况修改主机名
浏览器访问url地址:http://node01:8088

3.2 HDFS结果
浏览器输入URL:http://node01:50070
①点击下拉框;②浏览文件系统;③输入根目录,查看hdfs根路径中的内容

4. MapReduce编程(了解一下就ok):(海量)数据清洗
mapreduce在企业中,可以用于对海量数据的数据清洗;当然,随着新一代大数据框架的出现,也可以使用spark、flink等框架,做数据清洗
4.1 需求
- 现有一批日志文件,日志来源于用户使用搜狗搜索引擎搜索新闻,并点击查看搜索结果过程;
- 但是,日志中有一些记录损坏,现需要使用MapReduce来将这些损坏记录(如记录中少字段、多字段)从日志文件中删除,此过程就是传说中的数据清洗。
- 并且在清洗时,要统计损坏的记录数。
4.2 数据结构
-
日志格式:每行记录有6个字段;分别表示时间datetime、用户ID userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl

关于retorder、cliorder说明:

4.3 逻辑分析
-
MapReduce程序一般分为map阶段,将任务分而治之;
-
reduce阶段,将map阶段的结果进行聚合;
-
而有些mapreduce应用不需要数据聚合的操作,也就是说不需要reduce阶段。即编程时,不需要编写自定义的reducer类;在main()中调用job.setNumReduceTasks(0)设置
-
而本例的数据清洗就是属于此种情况
-
统计损坏的记录数,可使用自定义计数器的方式进行

-
map方法的逻辑:取得每一行数据,与每条记录的固定格式比对,是否符合;
- 若符合,则是完好的记录;
- 否则是损坏的记录。并对自定义计数器累加
4.4 MR代码
若要集群运行,需先将sogou.50w.utf8上传到HDFS根目录
[hadoop@node01 soft]$ pwd /kkb/soft [hadoop@node01 soft]$ hadoop fs -put sogou.50w.utf8 /运行等操作,与上边类似;不再赘述
4.4.1 Mapper类
具体逻辑,可详见代码注释
注意:实际工作中,写良好的代码注释也是基本的职业素养
package com.kaikeba.hadoop.dataclean;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**** 现对sogou日志数据,做数据清洗;将不符合格式要求的数据删除* 每行记录有6个字段;* 分别表示时间datetime、用户ID userid、新闻搜索关键字searchkwd、当前记录在返回列表中的序号retorder、用户点击链接的顺序cliorder、点击的URL连接cliurl* 日志样本:* 20111230111308 0bf5778fc7ba35e657ee88b25984c6e9 nba直播 4 1 http://www.hoopchina.com/tv**/
public class DataClean {/**** 基本上大部分MR程序的main方法逻辑,大同小异;将其他MR程序的main方法代码拷贝过来,稍做修改即可* 实际开发中,也会有很多的复制、粘贴、修改** 注意:若要IDEA中,本地运行MR程序,需要将resources/mapred-site.xml中的mapreduce.framework.name属性值,设置成local* @param args*/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//判断一下,输入参数是否是两个,分别表示输入路径、输出路径if (args.length != 2 || args == null) {System.out.println("please input Path!");System.exit(0);}Configuration configuration = new Configuration();//调用getInstance方法,生成job实例Job job = Job.getInstance(configuration, DataClean.class.getSimpleName());//设置jar包,参数是包含main方法的类job.setJarByClass(DataClean.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(DataCleanMapper.class);//注意:此处设置的map输出的key/value类型,一定要与自定义map类输出的kv对类型一致;否则程序运行报错job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//注意:因为不需要reduce聚合阶段,所以,需要显示设置reduce task个数是0job.setNumReduceTasks(0);// 提交作业job.waitForCompletion(true);}/*** * 自定义mapper类* 注意:若自定义的mapper类,与main方法在同一个类中,需要将自定义mapper类,声明成static的*/public static class DataCleanMapper extends Mapper<LongWritable, Text, Text, NullWritable> {//为了提高程序的效率,避免创建大量短周期的对象,出发频繁GC;此处生成一个对象,共用NullWritable nullValue = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//自定义计数器,用于记录残缺记录数Counter counter = context.getCounter("DataCleaning", "damagedRecord");//获得当前行数据//样例数据:20111230111645 169796ae819ae8b32668662bb99b6c2d 塘承高速公路规划线路图 1 1 http://auto.ifeng.com/roll/20111212/729164.shtmlString line = value.toString();//将行数据按照记录中,字段分隔符切分String[] fields = line.split("\t");//判断字段数组长度,是否为6if(fields.length != 6) {//若不是,则不输出,并递增自定义计数器counter.increment(1L);} else {//若是6,则原样输出context.write(value, nullValue);}}}
}
4.4.2 运行结果
仅以本地运行演示
-
运行
-
①reduce 0%,job就已经successfully,表示此MR程序没有reduce阶段
-
②DataCleaning是自定义计数器组名;damagedRecord是自定义的计数器;值为6,表示有6条损坏记录

-
图中part-m-00000中的m表示,此文件是由map任务生成

4.5 总结
-
MR可用于数据清洗;另外,也可以使用Spark、Flink等组件做数据清洗
-
可使用自定义计数器记录符合特定条件的记录数,用于统计
5. MapReduce编程:用户搜索次数(了解一下就ok)
5.1 需求
- 使用MR编程,统计sogou日志数据中,每个用户搜索的次数;结果写入HDFS
5.2 数据结构
-
数据来源自“MapReduce编程:数据清洗”中的输出结果
-
仍然是sogou日志数据;不再赘述

5.3 逻辑分析
- 还记得之前提到的MR中key的作用吗?
- MR编程时,若要针对某个值对数据进行分组、聚合时,如当前的词频统计例子,需要将每个单词作为reduce输入的key,从而按照单词分组,进而求出每组即每个单词的总次数
- 那么此例也是类似的。
- 统计每个用户的搜索次数,将userid放到reduce输入的key的位置;
- 对userid进行分组
- 进而统计每个用户的搜索次数
5.4 MR代码
给MR程序在IDEA中设置参数,运行等操作,与上边类似;不再赘述
此处MR程序的输入文件是“MapReduce编程:数据清洗”中的输出结果文件
package com.kaikeba.hadoop.searchcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**** 本MR示例,用于统计每个用户搜索并查看URL链接的次数*/
public class UserSearchCount {/**** @param args C:\test\datacleanresult c:\test\usersearch* @throws IOException* @throws ClassNotFoundException* @throws InterruptedException*/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//判断一下,输入参数是否是两个,分别表示输入路径、输出路径if (args.length != 2 || args == null) {System.out.println("please input Path!");System.exit(0);}Configuration configuration = new Configuration();//configuration.set("mapreduce.job.jar","/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");//调用getInstance方法,生成job实例Job job = Job.getInstance(configuration, UserSearchCount.class.getSimpleName());//设置jar包,参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat,输出格式是TextOutputFormat;所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map combine类,减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型//注意:此处设置的map输出的key/value类型,一定要与自定义map类输出的kv对类型一致;否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);//设置reduce task最终输出key/value的类型//注意:此处设置的reduce输出的key/value类型,一定要与自定义reduce类输出的kv对类型一致;否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//提交作业job.waitForCompletion(true);}public static class SearchCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {//定义共用的对象,减少GC压力Text userIdKOut = new Text();IntWritable vOut = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获得当前行的数据//样例数据:20111230111645 169796ae819ae8b32668662bb99b6c2d 塘承高速公路规划线路图 1 1 http://auto.ifeng.com/roll/20111212/729164.shtmlString line = value.toString();//切分,获得各字段组成的数组String[] fields = line.split("\t");//因为要统计每个user搜索并查看URL的次数,所以将userid放到输出key的位置//注意:MR编程中,根据业务需求设计key是很重要的能力String userid = fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut, vOut);}}public static class SearchCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {//定义共用的对象,减少GC压力IntWritable totalNumVOut = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable value: values) {sum += value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key, totalNumVOut);}}
}
5.4.1 结果
- 运行参数
C:\test\datacleanresult c:\test\usersearch
- 运行结果

5.5 总结
- 结合本例子的需求,设计MR程序;因为要统计每个用户的搜索次数,所以最终userid作为reduce的输出的key
- MR编程能够根据业务需求设计合适的key是一个很重要的能力;而这是需要建立在自己地MR框架原理有清晰认识的基础之上的
6. Shuffle(重点 )!!!洗牌
- shuffle主要指的是map端的输出作为reduce端输入的过程
6.1 shuffle简图

6.2 shuffle细节图

-
分区用到了分区器,默认分区器是HashPartitioner
源码:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {public void configure(JobConf job) {}/** Use {@link Object#hashCode()} to partition. */public int getPartition(K2 key, V2 value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}
6.3 map端
- 每个map任务都有一个对应的环形内存缓冲区;输出是kv对,先写入到环形缓冲区(默认大小100M),当内容占据80%缓冲区空间后,由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件
- 在溢出写的过程中,map任务可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,最终造成100m占满后,map任务会暂停向环形缓冲区中写数据的过程;只执行溢出写的过程;直到环形缓冲区的数据全部溢出写到磁盘,才恢复向缓冲区写入
- 后台线程溢写磁盘过程,有以下几个步骤:
- 先对每个溢写的kv对做分区;分区的个数由MR程序的reduce任务数决定;默认使用HashPartitioner计算当前kv对属于哪个分区;计算公式:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
- 每个分区中,根据kv对的key做内存中排序;
- 若设置了map端本地聚合combiner,则对每个分区中,排好序的数据做combine操作;
- 若设置了对map输出压缩的功能,会对溢写数据压缩
- 随着不断的向环形缓冲区中写入数据,会多次触发溢写(每当环形缓冲区写满100m),本地磁盘最终会生成多个溢出文件
- 合并溢写文件:在map task完成之前,所有溢出文件会被合并成一个大的溢出文件;且是已分区、已排序的输出文件
- 小细节:
- 在合并溢写文件时,如果至少有3个溢写文件,并且设置了map端combine的话,会在合并的过程中触发combine操作;
- 但是若只有2个或1个溢写文件,则不触发combine操作(因为combine操作,本质上是一个reduce,需要启动JVM虚拟机,有一定的开销)
6.4 reduce端
-
reduce task会在每个map task运行完成后,通过HTTP获得map task输出中,属于自己的分区数据(许多kv对)
-
如果map输出数据比较小,先保存在reduce的jvm内存中,否则直接写入reduce磁盘
-
一旦内存缓冲区达到阈值(默认0.66)或map输出数的阈值(默认1000),则触发归并merge,结果写到本地磁盘
-
若MR编程指定了combine,在归并过程中会执行combine操作
-
随着溢出写的文件的增多,后台线程会将它们合并大的、排好序的文件
-
reduce task将所有map task复制完后,将合并磁盘上所有的溢出文件
-
默认一次合并10个
-
最后一批合并,部分数据来自内存,部分来自磁盘上的文件
-
进入“归并、排序、分组阶段”
-
每组数据调用一次reduce方法
6.5 总结
- map端
- map()输出结果先写入环形缓冲区
- 缓冲区100M;写满80M后,开始溢出写磁盘文件
- 此过程中,会进行分区、排序、combine(可选)、压缩(可选)
- map任务完成前,会将多个小的溢出文件,合并成一个大的溢出文件(已分区、排序)
- reduce端
- 拷贝阶段:reduce任务通过http将map任务属于自己的分区数据拉取过来
- 开始merge及溢出写磁盘文件
- 所有map任务的分区全部拷贝过来后,进行阶段合并、排序、分组阶段
- 每组数据调用一次reduce()
- 结果写入HDFS
7. 自定义分区(重点)!!!
7.1 分区原理
-
根据之前讲的shuffle,我们知道在map任务中,从环形缓冲区溢出写磁盘时,会先对kv对数据进行分区操作
-
分区操作是由MR中的分区器负责的
-
MapReduce有自带的默认分区器
- HashPartitioner
- 关键方法getPartition返回当前键值对的分区索引(partition index)
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {public void configure(JobConf job) {}/** Use {@link Object#hashCode()} to partition. */public int getPartition(K2 key, V2 value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;} } -
环形缓冲区溢出写磁盘前,将每个kv对,作为getPartition()的参数传入;
-
先对键值对中的key求hash值(int类型),与MAX_VALUE按位与;再模上reduce task个数,假设reduce task个数设置为4(可在程序中使用job.setNumReduceTasks(4)指定reduce task个数为4)
- 那么map任务溢出文件有4个分区,分区index分别是0、1、2、3
- getPartition()结果有四种:0、1、2、3
- 根据计算结果,决定当前kv对,落入哪个分区,如结果是0,则当前kv对落入溢出文件的0分区中
- 最终被相应的reduce task通过http获得


- 若是MR默认分区器,不满足需求;可根据业务逻辑,设计自定义分区器,比如实现图上的功能
7.2 默认分区
程序执行略
代码详见工程com.kaikeba.hadoop.partitioner包
-
MR读取三个文件part1.txt、part2.txt、part3.txt;三个文件放到HDFS目录:/customParttitioner中

-
part1.txt内容如下:
Dear Bear River Dear Car -
part2.txt内容如下:
Car Car River Dear Bear -
part3.txt内容如下:
Dear Car Bear Car Car -
默认HashPartitioner分区时,查看结果(看代码)

- 运行参数:
/customParttitioner /cp01
- 打jar包运行,结果如下:

只有part-r-00001、part-r-00003有数据;另外两个没有数据
HashPartitioner将Bear分到index=1的分区;将Car|Dear|River分到index=3分区
7.3 自定义分区
7.3.1 需求
- 自定义分区,使得文件中,分别以Dear、Bear、River、Car为键的键值对,分别落到index是0、1、2、3的分区中
7.3.2 逻辑分析
- 若要实现以上的分区策略,需要自定义分区类
- 此类实现Partitioner接口
- 在getPartition()中实现分区逻辑
- main方法中
- 设定reduce个数为4
- 设置自定义的分区类,调用job.setPartitionerClass方法
7.3.3 MR代码
完整代码见代码工程
- 自定义分区类如下
package com.kaikeba.hadoop.partitioner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;import java.util.HashMap;public class CustomPartitioner extends Partitioner<Text, IntWritable> {public static HashMap<String, Integer> dict = new HashMap<String, Integer>();//定义每个键对应的分区index,使用map数据结构完成static{dict.put("Dear", 0);dict.put("Bear", 1);dict.put("River", 2);dict.put("Car", 3);}public int getPartition(Text text, IntWritable intWritable, int i) {//int partitionIndex = dict.get(text.toString());return partitionIndex;}
}

- 运行结果

结果满足需求
7.4 总结
- 如果默认分区器不满足业务需求,可以自定义分区器
- 自定义分区器的类继承Partitioner类
- 覆写getPartition(),在方法中,定义自己的分区策略
- 在main()方法中调用job.setPartitionerClass()
- main()中设置reduce任务数
8. 自定义Combiner(重点)!!!本质是reduce,Map端聚合
8.1 需求
-
普通的MR是reduce通过http,取得map任务的分区结果;具体的聚合出结果是在reduce端进行的;
-
以单词计数为例:
- 下图中的第一个map任务(map1),本地磁盘中的结果有5个键值对:(Dear, 1)、(Bear, 1)、(River, 1)、(Dear, 1)、(Car, 1)
- 其中,map1中的两个相同的键值对(Dear, 1)、(Dear, 1),会被第一个reduce任务(reduce1)通过网络拉取到reduce1端
- 那么假设map1中(Dear, 1)有1亿个呢?按原思路,map1端需要存储1亿个(Dear, 1),再将1亿个(Dear, 1)通过网络被reduce1获得,然后再在reduce1端汇总
- 这样做map端本地磁盘IO、数据从map端到reduce端传输的网络IO比较大
- 那么想,能不能在reduce1从map1拉取1亿个(Dear, 1)之前,在map端就提前先做下reduce汇总,得到结果(Dear, 100000000),然后再将这个结果(一个键值对)传输到reduce1呢?
- 答案是可以的
- 我们称之为combine操作
-
map端combine本地聚合(本质是reduce)

8.2 逻辑分析
-
注意:
-
不论运行多少次Combine操作,都不能影响最终的结果
-
并非所有的mr都适合combine操作,比如求平均值
参考:《并非所有MR都适合combine.txt》
-
-
原理图
看原图

-
当每个map任务的环形缓冲区添满80%,开始溢写磁盘文件
-
此过程会分区、每个分区内按键排序、再combine操作(若设置了combine的话)、若设置map输出压缩的话则再压缩
- 在合并溢写文件时,如果至少有3个溢写文件,并且设置了map端combine的话,会在合并的过程中触发combine操作;
- 但是若只有2个或1个溢写文件,则不触发combine操作(因为combine操作,本质上是一个reduce,需要启动JVM虚拟机,有一定的开销)
-
combine本质上也是reduce;因为自定义的combine类继承自Reducer父类
-
map: (K1, V1) -> list(K2, V2)
-
combiner: (K2, list(V2)) -> (K2, V2)
-
reduce: (K2, list(V2)) -> (K3, V3)
- reduce函数与combine函数通常是一样的
- K3与K2类型相同;
- V3与V2类型相同
- 即reduce的输入的kv类型分别与输出的kv类型相同
8.3 MR代码
对原词频统计代码做修改;
详细代码见代码工程
- WordCountMap、WordCountReduce代码保持不变
- 唯一需要做的修改是在WordCountMain中,增加job.setCombinerClass(WordCountReduce.class);
- 修改如下:

8.4 小结
- 使用combine时,首先考虑当前MR是否适合combine
- 总原则是不论使不使用combine不能影响最终的结果
- 在MR时,发生数据倾斜,且可以使用combine时,可以使用combine缓解数据倾斜
9. MR压缩
9.1 需求
- 作用:在MR中,为了减少磁盘IO及网络IO,可考虑在map端、reduce端设置压缩功能
- 给“MapReduce编程:用户搜索次数”代码,增加压缩功能
9.2 逻辑分析
- 那么如何设置压缩功能呢?只需在main方法中,给Configuration对象增加如下设置即可
//开启map输出进行压缩的功能
configuration.set("mapreduce.map.output.compress", "true");
//设置map输出的压缩算法是:BZip2Codec,它是hadoop默认支持的压缩算法,且支持切分
configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");
//开启job输出压缩功能
configuration.set("mapreduce.output.fileoutputformat.compress", "true");
//指定job输出使用的压缩算法
configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");
9.3 MR代码
-
给“MapReduce编程:用户搜索次数”代码,增加压缩功能,代码如下
如何打jar包,已演示过,此处不再赘述
package com.kaikeba.hadoop.mrcompress;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** 本MR示例,用于统计每个用户搜索并查看URL链接的次数*/
public class UserSearchCount {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//判断以下,输入参数是否是两个,分别表示输入路径、输出路径if (args.length != 2 || args == null) {System.out.println("please input Path!");System.exit(0);}Configuration configuration = new Configuration();//configuration.set("mapreduce.job.jar","/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");//开启map输出进行压缩的功能configuration.set("mapreduce.map.output.compress", "true");//设置map输出的压缩算法是:BZip2Codec,它是hadoop默认支持的压缩算法,且支持切分configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");//开启job输出压缩功能configuration.set("mapreduce.output.fileoutputformat.compress", "true");//指定job输出使用的压缩算法configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");//调用getInstance方法,生成job实例Job job = Job.getInstance(configuration, UserSearchCount.class.getSimpleName());//设置jar包,参数是包含main方法的类job.setJarByClass(UserSearchCount.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat,所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// FileOutputFormat.setCompressOutput(job, true);
// FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);//设置处理Map阶段的自定义的类job.setMapperClass(SearchCountMapper.class);//设置map combine类,减少网路传出量//job.setCombinerClass(WordCountReduce.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(SearchCountReducer.class);//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型//注意:此处设置的map输出的key/value类型,一定要与自定义map类输出的kv对类型一致;否则程序运行报错
// job.setMapOutputKeyClass(Text.class);
// job.setMapOutputValueClass(IntWritable.class);//设置reduce task最终输出key/value的类型//注意:此处设置的reduce输出的key/value类型,一定要与自定义reduce类输出的kv对类型一致;否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 提交作业job.waitForCompletion(true);}public static class SearchCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {//定义共用的对象,减少GC压力Text userIdKOut = new Text();IntWritable vOut = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获得当前行的数据//样例数据:20111230111645 169796ae819ae8b32668662bb99b6c2d 塘承高速公路规划线路图 1 1 http://auto.ifeng.com/roll/20111212/729164.shtmlString line = value.toString();//切分,获得各字段组成的数组String[] fields = line.split("\t");//因为要统计每个user搜索并查看URL的次数,所以将userid放到输出key的位置//注意:MR编程中,根据业务需求设计key是很重要的能力String userid = fields[1];//设置输出的key的值userIdKOut.set(userid);//输出结果context.write(userIdKOut, vOut);}}public static class SearchCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {//定义共用的对象,减少GC压力IntWritable totalNumVOut = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for(IntWritable value: values) {sum += value.get();}//设置当前user搜索并查看总次数totalNumVOut.set(sum);context.write(key, totalNumVOut);}}
}
- 生成jar包,并运行jar包
[hadoop@node01 target]$ hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar com.kaikeba.hadoop.mrcompress.UserSearchCount /sogou.2w.utf8 /compressed
-
查看结果
可增加数据量,查看使用压缩算法前后的系统各计数器的数据量变化
[hadoop@node01 target]$ hadoop fs -ls -h /compressed

9.4 总结
- MR过程中使用压缩可减少数据量,进而减少磁盘IO、网络IO数据量
- 可设置map端输出的压缩
- 可设置job最终结果的压缩
- 通过相应的配置项即可实现
10. 自定义InputFormat(难点)!!!!!
10.1 MapReduce执行过程

-
上图也描述了mapreduce的一个完整的过程;我们主要看map任务是如何从hdfs读取分片数据的部分
-
涉及3个关键的类
-
①InputFormat输入格式类
②InputSplit输入分片类:getSplits()
- InputFormat输入格式类将输入文件分成一个个分片InputSplit
- 每个Map任务对应一个split分片
③RecordReader记录读取器类:createRecordReader()
- RecordReader(记录读取器)读取分片数据,一行记录生成一个键值对
- 传入map任务的map()方法,调用map()
-
-
所以,如果需要根据自己的业务情况,自定义输入的话,需要自定义两个类:
- InputFormat类
- RecordReader类
-
详细流程:
-
客户端调用InputFormat的**getSplits()**方法,获得输入文件的分片信息

-
针对每个MR job会生成一个相应的app master,负责map\reduce任务的调度及监控执行情况
-
将分片信息传递给MR job的app master
-
app master根据分片信息,尽量将map任务尽量调度在split分片数据所在节点(移动计算不移动数据)

-
有几个分片,就生成几个map任务
-
每个map任务将split分片传递给createRecordReader()方法,生成此分片对应的RecordReader
-
RecordReader用来读取分片的数据,生成记录的键值对
- nextKeyValue()判断是否有下一个键值对,如果有,返回true;否则,返回false
- 如果返回true,调用getCurrentKey()获得当前的键
- 调用getCurrentValue()获得当前的值
-
map任务运行过程

-
map任务运行时,会调用run()
-
首先运行一次setup()方法;只在map任务启动时,运行一次;一些初始化的工作可以在setup方法中完成;如要连接数据库之类的操作
-
while循环,调用context.nextKeyValue();会委托给RecordRecord的nextKeyValue(),判断是否有下一个键值对
-
如果有下一个键值对,调用context.getCurrentKey()、context.getCurrentValue()获得当前的键、值的值(也是调用RecordReader的同名方法)

-
作为参数传入map(key, value, context),调用一次map()
-
当读取分片尾,context.nextKeyValue()返回false;退出循环
-
调用cleanup()方法,只在map任务结束之前,调用一次;所以,一些回收资源的工作可在此方法中实现,如关闭数据库连接
-
-
10.2 需求
- 无论hdfs还是mapreduce,处理小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案
10.3 逻辑分析
- 小文件的优化无非以下几种方式:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS(SequenceFile方案)
- 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并;可使用自定义InputFormat实现
- 在mapreduce处理时,可采用CombineFileInputFormat提高效率
- 本例使用第二种方案,自定义输入格式
10.4 MR代码
-
自定义InputFormat
package com.kaikeba.hadoop.inputformat;import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;/*** 自定义InputFormat类;* 泛型:* 键:因为不需要使用键,所以设置为NullWritable* 值:值用于保存小文件的内容,此处使用BytesWritable*/ public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {/**** 返回false,表示输入文件不可切割* @param context* @param file* @return*/@Overrideprotected boolean isSplitable(JobContext context, Path file) {return false;}/*** 生成读取分片split的RecordReader* @param split* @param context* @return* @throws IOException* @throws InterruptedException*/@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException {//使用自定义的RecordReader类WholeFileRecordReader reader = new WholeFileRecordReader();//初始化RecordReaderreader.initialize(split, context);return reader;} } -
自定义RecordReader
实现6个相关方法
package com.kaikeba.hadoop.inputformat;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/**** RecordReader的核心工作逻辑:* 通过nextKeyValue()方法去读取数据构造将返回的key value* 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value** @author*/ public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {//要读取的分片private FileSplit fileSplit;private Configuration conf;//读取的value数据private BytesWritable value = new BytesWritable();/**** 标识变量,分片是否已被读取过;因为小文件设置成了不可切分,所以一个小文件只有一个分片;* 而这一个分片的数据,只读取一次,一次读完所有数据* 所以设置此标识*/private boolean processed = false;/*** 初始化* @param split* @param context* @throws IOException* @throws InterruptedException*/@Overridepublic void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {this.fileSplit = (FileSplit) split;this.conf = context.getConfiguration();}/*** 判断是否有下一个键值对。若有,则读取分片中的所有的数据* @return* @throws IOException* @throws InterruptedException*/@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {if (!processed) {byte[] contents = new byte[(int) fileSplit.getLength()];Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(conf);FSDataInputStream in = null;try {in = fs.open(file);IOUtils.readFully(in, contents, 0, contents.length);value.set(contents, 0, contents.length);} finally {IOUtils.closeStream(in);}processed = true;return true;}return false;}/*** 获得当前的key* @return* @throws IOException* @throws InterruptedException*/@Overridepublic NullWritable getCurrentKey() throws IOException,InterruptedException {return NullWritable.get();}/*** 获得当前的value* @return* @throws IOException* @throws InterruptedException*/@Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {return value;}/*** 获得分片读取的百分比;因为如果读取分片数据的话,会一次性的读取完;所以进度要么是1,要么是0* @return* @throws IOException*/@Overridepublic float getProgress() throws IOException {//因为一个文件作为一个整体处理,所以,如果processed为true,表示已经处理过了,进度为1;否则为0return processed ? 1.0f : 0.0f;}@Overridepublic void close() throws IOException {} } -
main方法
package com.kaikeba.hadoop.inputformat;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/*** 让主类继承Configured类,实现Tool接口* 实现run()方法* 将以前main()方法中的逻辑,放到run()中* 在main()中,调用ToolRunner.run()方法,第一个参数是当前对象;第二个参数是输入、输出*/ public class SmallFiles2SequenceFile extends Configured implements Tool {/*** 自定义Mapper类* mapper类的输入键值对类型,与自定义InputFormat的输入键值对保持一致* mapper类的输出的键值对类型,分别是文件名、文件内容*/static class SequenceFileMapper extendsMapper<NullWritable, BytesWritable, Text, BytesWritable> {private Text filenameKey;/*** 取得文件名* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void setup(Context context) throws IOException,InterruptedException {InputSplit split = context.getInputSplit();//获得当前文件路径Path path = ((FileSplit) split).getPath();filenameKey = new Text(path.toString());}@Overrideprotected void map(NullWritable key, BytesWritable value,Context context) throws IOException, InterruptedException {context.write(filenameKey, value);}}public int run(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf,"combine small files to sequencefile");job.setJarByClass(SmallFiles2SequenceFile.class);//设置自定义输入格式job.setInputFormatClass(WholeFileInputFormat.class);WholeFileInputFormat.addInputPath(job,new Path(args[0]));//设置输出格式SequenceFileOutputFormat及输出路径job.setOutputFormatClass(SequenceFileOutputFormat.class);SequenceFileOutputFormat.setOutputPath(job,new Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);job.setMapperClass(SequenceFileMapper.class);return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new SmallFiles2SequenceFile(),args);System.exit(exitCode);} }
10.5 总结
- 若要自定义InputFormat的话
- 需要自定义InputFormat类,并覆写getRecordReader()方法
- 自定义RecordReader类,实现方法
- initialize()
- nextKeyValue()
- getCurrentKey()
- getCurrentValue()
- getProgress()
- close()
11. 自定义OutputFormat
11.1 需求
-
现在有一些订单的评论数据,要将订单的好评与其它级别的评论(中评、差评)进行区分开来,将最终的数据分开到不同的文件夹下面去
-
数据第九个字段表示评分等级:0 好评,1 中评,2 差评

11.2 逻辑分析
- 程序的关键点是在一个mapreduce程序中,根据数据的不同(好评的评级不同),输出两类结果到不同目录
- 这类灵活的输出,需求通过自定义OutputFormat来实现
11.3 实现要点
- 在mapreduce中访问外部资源
- 自定义OutputFormat类,覆写getRecordWriter()方法
- 自定义RecordWriter类,覆写具体输出数据的方法write()
11.4 MR代码
- 自定义OutputFormat
package com.kaikeba.hadoop.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/**** 本例使用框架默认的Reducer,它将Mapper输入的kv对,原样输出;所以reduce输出的kv类型分别是Text, NullWritable* 自定义OutputFormat的类,泛型表示reduce输出的键值对类型;要保持一致;* map--(kv)-->reduce--(kv)-->OutputFormat*/
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {/*** 两个输出文件;* good用于保存好评文件;其它评级保存到bad中* 根据实际情况修改path;node01及端口号8020*/String bad = "hdfs://node01:8020/outputformat/bad/r.txt";String good = "hdfs://node01:8020/outputformat/good/r.txt";/**** @param context* @return* @throws IOException* @throws InterruptedException*/@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {//获得文件系统对象FileSystem fs = FileSystem.get(context.getConfiguration());//两个输出文件路径Path badPath = new Path(bad);Path goodPath = new Path(good);FSDataOutputStream badOut = fs.create(badPath);FSDataOutputStream goodOut = fs.create(goodPath);return new MyRecordWriter(badOut,goodOut);}/*** 泛型表示reduce输出的键值对类型;要保持一致*/static class MyRecordWriter extends RecordWriter<Text, NullWritable>{FSDataOutputStream badOut = null;FSDataOutputStream goodOut = null;public MyRecordWriter(FSDataOutputStream badOut, FSDataOutputStream goodOut) {this.badOut = badOut;this.goodOut = goodOut;}/*** 自定义输出kv对逻辑* @param key* @param value* @throws IOException* @throws InterruptedException*/@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {if (key.toString().split("\t")[9].equals("0")){//好评goodOut.write(key.toString().getBytes());goodOut.write("\r\n".getBytes());}else{//其它评级badOut.write(key.toString().getBytes());badOut.write("\r\n".getBytes());}}/*** 关闭流* @param context* @throws IOException* @throws InterruptedException*/@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {if(goodOut !=null){goodOut.close();}if(badOut !=null){badOut.close();}}}
}
- main方法
package com.kaikeba.hadoop.outputformat;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class MyOwnOutputFormatMain extends Configured implements Tool {public int run(String[] args) throws Exception {Configuration conf = super.getConf();Job job = Job.getInstance(conf, MyOwnOutputFormatMain.class.getSimpleName());job.setJarByClass(MyOwnOutputFormatMain.class);//默认项,可以省略或者写出也可以//job.setInputFormatClass(TextInputFormat.class);//设置输入文件TextInputFormat.addInputPath(job, new Path(args[0]));job.setMapperClass(MyMapper.class);//job.setMapOutputKeyClass(Text.class);//job.setMapOutputValueClass(NullWritable.class);//设置自定义的输出类job.setOutputFormatClass(MyOutPutFormat.class);//设置一个输出目录,这个目录会输出一个success的成功标志的文件MyOutPutFormat.setOutputPath(job, new Path(args[1]));job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//默认项,即默认有一个reduce任务,所以可以省略
// job.setNumReduceTasks(1);
// //Reducer将输入的键值对原样输出
// job.setReducerClass(Reducer.class);boolean b = job.waitForCompletion(true);return b ? 0: 1;}/**** Mapper输出的key、value类型* 文件每行的内容作为输出的key,对应Text类型* 输出的value为null,对应NullWritable*/public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//把当前行内容作为key输出;value为nullcontext.write(value, NullWritable.get());}}/**** @param args /ordercomment.csv /ofo* @throws Exception*/public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();ToolRunner.run(configuration, new MyOwnOutputFormatMain(), args);}
}
11.5 总结
-
自定义outputformat
- 泛型与reduce输出的键值对类型保持一致
- 覆写getRecordWriter()方法
-
自定义RecordWriter
- 泛型与reduce输出的键值对类型保持一致
- 覆写具体输出数据的方法write()、close()
-
main方法
- job.setOutputFormatClass使用自定义在输出类
12. 二次排序(重点 )
12.1 需求
-
数据:有一个简单的关于员工工资的记录文件salary.txt
-
每条记录如下,有3个字段,分别表示name、age、salary
-
nancy 22 8000
-
-
使用MR处理记录,实现结果中
- 按照工资从高到低的降序排序
- 若工资相同,则按年龄升序排序
12.2 逻辑分析
-
利用MR中key具有可比较的特点
-
MapReduce中,根据key进行分区、排序、分组
-
有些MR的输出的key可以直接使用hadoop框架的可序列化可比较类型表示,如Text、IntWritable等等,而这些类型本身是可比较的;如IntWritable默认升序排序

-
但有时,使用MR编程,输出的key,若使用hadoop自带的key类型无法满足需求
- 此时,需要自定义的key类型(包含的是非单一信息,如此例包含工资、年龄);
- 并且也得是**可序列化、可比较的**
-
需要自定义key,定义排序规则
- 实现:按照人的salary降序排序,若相同,则再按age升序排序;若salary、age相同,则放入同一组
12.3 MR代码
- 详见工程代码
- 自定义key类型Person类
package com.kaikeba.hadoop.secondarysort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;//根据输入文件格式,定义JavaBean,作为MR时,Map的输出key类型;要求此类可序列化、可比较
public class Person implements WritableComparable<Person> {private String name;private int age;private int salary;public Person() {}public Person(String name, int age, int salary) {//super();this.name = name;this.age = age;this.salary = salary;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public int getSalary() {return salary;}public void setSalary(int salary) {this.salary = salary;}@Overridepublic String toString() {return this.salary + " " + this.age + " " + this.name;}//两个Person对象的比较规则:①先比较salary,高的排序在前;②若相同,age小的在前public int compareTo(Person other) {int compareResult= this.salary - other.salary;if(compareResult != 0) {//若两个人工资不同//工资降序排序;即工资高的排在前边return -compareResult;} else {//若工资相同//年龄升序排序;即年龄小的排在前边return this.age - other.age;}}//序列化,将NewKey转化成使用流传送的二进制public void write(DataOutput dataOutput) throws IOException {//注意:①使用正确的write方法;②记住此时的序列化的顺序,name、age、salarydataOutput.writeUTF(name);dataOutput.writeInt(age);dataOutput.writeInt(salary);}//使用in读字段的顺序,要与write方法中写的顺序保持一致:name、age、salarypublic void readFields(DataInput dataInput) throws IOException {//read string//注意:①使用正确的read方法;②读取顺序与write()中序列化的顺序保持一致this.name = dataInput.readUTF();this.age = dataInput.readInt();this.salary = dataInput.readInt();}
}
- main类、mapper、reducer
package com.kaikeba.hadoop.secondarysort;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;public class SecondarySort {/**** @param args /salary.txt /secondarysort* @throws Exception*/public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");Job job = Job.getInstance(configuration, SecondarySort.class.getSimpleName());FileSystem fileSystem = FileSystem.get(URI.create(args[1]), configuration);//生产中慎用if (fileSystem.exists(new Path(args[1]))) {fileSystem.delete(new Path(args[1]), true);}FileInputFormat.setInputPaths(job, new Path(args[0]));job.setMapperClass(MyMap.class);//由于mapper与reducer输出的kv类型分别相同,所以,下两行可以省略
// job.setMapOutputKeyClass(Person.class);
// job.setMapOutputValueClass(NullWritable.class);//设置reduce的个数;默认为1//job.setNumReduceTasks(1);job.setReducerClass(MyReduce.class);job.setOutputKeyClass(Person.class);job.setOutputValueClass(NullWritable.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}//MyMap的输出key用自定义的Person类型;输出的value为nullpublic static class MyMap extends Mapper<LongWritable, Text, Person, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] fields = value.toString().split("\t");String name = fields[0];int age = Integer.parseInt(fields[1]);int salary = Integer.parseInt(fields[2]);//在自定义类中进行比较Person person = new Person(name, age, salary);//person对象作为输出的keycontext.write(person, NullWritable.get());}}public static class MyReduce extends Reducer<Person, NullWritable, Person, NullWritable> {@Overrideprotected void reduce(Person key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {//输入的kv对,原样输出context.write(key, NullWritable.get());}}
}
12.4 总结
- 如果MR时,key的排序规则比较复杂,比如需要根据字段1排序,若字段1相同,则需要根据字段2排序…,此时,可以使用自定义key实现
- 将自定义的key作为MR中,map输出的key的类型(reduce输入的类型)
- 自定义的key
- 实现WritableComparable接口
- 实现compareTo比较方法
- 实现write序列化方法
- 实现readFields反序列化方法
13. 自定义分组求topN(重难点)
13.1 需求
-
现有一个淘宝用户订单历史记录文件;每条记录有6个字段,分别表示
- userid、datetime、title商品标题、unitPrice商品单价、purchaseNum购买量、productId商品ID

-
现使用MR编程,求出每个用户、每个月消费金额最多的两笔订单,花了多少钱
- 所以得相同用户、同一个年月的数据,分到同一组
13.2 逻辑分析
- 根据文件格式,自定义JavaBean类OrderBean
- 实现WritableComparable接口
- 包含6个字段分别对应文件中的6个字段
- 重点实现compareTo方法
- 先比较userid是否相等;若不相等,则userid升序排序
- 若相等,比较两个Bean的日期是否相等;若不相等,则日期升序排序
- 若相等,再比较总开销,降序排序
- 实现序列化方法write()
- 实现反序列化方法readFields()
- 自定义分区类
- 继承Partitioner类
- getPartiton()实现,userid相同的,处于同一个分区
- 自定义Mapper类
- 输出key是当前记录对应的Bean对象
- 输出的value对应当前订单的总开销
- 自定义分组类
- 决定userid相同、日期(年月)相同的记录,分到同一组中,调用一次reduce()
- 自定义Reduce类
- reduce()中求出当前一组数据中,开销头两笔的信息
- main方法
- job.setMapperClass
- job.setPartitionerClass
- job.setReducerClass
- job.setGroupingComparatorClass
13.3 MR代码
详细代码见代码工程
- OrderBean
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;//实现WritableComparable接口
public class OrderBean implements WritableComparable<OrderBean> {//用户IDprivate String userid;//年月//year+month -> 201408private String datetime;//标题private String title;//单价private double unitPrice;//购买量private int purchaseNum;//商品IDprivate String produceId;public OrderBean() {}public OrderBean(String userid, String datetime, String title, double unitPrice, int purchaseNum, String produceId) {super();this.userid = userid;this.datetime = datetime;this.title = title;this.unitPrice = unitPrice;this.purchaseNum = purchaseNum;this.produceId = produceId;}//key的比较规则public int compareTo(OrderBean other) {//OrderBean作为MR中的key;如果对象中的userid相同,即ret1为0;就表示两个对象是同一个用户int ret1 = this.userid.compareTo(other.userid);if (ret1 == 0) {//如果userid相同,比较年月String thisYearMonth = this.getDatetime();String otherYearMonth = other.getDatetime();int ret2 = thisYearMonth.compareTo(otherYearMonth);if(ret2 == 0) {//若datetime相同//如果userid、年月都相同,比较单笔订单的总开销Double thisTotalPrice = this.getPurchaseNum()*this.getUnitPrice();Double oTotalPrice = other.getPurchaseNum()*other.getUnitPrice();//总花销降序排序;即总花销高的排在前边return -thisTotalPrice.compareTo(oTotalPrice);} else {//若datatime不同,按照datetime升序排序return ret2;}} else {//按照userid升序排序return ret1;}}/*** 序列化* @param dataOutput* @throws IOException*/public void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(userid);dataOutput.writeUTF(datetime);dataOutput.writeUTF(title);dataOutput.writeDouble(unitPrice);dataOutput.writeInt(purchaseNum);dataOutput.writeUTF(produceId);}/*** 反序列化* @param dataInput* @throws IOException*/public void readFields(DataInput dataInput) throws IOException {this.userid = dataInput.readUTF();this.datetime = dataInput.readUTF();this.title = dataInput.readUTF();this.unitPrice = dataInput.readDouble();this.purchaseNum = dataInput.readInt();this.produceId = dataInput.readUTF();}/*** 使用默认分区器,那么userid相同的,落入同一分区;* 另外一个方案:此处不覆写hashCode方法,而是自定义分区器,getPartition方法中,对OrderBean的userid求hashCode值%reduce任务数* @return*/
// @Override
// public int hashCode() {
// return this.userid.hashCode();
// }@Overridepublic String toString() {return "OrderBean{" +"userid='" + userid + '\'' +", datetime='" + datetime + '\'' +", title='" + title + '\'' +", unitPrice=" + unitPrice +", purchaseNum=" + purchaseNum +", produceId='" + produceId + '\'' +'}';}public String getUserid() {return userid;}public void setUserid(String userid) {this.userid = userid;}public String getDatetime() {return datetime;}public void setDatetime(String datetime) {this.datetime = datetime;}public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public double getUnitPrice() {return unitPrice;}public void setUnitPrice(double unitPrice) {this.unitPrice = unitPrice;}public int getPurchaseNum() {return purchaseNum;}public void setPurchaseNum(int purchaseNum) {this.purchaseNum = purchaseNum;}public String getProduceId() {return produceId;}public void setProduceId(String produceId) {this.produceId = produceId;}
}
- MyPartitioner
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;//mapper的输出key类型是自定义的key类型OrderBean;输出value类型是单笔订单的总开销double -> DoubleWritable
public class MyPartitioner extends Partitioner<OrderBean, DoubleWritable> {@Overridepublic int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int numReduceTasks) {//userid相同的,落入同一分区return (orderBean.getUserid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}
- MyMapper
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 输出kv,分别是OrderBean、用户每次下单的总开销*/
public class MyMapper extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {DoubleWritable valueOut = new DoubleWritable();DateUtils dateUtils = new DateUtils();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//13764633023 2014-12-01 02:20:42.000 全视目Allseelook 原宿风暴显色美瞳彩色隐形艺术眼镜1片 拍2包邮 33.6 2 18067781305String record = value.toString();String[] fields = record.split("\t");if(fields.length == 6) {String userid = fields[0];String datetime = fields[1];String yearMonth = dateUtils.getYearMonthString(datetime);String title = fields[2];double unitPrice = Double.parseDouble(fields[3]);int purchaseNum = Integer.parseInt(fields[4]);String produceId = fields[5];//生成OrderBean对象OrderBean orderBean = new OrderBean(userid, yearMonth, title, unitPrice, purchaseNum, produceId);//此订单的总开销double totalPrice = unitPrice * purchaseNum;valueOut.set(totalPrice);context.write(orderBean, valueOut);}}
}
- MyReducer
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//输出的key为userid拼接上年月的字符串,对应Text;输出的value对应单笔订单的金额
public class MyReducer extends Reducer<OrderBean, DoubleWritable, Text, DoubleWritable> {/*** ①由于自定义分组逻辑,相同用户、相同年月的订单是一组,调用一次reduce();* ②由于自定义的key类OrderBean中,比较规则compareTo规定,相同用户、相同年月的订单,按总金额降序排序* 所以取出头两笔,就实现需求* @param key* @param values* @param context* @throws IOException* @throws InterruptedException*/@Overrideprotected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {//求每个用户、每个月、消费金额最多的两笔多少钱int num = 0;for(DoubleWritable value: values) {if(num < 2) {String keyOut = key.getUserid() + " " + key.getDatetime();context.write(new Text(keyOut), value);num++;} else {break;}}}
}
- MyGroup
package com.kaikeba.hadoop.grouping;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;//自定义分组类:reduce端调用reduce()前,对数据做分组;每组数据调用一次reduce()
public class MyGroup extends WritableComparator {public MyGroup() {//第一个参数表示key classsuper(OrderBean.class, true);}//分组逻辑@Overridepublic int compare(WritableComparable a, WritableComparable b) {//userid相同,且同一月的分成一组OrderBean aOrderBean = (OrderBean)a;OrderBean bOrderBean = (OrderBean)b;String aUserId = aOrderBean.getUserid();String bUserId = bOrderBean.getUserid();//userid、年、月相同的,作为一组int ret1 = aUserId.compareTo(bUserId);if(ret1 == 0) {//同一用户//年月也相同返回0,在同一组;return aOrderBean.getDatetime().compareTo(bOrderBean.getDatetime());} else {return ret1;}}
}
- CustomGroupingMain
package com.kaikeba.hadoop.grouping;import com.kaikeba.hadoop.wordcount.WordCountMain;
import com.kaikeba.hadoop.wordcount.WordCountMap;
import com.kaikeba.hadoop.wordcount.WordCountReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class CustomGroupingMain extends Configured implements Tool {///tmall-201412-test.csv /cgopublic static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new CustomGroupingMain(), args);System.exit(exitCode);}@Overridepublic int run(String[] args) throws Exception {//判断以下,输入参数是否是两个,分别表示输入路径、输出路径if (args.length != 2 || args == null) {System.out.println("please input Path!");System.exit(0);}Configuration configuration = new Configuration();//告诉程序,要运行的jar包在哪//configuration.set("mapreduce.job.jar","/home/hadoop/IdeaProjects/Hadoop/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");//调用getInstance方法,生成job实例Job job = Job.getInstance(configuration, CustomGroupingMain.class.getSimpleName());//设置jar包,参数是包含main方法的类job.setJarByClass(CustomGroupingMain.class);//通过job设置输入/输出格式//MR的默认输入格式是TextInputFormat,所以下两行可以注释掉
// job.setInputFormatClass(TextInputFormat.class);
// job.setOutputFormatClass(TextOutputFormat.class);//设置输入/输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置处理Map阶段的自定义的类job.setMapperClass(MyMapper.class);//设置map combine类,减少网路传出量//job.setCombinerClass(MyReducer.class);job.setPartitionerClass(MyPartitioner.class);//设置处理Reduce阶段的自定义的类job.setReducerClass(MyReducer.class);job.setGroupingComparatorClass(MyGroup.class);//如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型//注意:此处设置的map输出的key/value类型,一定要与自定义map类输出的kv对类型一致;否则程序运行报错job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(DoubleWritable.class);//设置reduce task最终输出key/value的类型//注意:此处设置的reduce输出的key/value类型,一定要与自定义reduce类输出的kv对类型一致;否则程序运行报错job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);// 提交作业return job.waitForCompletion(true) ? 0 : 1;}
}
- DateUtils
package com.kaikeba.hadoop.grouping;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;public class DateUtils {public static void main(String[] args) {//test1
// String str1 = "13764633024 2014-10-01 02:20:42.000";
// String str2 = "13764633023 2014-11-01 02:20:42.000";
// System.out.println(str1.compareTo(str2));//test2
// String datetime = "2014-12-01 02:20:42.000";
// LocalDateTime localDateTime = parseDateTime(datetime);
// int year = localDateTime.getYear();
// int month = localDateTime.getMonthValue();
// int day = localDateTime.getDayOfMonth();
// System.out.println("year-> " + year + "; month -> " + month + "; day -> " + day);//test3
// String datetime = "2014-12-01 02:20:42.000";
// System.out.println(getYearMonthString(datetime));}public LocalDateTime parseDateTime(String dateTime) {DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");LocalDateTime localDateTime = LocalDateTime.parse(dateTime, formatter);return localDateTime;}//日期格式转换工具类:将2014-12-14 20:42:14.000转换成201412public String getYearMonthString(String dateTime) {DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");LocalDateTime localDateTime = LocalDateTime.parse(dateTime, formatter);int year = localDateTime.getYear();int month = localDateTime.getMonthValue();return year + "" + month;}}
13.4 总结
- 要实现自定义分组逻辑
- 一般会自定义JavaBean,作为map输出的key
- 实现其中的compareTo方法,设置key的比较逻辑
- 实现序列化方法write()
- 实现反序列化方法readFields()
- 自定义mapper类、reducer类
- 自定义partition类,getPartition方法,决定哪些key落入哪些分区
- 自定义group分组类,决定reduce阶段,哪些kv对,落入同一组,调用一次reduce()
- 写main方法,设置自定义的类
- job.setMapperClass
- job.setPartitionerClass
- job.setReducerClass
- job.setGroupingComparatorClass
- 一般会自定义JavaBean,作为map输出的key
14. MapReduce数据倾斜(经常被问到)!!!
-
什么是数据倾斜?
- 数据中不可避免地会出现离群值(outlier),并导致数据倾斜。这些离群值会显著地拖慢MapReduce的执行。
-
常见的数据倾斜有以下几类:
- 数据频率倾斜——某一个区域的数据量要远远大于其他区域。比如某一个key对应的键值对远远大于其他键的键值对。
- 数据大小倾斜——部分记录的大小远远大于平均值。
-
在map端和reduce端都有可能发生数据倾斜
- 在map端的数据倾斜可以考虑使用combine
- 在reduce端的数据倾斜常常来源于MapReduce的默认分区器
-
数据倾斜会导致map和reduce的任务执行时间大为延长,也会让需要缓存数据集的操作消耗更多的内存资源
14.1 如何诊断是否存在数据倾斜
- 如何诊断哪些键存在数据倾斜?
- 发现倾斜数据之后,有必要诊断造成数据倾斜的那些键。有一个简便方法就是在代码里实现追踪每个键的最大值。
- 为了减少追踪量,可以设置数据量阀值,只追踪那些数据量大于阀值的键,并输出到日志中。实现代码如下
- 运行作业后就可以从日志中判断发生倾斜的键以及倾斜程度;跟踪倾斜数据是了解数据的重要一步,也是设计MapReduce作业的重要基础
package com.kaikeba.hadoop.dataskew;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import org.apache.log4j.Logger;import java.io.IOException;public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {private int maxValueThreshold;//日志类private static final Logger LOGGER = Logger.getLogger(WordCountReduce.class);@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//一个键达到多少后,会做数据倾斜记录maxValueThreshold = 10000;}/*(hello, 1)(hello, 1)(hello, 1)...(spark, 1)key: hellovalue: List(1, 1, 1)*/public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {int sum = 0;//用于记录键出现的次数int i = 0;for (IntWritable count : values) {sum += count.get();i++;}//如果当前键超过10000个,则打印日志if(i > maxValueThreshold) {LOGGER.info("Received " + i + " values for key " + key);}context.write(key, new IntWritable(sum));// 输出最终结果};}
14.2 减缓数据倾斜
-
Reduce数据倾斜一般是指map的输出数据中存在数据频率倾斜的状况,即部分输出键的数据量远远大于其它的输出键
-
如何减小reduce端数据倾斜的性能损失?常用方式有:
-
一、自定义分区
-
基于输出键的背景知识进行自定义分区。
-
例如,如果map输出键的单词来源于一本书。其中大部分必然是省略词(stopword)。那么就可以将自定义分区将这部分省略词发送给固定的一部分reduce实例。而将其他的都发送给剩余的reduce实例。
-
-
二、Combine
- 使用Combine可以大量地减小数据频率倾斜和数据大小倾斜。
- combine的目的就是聚合并精简数据。
-
三、抽样和范围分区
-
Hadoop默认的分区器是HashPartitioner,基于map输出键的哈希值分区。这仅在数据分布比较均匀时比较好。在有数据倾斜时就很有问题。
-
使用分区器需要首先了解数据的特性。TotalOrderPartitioner中,可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
-
TotalOrderPartitioner中的范围分区器可以通过预设的分区边界值进行分区。因此它也可以很好地用在矫正数据中的部分键的数据倾斜问题。
-
-
四、数据大小倾斜的自定义策略
-
在map端或reduce端的数据大小倾斜都会对缓存造成较大的影响,乃至导致OutOfMemoryError异常。处理这种情况并不容易。可以参考以下方法。
-
设置mapreduce.input.linerecordreader.line.maxlength来限制RecordReader读取的最大长度。
-
RecordReader在TextInputFormat和KeyValueTextInputFormat类中使用。默认长度没有上限。
-
-
15. MR调优
- 有调优专题
16. 抽样、范围分区
16.1 数据
-
数据:气象站气象数据,来源美国国家气候数据中心(NCDC)(1901-2001年数据,每年一个文件)
- 气候数据record的格式如下

16.2 需求
- 对气象数据,按照气温进行排序(气温符合正太分布)
16.3 实现方案
-
三种实现思路
- 方案一:
- 设置一个分区,即一个reduce任务;在一个reduce中对结果进行排序;
- 失去了MR框架并行计算的优势
- 方案二:
- 自定义分区,人为指定各温度区间的记录,落入哪个分区;如分区温度边界值分别是-15、0、20,共4个分区
- 但由于对整个数据集的气温分布不了解,可能某些分区的数据量大,其它的分区小,数据倾斜
- 方案三:
- 通过对键空间采样
- 只查看一小部分键,获得键的近似分布(好温度的近似分布)
- 进而据此结果创建分区,实现尽可能的均匀的划分数据集;
- Hadoop内置了采样器;InputSampler
- 方案一:
16.4 MR代码
分两大步
-
一、先将数据按气温对天气数据集排序。结果存储为sequencefile文件,气温作为输出键,数据行作为输出值
-
代码
此代码处理原始日志文件
结果用SequenceFile格式存储;
温度作为SequenceFile的key;记录作为value
package com.kaikeba.hadoop.totalorder;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import java.io.IOException;/*** 此代码处理原始日志文件 1901* 结果用SequenceFile格式存储;* 温度作为SequenceFile的key;记录作为value*/
public class SortDataPreprocessor {//输出的key\value分别是气温、记录static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {private NcdcRecordParser parser = new NcdcRecordParser();private IntWritable temperature = new IntWritable();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//0029029070999991901010106004+64333+023450FM-12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999parser.parse(value);if (parser.isValidTemperature()) {//是否是有效的记录temperature.set(parser.getAirTemperature());context.write(temperature, value);}}}//两个参数:/ncdc/input /ncdc/sfoutputpublic static void main(String[] args) throws Exception {if (args.length != 2) {System.out.println("<input> <output>");}Configuration conf = new Configuration();Job job = Job.getInstance(conf, SortDataPreprocessor.class.getSimpleName());job.setJarByClass(SortDataPreprocessor.class);//FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setMapperClass(CleanerMapper.class);//最终输出的键、值类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);//reduce个数为0job.setNumReduceTasks(0);//以sequencefile的格式输出job.setOutputFormatClass(SequenceFileOutputFormat.class);//开启job输出压缩功能//方案一conf.set("mapreduce.output.fileoutputformat.compress", "true");conf.set("mapreduce.output.fileoutputformat.compress.type","RECORD");//指定job输出使用的压缩算法conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");//方案二//设置sequencefile的压缩、压缩算法、sequencefile文件压缩格式block//SequenceFileOutputFormat.setCompressOutput(job, true);//SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
-
二、全局排序
使用全排序分区器TotalOrderPartitioner
package com.kaikeba.hadoop.totalorder;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;import java.net.URI;/*** 使用TotalOrderPartitioner全局排序一个SequenceFile文件的内容;* 此文件是SortDataPreprocessor的输出文件;* key是IntWritble,气象记录中的温度*/
public class SortByTemperatureUsingTotalOrderPartitioner{/*** 两个参数:/ncdc/sfoutput /ncdc/totalorder* 第一个参数是SortDataPreprocessor的输出文件*/public static void main(String[] args) throws Exception {if (args.length != 2) {System.out.println("<input> <output>");}Configuration conf = new Configuration();Job job = Job.getInstance(conf, SortByTemperatureUsingTotalOrderPartitioner.class.getSimpleName());job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//输入文件是SequenceFilejob.setInputFormatClass(SequenceFileInputFormat.class);//Hadoop提供的方法来实现全局排序,要求Mapper的输入、输出的key必须保持类型一致job.setOutputKeyClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);//分区器:全局排序分区器job.setPartitionerClass(TotalOrderPartitioner.class);//分了3个区;且分区i-1中的key小于i分区中所有的键job.setNumReduceTasks(3);/*** 随机采样器从所有的分片中采样* 每一个参数:采样率;* 第二个参数:总的采样数* 第三个参数:采样的最大分区数;* 只要numSamples和maxSplitSampled(第二、第三参数)任一条件满足,则停止采样*/InputSampler.Sampler<IntWritable, Text> sampler =new InputSampler.RandomSampler<IntWritable, Text>(0.1, 5000, 10);
// TotalOrderPartitioner.setPartitionFile();/*** 存储定义分区的键;即整个数据集中温度的大致分布情况;* 由TotalOrderPartitioner读取,作为全排序的分区依据,让每个分区中的数据量近似*/InputSampler.writePartitionFile(job, sampler);//根据上边的SequenceFile文件(包含键的近似分布情况),创建分区String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());URI partitionUri = new URI(partitionFile);// JobConf jobConf = new JobConf();//与所有map任务共享此文件,添加到分布式缓存中DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
// job.addCacheFile(partitionUri);//方案一:输出的文件RECORD级别,使用BZip2Codec进行压缩conf.set("mapreduce.output.fileoutputformat.compress", "true");conf.set("mapreduce.output.fileoutputformat.compress.type","RECORD");//指定job输出使用的压缩算法conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.BZip2Codec");//方案二//SequenceFileOutputFormat.setCompressOutput(job, true);//SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);//SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}
17 总结拓展
-
对大量数据进行全局排序
-
先使用InputSampler.Sampler采样器,对整个key空间进行采样,得到key的近似分布
-
保存到key分布情况文件中
-
使用TotalOrderPartitioner,利用上边的key分布情况文件,进行分区;每个分区的数据量近似,从而防止数据倾斜
-
-
扩展阅读:《Hadoop权威指南(第4版)》
-
7.3小节 - shuffle和排序
-
8.2 输入格式——MR中还有一些自带的输入格式
-
9.2.3 全排序


-


- 描述MR的shuffle全流程(面试)
- 谈谈什么是数据倾斜,什么情况会造成数据倾斜?(面试)
- 对MR数据倾斜,如何解决?(面试)
- 补充图解》》》》》》》》》》
相关文章:
MapReduce编程模型
MapReduce编程模型 理解MapReduce编程模型独立完成一个MapReduce程序并运行成功了解MapReduce工程流程掌握并描述出shuffle全过程(面试)独立编写课堂及作业中的MR程序理解并解决数据倾斜 1. MapReduce编程模型 Hadoop架构图 Hadoop由HDFS分布式存储、M…...
SQL server2022的详细安装流程以及简单使用
鉴于SQL Server2008R2版本过于老旧,本文主要讲述如何安装SQL Server 2022。 本文主要详细介绍SQL server2022的详细安装流程以及简单使用,以《数据库系统概论(第5版)》的第79页—第80页为例,详细介绍如何使用SQL serv…...
Linux的诞生:一场自由与协作的技术革命
Linux的诞生:一场自由与协作的技术革命 在今天的互联网世界,Linux几乎无处不在——从智能手机(Android内核)到超级计算机,从云计算平台到家用路由器,它的身影渗透在技术的各个角落。但这样一个改变世界的操…...
Pytorch为什么 nn.CrossEntropyLoss = LogSoftmax + nn.NLLLoss?
为什么 nn.CrossEntropyLoss LogSoftmax nn.NLLLoss? 在使用 PyTorch 时,我们经常听说 nn.CrossEntropyLoss 是 LogSoftmax 和 nn.NLLLoss 的组合。这句话听起来简单,但背后到底是怎么回事?为什么这两个分开的功能加起来就等于…...
Go入门之文件
以只读方式打开文件 package mainimport ("fmt""io""os" )func main() {file, err : os.Open("./main.go")defer file.Close()if err ! nil {fmt.Println(err)return}fmt.Println(file)var tempSlice make([]byte, 128)var strSlice…...
基因型—环境两向表数据分析——品种生态区划分
参考资料:农作物品种试验数据管理与分析 用于品种生态区划分的GGE双标图有两种功能图:试点向量功能图和“谁赢在哪里”功能图。双标图的具体模型基于SD定标和h加权和试点中心化的数据。本例中籽粒产量的GGE双标图仅解释了G和GE总变异的53.6%,…...
Leetcode2414:最长的字母序连续子字符串的长度
题目描述: 字母序连续字符串 是由字母表中连续字母组成的字符串。换句话说,字符串 "abcdefghijklmnopqrstuvwxyz" 的任意子字符串都是 字母序连续字符串 。 例如,"abc" 是一个字母序连续字符串,而 "ac…...
React(12)案例前期准备
1、创建项目 npx creat-react-app xxx 这里注意 react版本过高会导致antd组件无法安装 需要手动修改pagejson文件中的react和react-demo版本号为 18.2.0 npm i 在配置别名路径 创建craco文件 const path require("path"); module.exports {webpack: {alias: …...
2025年2月28日(RAG)
从图片中的内容来看,用户提到的“RAG”实际上是“Retrieval-Augmented Generation”的缩写,中文称为“检索增强生成”。这是一种结合了检索(Retrieval)和生成(Generation)的技术,用于增强自然语…...
python-leetcode-寻找重复数
287. 寻找重复数 - 力扣(LeetCode) class Solution:def findDuplicate(self, nums: List[int]) -> int:# Step 1: 找到环的相遇点slow nums[0]fast nums[0]# 使用快慢指针,直到相遇while True:slow nums[slow] # 慢指针走一步fast nu…...
Vue 3 中,如果 public 目录下的 .js 文件中有一个函数执行后生成数据,并希望将这些数据传递到组件中
在 Vue 3 中,如果 public 目录下的 .js 文件中有一个函数执行后生成数据,并希望将这些数据传递到组件中,可以使用 window.postMessage,但需要结合具体场景。以下是不同方法的详细说明: 方法 1:使用 window…...
ai大模型自动化测试-TensorFlow Testing 测试模型实例
AI大模型自动化测试是确保模型质量、可靠性和性能的关键环节,以下将从测试流程、测试内容、测试工具及测试挑战与应对几个方面进行详细介绍: 测试流程 测试计划制定 确定测试目标:明确要测试的AI大模型的具体功能、性能、安全性等方面的目标,例如评估模型在特定任务上的准…...
初阶MySQL(两万字全面解析)
文章目录 1.初识MySQL1.1数据库1.2查看数据库1.3创建数据库1.4字符集编码和排序规则1.5修改数据库1.6删除数据库 2.MySQL常用数据类型和表的操作2.(一)常用数据类型1.数值类2.字符串类型3.二进制类型4.日期类型 2.(二)表的操作1查看指定库中所有表2.创建表 3.查看表结构和查看表…...
数据库数据恢复—SQL Server附加数据库报错“错误 823”怎么办?
SQL Server数据库附加数据库过程中比较常见的报错是“错误 823”,附加数据库失败。 如果数据库有备份则只需还原备份即可。但是如果没有备份,备份时间太久,或者其他原因导致备份不可用,那么就需要通过专业手段对数据库进行数据恢复…...
SpringBatch简单处理多表批量动态更新
项目需要处理一堆表,这些表数据量不是很大都有经纬度信息,但是这些表的数据没有流域信息,需要按经纬度信息计算所属流域信息。比较简单的项目,按DeepSeek提示思索完成开发,AI真好用。 阿里AI个人版本IDEA安装 IDEA中使…...
夜莺监控 - 边缘告警引擎架构详解
前言 夜莺类似 Grafana 可以接入多个数据源,查询数据源的数据做告警和展示。但是有些数据源所在的机房和中心机房之间网络链路不好,如果由 n9e 进程去周期性查询数据并判定告警,那在网络链路抖动或拥塞的时候,告警就不稳定了。所…...
18440二维差分
18440二维差分 ⭐️难度:中等 📖 📚 import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner scanner new Scanner(System.in);int n scanner.nextInt();int m scanner.nextInt();int q scanne…...
安全传输,高效共享 —— 体验FileLink的跨网文件传输
在当今数字化转型的浪潮中,企业在进行跨网文件传输时面临诸多挑战,包括数据安全、传输速度和用户体验等。为了解决这些问题,FileLink应运而生,成为一款高效、安全的跨网文件传输解决方案。 一、FileLink的核心特点 1.加密技术 …...
SOME/IP 教程知识点总结
总结关于SOME/IP的教程,首先通读整个文件,理解各个部分的内容。看起来这个教程从介绍开始,讲到了为什么在车辆中使用以太网,然后详细讲解了SOME/IP的概念、序列化、消息传递、服务发现(SOME/IP-SD)、发布/订阅机制以及支持情况。 首先,我需要确认每个章节的主要知识点。…...
学习路程八 langchin核心组件 Models补充 I/O和 Redis Cache
前序 之前了解了Models,Prompt,但有些资料又把这块与输出合称为模型输入输出(Model I/O):这是与各种大语言模型进行交互的基本组件。它允许开发者管理提示(prompt),通过通用接口调…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
三分算法与DeepSeek辅助证明是单峰函数
前置 单峰函数有唯一的最大值,最大值左侧的数值严格单调递增,最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值,最小值左侧的数值严格单调递减,最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
