[MIT6.5840]MapReduce
MapReduce
Lab 地址
https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
论文地址
https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf
工作原理
简单来讲,MapReduce是一种分布式框架,可以用来处理大规模数据。该框架抽象了两个接口,分别是Map
和Reduce
函数:
凡是符合这个模式的算法都可以使用该框架来实现并行化,执行流程如下图所示。
整个框架分为Master和Worker,Master负责分配map
和reduce
任务,Worker负责向Master申请任务并执行。执行流程如下:
Map阶段:
- 输入是大文件分割后的一组小文件,通常大小为16~64MB。
- Worker向Master申请任务,假设得到map任务in0。
- Worker开始执行map任务,将文件名和文件内容作为参数传入map函数中,得到kv list.
- 最后Worker将kv list分割成reduceNum份(超参数),要求使得具有相同key的kv对在一份中。可以通过hash值%reduceNum实现分割,然后输出到文件中,下图的0-*
Reduce阶段:
- 输入当前reduce的序号id,从map阶段的输出中选出*-id的文件,也就是将hash值%reduceNum值相同的kv对取出,这样可以保证具有相同key的kv对只用一次处理。
- 将所有的kv对根据键值排序,使得相同key的kv对能够连续排列,方便合并。
- 之后合并相同key的kv对,然后将每个key和其对应的value list输入reduce函数,得到合并的结果,再将其输出到文件中。
本文介绍了大致思想,详细内容请参考原论文。
代码详解
rpc.go
package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("fmt""os""strconv"
)const (MAP = "MAP"REDUCE = "REDUCE"DONE = "DONE"
)//
// example to show how to declare the arguments
// and reply for an RPC.
//type ApplyArgs struct {WorkerID intLastTaskType stringLastTaskID int
}type ReplyArgs struct {TaskId intTaskType stringInputFile stringMapNum intReduceNum int
}// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/5840-mr-"s += strconv.Itoa(os.Getuid())return s
}
// 构造文件名
func tmpMapResult(workerID int, taskID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-%d-%d", workerID, taskID, reduceId)
}func finalMapResult(taskID int, reduceID int) string {return fmt.Sprintf("mr-%d-%d", taskID, reduceID)
}func tmpReduceResult(workerID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-out-%d", workerID, reduceId)
}func finalReduceResult(reduceID int) string {return fmt.Sprintf("mr-out-%d", reduceID)
}
worker.go
package mrimport ("fmt""hash/fnv""io""log""net/rpc""os""sort""strings"
)// Map functions return a slice of KeyValue.
type KeyValue struct {Key stringValue string
}// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)
}// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.id := os.Getegid()// log.Printf("worker %d start working", id)lastTaskId := -1lastTaskType := ""loop:for {args := ApplyArgs{WorkerID: id,LastTaskType: lastTaskType,LastTaskID: lastTaskId,}reply := ReplyArgs{}ok := call("Coordinator.ApplyForTask", &args, &reply)if !ok {fmt.Printf("call failed!\n")continue}// log.Printf("reply: %v", reply)lastTaskId = reply.TaskIdlastTaskType = reply.TaskTypeswitch reply.TaskType {case "":// log.Println("finished")break loopcase MAP:// log.Printf("worker %d get map task %d", id, reply.TaskId)doMapTask(id, reply.TaskId, reply.InputFile, reply.ReduceNum, mapf)case REDUCE:// log.Printf("worker %d get reduce task %d", id, reply.TaskId)doReduceTask(id, reply.TaskId, reply.MapNum, reducef)}}// uncomment to send the Example RPC to the coordinator.// CallExample()}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}func doMapTask(id int, taskId int, filename string, reduceNum int, mapf func(string, string) []KeyValue) {file, err := os.Open(filename)if err != nil {log.Fatalf("%s 文件打开失败! ", filename)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("%s 文件内容读取失败! ", filename)}file.Close()kvList := mapf(filename, string(content)) // kv listhashedKvList := make(map[int]ByKey)for _, kv := range kvList {hashedKey := ihash(kv.Key) % reduceNumhashedKvList[hashedKey] = append(hashedKvList[hashedKey], kv)}for i := 0; i < reduceNum; i++ {outFile, err := os.Create(tmpMapResult(id, taskId, i))if err != nil {log.Fatalf("can not create output file: %e", err)return}for _, kv := range hashedKvList[i] {fmt.Fprintf(outFile, "%v\t%v\n", kv.Key, kv.Value)}outFile.Close()}// log.Printf("worker %d finished map task\n", id)
}func doReduceTask(id int, taskId int, mapNum int, reducef func(string, []string) string) {var kvList ByKeyvar lines []stringfor i := 0; i < mapNum; i++ {mapOutFile := finalMapResult(i, taskId)file, err := os.Open(mapOutFile)if err != nil {log.Fatalf("can not open output file %s: %e", mapOutFile, err)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("file read failed %s: %e", mapOutFile, err)return}lines = append(lines, strings.Split(string(content), "\n")...)}for _, line := range lines {if strings.TrimSpace(line) == "" {continue}split := strings.Split(line, "\t")kvList = append(kvList, KeyValue{Key: split[0], Value: split[1]})}sort.Sort(kvList)outputFile := tmpReduceResult(id, taskId)file, err := os.Create(outputFile)if err != nil {log.Fatalf("can not create output file: %e", err)return}for i := 0; i < len(kvList); {j := i + 1key := kvList[i].Keyvar values []stringfor j < len(kvList) && kvList[j].Key == key {j++}for k := i; k < j; k++ {values = append(values, kvList[k].Value)}res := reducef(key, values)fmt.Fprintf(file, "%v %v\n", key, res)i = j}file.Close()// log.Printf("worker %d finished reduce task", id)
}
coordinator.go
package mrimport ("fmt""log""math""net""net/http""net/rpc""os""sync""time"
)type Task struct {id intinputFile stringworker inttaskType stringdeadLine time.Time
}type Coordinator struct {// Your definitions here.mtx sync.MutexinputFile []stringreduceNum intmapNum inttaskStates map[string]TasktodoList chan Taskstage string
}// Your code here -- RPC handlers for the worker to call.// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) ApplyForTask(args *ApplyArgs, reply *ReplyArgs) error {// process the last taskif args.LastTaskID != -1 {taskId := createTaskId(args.LastTaskID, args.LastTaskType)c.mtx.Lock()if task, ok := c.taskStates[taskId]; ok && task.worker != -1 { // 排除过期任务// log.Printf("worker %d finish task %d", args.WorkerID, task.id)if args.LastTaskType == MAP {for i := 0; i < c.reduceNum; i++ {err := os.Rename(tmpMapResult(task.worker, task.id, i), finalMapResult(task.id, i))if err != nil {log.Fatalf("can not rename %s: %e", tmpMapResult(task.worker, task.id, i), err)}}} else if args.LastTaskType == REDUCE {err := os.Rename(tmpReduceResult(task.worker, task.id), finalReduceResult(task.id))if err != nil {log.Fatalf("can not rename %s: %e", tmpReduceResult(task.worker, task.id), err)}}delete(c.taskStates, taskId)if len(c.taskStates) == 0 {c.shift()}}c.mtx.Unlock()}// assign the new tasktask, ok := <-c.todoListif !ok {return nil}reply.InputFile = task.inputFilereply.MapNum = c.mapNumreply.ReduceNum = c.reduceNumreply.TaskId = task.idreply.TaskType = task.taskTypetask.worker = args.WorkerIDtask.deadLine = time.Now().Add(10 * time.Second)// log.Printf("assign %s task %d to worker %d", task.taskType, task.id, args.WorkerID)c.mtx.Lock()c.taskStates[createTaskId(task.id, task.taskType)] = taskc.mtx.Unlock()return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
// 改变当前的状态
func (c *Coordinator) shift() {// 加锁状态if c.stage == MAP {// log.Printf("Map Task finished")c.stage = REDUCE// 分配reduce taskfor i := 0; i < c.reduceNum; i++ {task := Task{id: i,worker: -1,taskType: REDUCE,}c.todoList <- taskc.taskStates[createTaskId(i, REDUCE)] = task}} else if c.stage == REDUCE {close(c.todoList)c.stage = DONE}
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {// Your code here.c.mtx.Lock()defer c.mtx.Unlock()return c.stage == DONE
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{mtx: sync.Mutex{},inputFile: files,reduceNum: nReduce,mapNum: len(files),taskStates: make(map[string]Task),todoList: make(chan Task, int(math.Max(float64(nReduce), float64(len(files))))),stage: MAP,}for i, file := range files {task := Task{id: i,inputFile: file,worker: -1,taskType: MAP,}c.todoList <- taskc.taskStates[createTaskId(i, MAP)] = task}// 回收任务go c.collectTask()c.server()return &c
}func createTaskId(id int, taskType string) string {return fmt.Sprintf("%d-%s", id, taskType)
}
// worker执行过期后回收任务
func (c *Coordinator) collectTask() {for {time.Sleep(500 * time.Millisecond)c.mtx.Lock()if c.stage == DONE {c.mtx.Unlock()return}for _, task := range c.taskStates {if task.worker != -1 && time.Now().After(task.deadLine) {// task is expiredtask.worker = -1// log.Printf("task %d is expired", task.id)c.todoList <- task}}c.mtx.Unlock()}
}
运行说明
mrcoordinator
cd src/main/
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrcoordinator.go pg-*.txt
mrworker
cd src/main/
go run mrworker.go wc.so
测试结果
bash test-mr.sh
MIT6.5840 课程Lab完整项目
https://github.com/Joker0x00/MIT-6.5840-Lab/
相关文章:

[MIT6.5840]MapReduce
MapReduce Lab 地址 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 论文地址 https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf 工作原理 简单来讲,MapReduce是一种分布式框架,可以用来处理…...

【系统架构设计师】计算机组成与体系结构 ⑯ ( 奇偶校验码 | CRC 循环冗余码 | 海明码 | 模 2 除法 )
文章目录 一、校验码1、校验码由来2、奇偶校验码3、CRC 循环冗余码 ( 重点考点 )4、海明码校验 ( 软考不经常考到 ) 二、CRC 循环冗余码 ( 重点考点 )1、模 2 除法概念2、模 2 除法步骤3、模 2 除法示例4、CRC 循环冗余码示例 15、CRC 循环冗余码示例 2 参考之前的博客 : 【计…...
springboot,service 层统一异常抛出时,throws Exception写在接口上还是实现类上
springboot,service 层统一异常抛出时,throws Exception写在实现接口上,不是直接写在实现类上...
深度学习高效性网络
为了减轻Transformer笨重的计算成本,一系列工作重点开发了高效的Vision Transformer,如Swin Transformer、PVT、Twins、CoAtNet和MobileViT。 1、字节TRT-ViT 兼具CNN的速度、Transformer精度的模型 TRT-ViT(Transformer-based Vision Tra…...

PyQt ERROR:ModuleNotFoundError: No module named ‘matplotlib‘
Solution:打开cmd输入指令下载malplotlib pip install matplotlib...
Flutter Geolocator插件使用指南:获取和监听地理位置
Flutter Geolocator插件使用指南:获取和监听地理位置 简介 geolocator 是一个Flutter插件,提供了一个简单易用的API来访问特定平台的地理位置服务。它支持获取设备的最后已知位置、当前位置、连续位置更新、检查设备上是否启用了位置服务,以…...

网站基本布局CSS
代码 <!DOCTYPE html> <html> <head><meta charset"utf-8"><meta name"viewport" content"widthdevice-width, initial-scale1"><title></title><style type"text/css">body {margi…...

ssm框架整合,异常处理器和拦截器(纯注解开发)
目录 ssm框架整合 第一步:指定打包方式和导入所需要的依赖 打包方法:war springMVC所需依赖 解析json依赖 mybatis依赖 数据库驱动依赖 druid数据源依赖 junit依赖 第二步:导入tomcat插件 第三步:编写配置类 SpringCon…...

古籍双层PDF制作教程:保姆级古籍数字化教程
在智慧古籍数字化项目中,很多图书馆要求将古籍导出为双层PDF,并且确保输出双层PDF底层文本与上层图片偏移量控制在1毫米以内。那么本教程带你使用古籍数字化平台,3分钟把一个古籍书籍转化为双侧PDF。 第1步:上传古籍 点批量上传…...
Git 删除 远端的分支
要删除 Git 远端的分支(例如: V3.2.1.13): 可以执行以下命令 git push origin --delete V3.2.1.13这条命令会向远端的仓库删除名为 V3.2.1.13 的分支。如果这个分支只在远端仓库存在而没有对应的本地分支,那么删除后这…...
PrgogressBar实现原理分析
ProgressBar 是 Android 中用于显示进度条的控件,它可以用来表示任务的完成程度或者加载进度等信息。ProgressBar 有两种主要类型:一种是确定性的(determinate),另一种是不确定性的(indeterminateÿ…...

【HarmonyOS】HarmonyOS NEXT学习日记:七、页面与组件的生命周期
【HarmonyOS】HarmonyOS NEXT学习日记:七、页面与组件的生命周期 页面和组件 组件:用Component装饰的代码称为自定义组件页面:Entry装饰的组件即页面的根节点 组件生命周期 aboutToAppear:在创建自定义组件的新实例后…...

【iOS】——Block循环引用
循环引用原因 如果在Block中使用附有_ _strong修饰符的对象类型自动变量,那么当Block从栈复制到堆时,该对象为Block所持有,这样容易引起循环引用。 HPPerson *person [[HPPerson alloc] init];person.block ^{NSLog("person.age--- …...
shell脚本自动化安装启动各种服务
1、自动化配置dns服务器 A主机:vim dns.sh #!/bin/bash# 自动化部署dns# 1、下载bind# 2、修改配置文件# vim /etc/named.conf # listen-on port 53 { 127.0.0.1;any; }; 修改(定位替换)# allow-query { localhost;any; }; 修改&am…...

Python - 开源库 ReportLab 库合并 CVS 和图像生成 PDF 文档
欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/140281680 免责声明:本文来源于个人知识与公开资料,仅用于学术交流,欢迎讨论,不支持转载。 Report…...

Java编写SIP协议
1、编写Server代码 package com.genersoft.iot.vmp.sip; import javax.sip.*; import javax.sip.message.*; import javax.sip.header.*; import java.util.*;public class SimpleSipServer implements SipListener {private SipFactory sipFactory;private SipStack sipStack…...
大型语言模型LLM的核心概念
本文主要介绍了目前主流的,几个大型语言模型LLM的整个训练过程 通常分为下面的几个阶段 1. 预训练 采用互联网上的大量数据进行训练,这一阶段大模型LLM的主体已定,找出共性并且压缩成一个模型。模型的参数量不是越大越好,遵循合理…...

软件测试---网络基础、HTTP
一、网络基础 (1)Web和网络知识 网络基础TCP/IP 使用HTTP协议访问Web WWW万维网的诞生 WWW万维网的构成 (2)IP协议 (3)可靠传输的TCP和三次握手策略 (4)域名解析服务DNS ࿰…...

韩顺平0基础学java——第39天
p820-841 jdbc和连接池 1.JDBC为访问不同的数据库提供了统一的接口,为使用者屏蔽了细节问题。 2.Java程序员使用JDBC,可以连接任何提供了JDBC驱动程序的数据库系统,从而完成对数据库的各种操作。 3.jdbc原理图 JDBC带来的好处 2.JDBC带来的…...

Linux文件恢复
很麻烦 一般还是小心最好 特别恢复的时候 可能不能选择某个文件夹去扫描恢复 所以 删除的时候 用rm -i代替rm 一定小心 以及 探索下linux的垃圾箱机制 注意 一定要恢复到不同文件夹 省的出问题 法1 系统自带工具 debugfs 但是好像不能重启? testdisk 1、安装 …...

MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...

蓝桥杯3498 01串的熵
问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798, 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...

基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...
Java求职者面试指南:计算机基础与源码原理深度解析
Java求职者面试指南:计算机基础与源码原理深度解析 第一轮提问:基础概念问题 1. 请解释什么是进程和线程的区别? 面试官:进程是程序的一次执行过程,是系统进行资源分配和调度的基本单位;而线程是进程中的…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...