当前位置: 首页 > news >正文

[MIT6.5840]MapReduce

MapReduce

Lab 地址

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文地址

https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf

工作原理

简单来讲,MapReduce是一种分布式框架,可以用来处理大规模数据。该框架抽象了两个接口,分别是MapReduce函数:
在这里插入图片描述

凡是符合这个模式的算法都可以使用该框架来实现并行化,执行流程如下图所示。

在这里插入图片描述

整个框架分为Master和Worker,Master负责分配mapreduce任务,Worker负责向Master申请任务并执行。执行流程如下:

Map阶段:

  • 输入是大文件分割后的一组小文件,通常大小为16~64MB。
  • Worker向Master申请任务,假设得到map任务in0。
  • Worker开始执行map任务,将文件名和文件内容作为参数传入map函数中,得到kv list.
  • 最后Worker将kv list分割成reduceNum份(超参数),要求使得具有相同key的kv对在一份中。可以通过hash值%reduceNum实现分割,然后输出到文件中,下图的0-*

Reduce阶段:

  • 输入当前reduce的序号id,从map阶段的输出中选出*-id的文件,也就是将hash值%reduceNum值相同的kv对取出,这样可以保证具有相同key的kv对只用一次处理。
  • 将所有的kv对根据键值排序,使得相同key的kv对能够连续排列,方便合并。
  • 之后合并相同key的kv对,然后将每个key和其对应的value list输入reduce函数,得到合并的结果,再将其输出到文件中。

在这里插入图片描述

本文介绍了大致思想,详细内容请参考原论文。

代码详解

rpc.go

package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("fmt""os""strconv"
)const (MAP    = "MAP"REDUCE = "REDUCE"DONE   = "DONE"
)//
// example to show how to declare the arguments
// and reply for an RPC.
//type ApplyArgs struct {WorkerID     intLastTaskType stringLastTaskID   int
}type ReplyArgs struct {TaskId    intTaskType  stringInputFile stringMapNum    intReduceNum int
}// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/5840-mr-"s += strconv.Itoa(os.Getuid())return s
}
// 构造文件名
func tmpMapResult(workerID int, taskID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-%d-%d", workerID, taskID, reduceId)
}func finalMapResult(taskID int, reduceID int) string {return fmt.Sprintf("mr-%d-%d", taskID, reduceID)
}func tmpReduceResult(workerID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-out-%d", workerID, reduceId)
}func finalReduceResult(reduceID int) string {return fmt.Sprintf("mr-out-%d", reduceID)
}

worker.go

package mrimport ("fmt""hash/fnv""io""log""net/rpc""os""sort""strings"
)// Map functions return a slice of KeyValue.
type KeyValue struct {Key   stringValue string
}// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)
}// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.id := os.Getegid()// log.Printf("worker %d start working", id)lastTaskId := -1lastTaskType := ""loop:for {args := ApplyArgs{WorkerID:     id,LastTaskType: lastTaskType,LastTaskID:   lastTaskId,}reply := ReplyArgs{}ok := call("Coordinator.ApplyForTask", &args, &reply)if !ok {fmt.Printf("call failed!\n")continue}// log.Printf("reply: %v", reply)lastTaskId = reply.TaskIdlastTaskType = reply.TaskTypeswitch reply.TaskType {case "":// log.Println("finished")break loopcase MAP:// log.Printf("worker %d get map task %d", id, reply.TaskId)doMapTask(id, reply.TaskId, reply.InputFile, reply.ReduceNum, mapf)case REDUCE:// log.Printf("worker %d get reduce task %d", id, reply.TaskId)doReduceTask(id, reply.TaskId, reply.MapNum, reducef)}}// uncomment to send the Example RPC to the coordinator.// CallExample()}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}func doMapTask(id int, taskId int, filename string, reduceNum int, mapf func(string, string) []KeyValue) {file, err := os.Open(filename)if err != nil {log.Fatalf("%s 文件打开失败! ", filename)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("%s 文件内容读取失败! ", filename)}file.Close()kvList := mapf(filename, string(content)) // kv listhashedKvList := make(map[int]ByKey)for _, kv := range kvList {hashedKey := ihash(kv.Key) % reduceNumhashedKvList[hashedKey] = append(hashedKvList[hashedKey], kv)}for i := 0; i < reduceNum; i++ {outFile, err := os.Create(tmpMapResult(id, taskId, i))if err != nil {log.Fatalf("can not create output file: %e", err)return}for _, kv := range hashedKvList[i] {fmt.Fprintf(outFile, "%v\t%v\n", kv.Key, kv.Value)}outFile.Close()}// log.Printf("worker %d finished map task\n", id)
}func doReduceTask(id int, taskId int, mapNum int, reducef func(string, []string) string) {var kvList ByKeyvar lines []stringfor i := 0; i < mapNum; i++ {mapOutFile := finalMapResult(i, taskId)file, err := os.Open(mapOutFile)if err != nil {log.Fatalf("can not open output file %s: %e", mapOutFile, err)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("file read failed %s: %e", mapOutFile, err)return}lines = append(lines, strings.Split(string(content), "\n")...)}for _, line := range lines {if strings.TrimSpace(line) == "" {continue}split := strings.Split(line, "\t")kvList = append(kvList, KeyValue{Key: split[0], Value: split[1]})}sort.Sort(kvList)outputFile := tmpReduceResult(id, taskId)file, err := os.Create(outputFile)if err != nil {log.Fatalf("can not create output file: %e", err)return}for i := 0; i < len(kvList); {j := i + 1key := kvList[i].Keyvar values []stringfor j < len(kvList) && kvList[j].Key == key {j++}for k := i; k < j; k++ {values = append(values, kvList[k].Value)}res := reducef(key, values)fmt.Fprintf(file, "%v %v\n", key, res)i = j}file.Close()// log.Printf("worker %d finished reduce task", id)
}

coordinator.go

package mrimport ("fmt""log""math""net""net/http""net/rpc""os""sync""time"
)type Task struct {id        intinputFile stringworker    inttaskType  stringdeadLine  time.Time
}type Coordinator struct {// Your definitions here.mtx        sync.MutexinputFile  []stringreduceNum  intmapNum     inttaskStates map[string]TasktodoList   chan Taskstage      string
}// Your code here -- RPC handlers for the worker to call.// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) ApplyForTask(args *ApplyArgs, reply *ReplyArgs) error {// process the last taskif args.LastTaskID != -1 {taskId := createTaskId(args.LastTaskID, args.LastTaskType)c.mtx.Lock()if task, ok := c.taskStates[taskId]; ok && task.worker != -1 { // 排除过期任务// log.Printf("worker %d finish task %d", args.WorkerID, task.id)if args.LastTaskType == MAP {for i := 0; i < c.reduceNum; i++ {err := os.Rename(tmpMapResult(task.worker, task.id, i), finalMapResult(task.id, i))if err != nil {log.Fatalf("can not rename %s: %e", tmpMapResult(task.worker, task.id, i), err)}}} else if args.LastTaskType == REDUCE {err := os.Rename(tmpReduceResult(task.worker, task.id), finalReduceResult(task.id))if err != nil {log.Fatalf("can not rename %s: %e", tmpReduceResult(task.worker, task.id), err)}}delete(c.taskStates, taskId)if len(c.taskStates) == 0 {c.shift()}}c.mtx.Unlock()}// assign the new tasktask, ok := <-c.todoListif !ok {return nil}reply.InputFile = task.inputFilereply.MapNum = c.mapNumreply.ReduceNum = c.reduceNumreply.TaskId = task.idreply.TaskType = task.taskTypetask.worker = args.WorkerIDtask.deadLine = time.Now().Add(10 * time.Second)// log.Printf("assign %s task %d to worker %d", task.taskType, task.id, args.WorkerID)c.mtx.Lock()c.taskStates[createTaskId(task.id, task.taskType)] = taskc.mtx.Unlock()return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
// 改变当前的状态
func (c *Coordinator) shift() {// 加锁状态if c.stage == MAP {// log.Printf("Map Task finished")c.stage = REDUCE// 分配reduce taskfor i := 0; i < c.reduceNum; i++ {task := Task{id:       i,worker:   -1,taskType: REDUCE,}c.todoList <- taskc.taskStates[createTaskId(i, REDUCE)] = task}} else if c.stage == REDUCE {close(c.todoList)c.stage = DONE}
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {// Your code here.c.mtx.Lock()defer c.mtx.Unlock()return c.stage == DONE
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{mtx:        sync.Mutex{},inputFile:  files,reduceNum:  nReduce,mapNum:     len(files),taskStates: make(map[string]Task),todoList:   make(chan Task, int(math.Max(float64(nReduce), float64(len(files))))),stage:      MAP,}for i, file := range files {task := Task{id:        i,inputFile: file,worker:    -1,taskType:  MAP,}c.todoList <- taskc.taskStates[createTaskId(i, MAP)] = task}// 回收任务go c.collectTask()c.server()return &c
}func createTaskId(id int, taskType string) string {return fmt.Sprintf("%d-%s", id, taskType)
}
// worker执行过期后回收任务
func (c *Coordinator) collectTask() {for {time.Sleep(500 * time.Millisecond)c.mtx.Lock()if c.stage == DONE {c.mtx.Unlock()return}for _, task := range c.taskStates {if task.worker != -1 && time.Now().After(task.deadLine) {// task is expiredtask.worker = -1// log.Printf("task %d is expired", task.id)c.todoList <- task}}c.mtx.Unlock()}
}

运行说明

mrcoordinator

cd src/main/
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrcoordinator.go pg-*.txt

mrworker

cd src/main/
go run mrworker.go wc.so

测试结果

bash test-mr.sh

在这里插入图片描述

MIT6.5840 课程Lab完整项目

https://github.com/Joker0x00/MIT-6.5840-Lab/

相关文章:

[MIT6.5840]MapReduce

MapReduce Lab 地址 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 论文地址 https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf 工作原理 简单来讲&#xff0c;MapReduce是一种分布式框架&#xff0c;可以用来处理…...

【系统架构设计师】计算机组成与体系结构 ⑯ ( 奇偶校验码 | CRC 循环冗余码 | 海明码 | 模 2 除法 )

文章目录 一、校验码1、校验码由来2、奇偶校验码3、CRC 循环冗余码 ( 重点考点 )4、海明码校验 ( 软考不经常考到 ) 二、CRC 循环冗余码 ( 重点考点 )1、模 2 除法概念2、模 2 除法步骤3、模 2 除法示例4、CRC 循环冗余码示例 15、CRC 循环冗余码示例 2 参考之前的博客 : 【计…...

springboot,service 层统一异常抛出时,throws Exception写在接口上还是实现类上

springboot,service 层统一异常抛出时&#xff0c;throws Exception写在实现接口上&#xff0c;不是直接写在实现类上...

深度学习高效性网络

为了减轻Transformer笨重的计算成本&#xff0c;一系列工作重点开发了高效的Vision Transformer&#xff0c;如Swin Transformer、PVT、Twins、CoAtNet和MobileViT。 1、字节TRT-ViT 兼具CNN的速度、Transformer精度的模型 TRT-ViT&#xff08;Transformer-based Vision Tra…...

PyQt ERROR:ModuleNotFoundError: No module named ‘matplotlib‘

Solution:打开cmd输入指令下载malplotlib pip install matplotlib...

Flutter Geolocator插件使用指南:获取和监听地理位置

Flutter Geolocator插件使用指南&#xff1a;获取和监听地理位置 简介 geolocator 是一个Flutter插件&#xff0c;提供了一个简单易用的API来访问特定平台的地理位置服务。它支持获取设备的最后已知位置、当前位置、连续位置更新、检查设备上是否启用了位置服务&#xff0c;以…...

网站基本布局CSS

代码 <!DOCTYPE html> <html> <head><meta charset"utf-8"><meta name"viewport" content"widthdevice-width, initial-scale1"><title></title><style type"text/css">body {margi…...

ssm框架整合,异常处理器和拦截器(纯注解开发)

目录 ssm框架整合 第一步&#xff1a;指定打包方式和导入所需要的依赖 打包方法&#xff1a;war springMVC所需依赖 解析json依赖 mybatis依赖 数据库驱动依赖 druid数据源依赖 junit依赖 第二步&#xff1a;导入tomcat插件 第三步&#xff1a;编写配置类 SpringCon…...

古籍双层PDF制作教程:保姆级古籍数字化教程

在智慧古籍数字化项目中&#xff0c;很多图书馆要求将古籍导出为双层PDF&#xff0c;并且确保输出双层PDF底层文本与上层图片偏移量控制在1毫米以内。那么本教程带你使用古籍数字化平台&#xff0c;3分钟把一个古籍书籍转化为双侧PDF。 第1步&#xff1a;上传古籍 点批量上传…...

Git 删除 远端的分支

要删除 Git 远端的分支&#xff08;例如&#xff1a; V3.2.1.13&#xff09;&#xff1a; 可以执行以下命令 git push origin --delete V3.2.1.13这条命令会向远端的仓库删除名为 V3.2.1.13 的分支。如果这个分支只在远端仓库存在而没有对应的本地分支&#xff0c;那么删除后这…...

PrgogressBar实现原理分析

ProgressBar 是 Android 中用于显示进度条的控件&#xff0c;它可以用来表示任务的完成程度或者加载进度等信息。ProgressBar 有两种主要类型&#xff1a;一种是确定性的&#xff08;determinate&#xff09;&#xff0c;另一种是不确定性的&#xff08;indeterminate&#xff…...

【HarmonyOS】HarmonyOS NEXT学习日记:七、页面与组件的生命周期

【HarmonyOS】HarmonyOS NEXT学习日记&#xff1a;七、页面与组件的生命周期 页面和组件 组件&#xff1a;用Component装饰的代码称为自定义组件页面&#xff1a;Entry装饰的组件即页面的根节点 组件生命周期 aboutToAppear&#xff1a;在创建自定义组件的新实例后&#xf…...

【iOS】——Block循环引用

循环引用原因 如果在Block中使用附有_ _strong修饰符的对象类型自动变量&#xff0c;那么当Block从栈复制到堆时&#xff0c;该对象为Block所持有&#xff0c;这样容易引起循环引用。 HPPerson *person [[HPPerson alloc] init];person.block ^{NSLog("person.age--- …...

shell脚本自动化安装启动各种服务

1、自动化配置dns服务器 A主机&#xff1a;vim dns.sh #!/bin/bash# 自动化部署dns# 1、下载bind# 2、修改配置文件# vim /etc/named.conf # listen-on port 53 { 127.0.0.1;any; }; 修改&#xff08;定位替换&#xff09;# allow-query { localhost;any; }; 修改&am…...

Python - 开源库 ReportLab 库合并 CVS 和图像生成 PDF 文档

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/140281680 免责声明&#xff1a;本文来源于个人知识与公开资料&#xff0c;仅用于学术交流&#xff0c;欢迎讨论&#xff0c;不支持转载。 Report…...

Java编写SIP协议

1、编写Server代码 package com.genersoft.iot.vmp.sip; import javax.sip.*; import javax.sip.message.*; import javax.sip.header.*; import java.util.*;public class SimpleSipServer implements SipListener {private SipFactory sipFactory;private SipStack sipStack…...

大型语言模型LLM的核心概念

本文主要介绍了目前主流的&#xff0c;几个大型语言模型LLM的整个训练过程 通常分为下面的几个阶段 1. 预训练 采用互联网上的大量数据进行训练&#xff0c;这一阶段大模型LLM的主体已定&#xff0c;找出共性并且压缩成一个模型。模型的参数量不是越大越好&#xff0c;遵循合理…...

软件测试---网络基础、HTTP

一、网络基础 &#xff08;1&#xff09;Web和网络知识 网络基础TCP/IP 使用HTTP协议访问Web WWW万维网的诞生 WWW万维网的构成 &#xff08;2&#xff09;IP协议 &#xff08;3&#xff09;可靠传输的TCP和三次握手策略 &#xff08;4&#xff09;域名解析服务DNS &#xff0…...

韩顺平0基础学java——第39天

p820-841 jdbc和连接池 1.JDBC为访问不同的数据库提供了统一的接口&#xff0c;为使用者屏蔽了细节问题。 2.Java程序员使用JDBC&#xff0c;可以连接任何提供了JDBC驱动程序的数据库系统&#xff0c;从而完成对数据库的各种操作。 3.jdbc原理图 JDBC带来的好处 2.JDBC带来的…...

Linux文件恢复

很麻烦 一般还是小心最好 特别恢复的时候 可能不能选择某个文件夹去扫描恢复 所以 删除的时候 用rm -i代替rm 一定小心 以及 探索下linux的垃圾箱机制 注意 一定要恢复到不同文件夹 省的出问题 法1 系统自带工具 debugfs 但是好像不能重启&#xff1f; testdisk 1、安装 …...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建

华为云FlexusDeepSeek征文&#xff5c;DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色&#xff0c;华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型&#xff0c;能助力我们轻松驾驭 DeepSeek-V3/R1&#xff0c;本文中将分享如何…...

蓝桥杯3498 01串的熵

问题描述 对于一个长度为 23333333的 01 串, 如果其信息熵为 11625907.5798&#xff0c; 且 0 出现次数比 1 少, 那么这个 01 串中 0 出现了多少次? #include<iostream> #include<cmath> using namespace std;int n 23333333;int main() {//枚举 0 出现的次数//因…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

4. TypeScript 类型推断与类型组合

一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式&#xff0c;自动确定它们的类型。 这一特性减少了显式类型注解的需要&#xff0c;在保持类型安全的同时简化了代码。通过分析上下文和初始值&#xff0c;TypeSc…...

C语言中提供的第三方库之哈希表实现

一. 简介 前面一篇文章简单学习了C语言中第三方库&#xff08;uthash库&#xff09;提供对哈希表的操作&#xff0c;文章如下&#xff1a; C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...

毫米波雷达基础理论(3D+4D)

3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文&#xff1a; 一文入门汽车毫米波雷达基本原理 &#xff1a;https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...