当前位置: 首页 > news >正文

[MIT6.5840]MapReduce

MapReduce

Lab 地址

https://pdos.csail.mit.edu/6.824/labs/lab-mr.html

论文地址

https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf

工作原理

简单来讲,MapReduce是一种分布式框架,可以用来处理大规模数据。该框架抽象了两个接口,分别是MapReduce函数:
在这里插入图片描述

凡是符合这个模式的算法都可以使用该框架来实现并行化,执行流程如下图所示。

在这里插入图片描述

整个框架分为Master和Worker,Master负责分配mapreduce任务,Worker负责向Master申请任务并执行。执行流程如下:

Map阶段:

  • 输入是大文件分割后的一组小文件,通常大小为16~64MB。
  • Worker向Master申请任务,假设得到map任务in0。
  • Worker开始执行map任务,将文件名和文件内容作为参数传入map函数中,得到kv list.
  • 最后Worker将kv list分割成reduceNum份(超参数),要求使得具有相同key的kv对在一份中。可以通过hash值%reduceNum实现分割,然后输出到文件中,下图的0-*

Reduce阶段:

  • 输入当前reduce的序号id,从map阶段的输出中选出*-id的文件,也就是将hash值%reduceNum值相同的kv对取出,这样可以保证具有相同key的kv对只用一次处理。
  • 将所有的kv对根据键值排序,使得相同key的kv对能够连续排列,方便合并。
  • 之后合并相同key的kv对,然后将每个key和其对应的value list输入reduce函数,得到合并的结果,再将其输出到文件中。

在这里插入图片描述

本文介绍了大致思想,详细内容请参考原论文。

代码详解

rpc.go

package mr//
// RPC definitions.
//
// remember to capitalize all names.
//import ("fmt""os""strconv"
)const (MAP    = "MAP"REDUCE = "REDUCE"DONE   = "DONE"
)//
// example to show how to declare the arguments
// and reply for an RPC.
//type ApplyArgs struct {WorkerID     intLastTaskType stringLastTaskID   int
}type ReplyArgs struct {TaskId    intTaskType  stringInputFile stringMapNum    intReduceNum int
}// Add your RPC definitions here.// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator.
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
func coordinatorSock() string {s := "/var/tmp/5840-mr-"s += strconv.Itoa(os.Getuid())return s
}
// 构造文件名
func tmpMapResult(workerID int, taskID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-%d-%d", workerID, taskID, reduceId)
}func finalMapResult(taskID int, reduceID int) string {return fmt.Sprintf("mr-%d-%d", taskID, reduceID)
}func tmpReduceResult(workerID int, reduceId int) string {return fmt.Sprintf("tmp-worker-%d-out-%d", workerID, reduceId)
}func finalReduceResult(reduceID int) string {return fmt.Sprintf("mr-out-%d", reduceID)
}

worker.go

package mrimport ("fmt""hash/fnv""io""log""net/rpc""os""sort""strings"
)// Map functions return a slice of KeyValue.
type KeyValue struct {Key   stringValue string
}// for sorting by key.
type ByKey []KeyValue// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
func ihash(key string) int {h := fnv.New32a()h.Write([]byte(key))return int(h.Sum32() & 0x7fffffff)
}// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {// Your worker implementation here.id := os.Getegid()// log.Printf("worker %d start working", id)lastTaskId := -1lastTaskType := ""loop:for {args := ApplyArgs{WorkerID:     id,LastTaskType: lastTaskType,LastTaskID:   lastTaskId,}reply := ReplyArgs{}ok := call("Coordinator.ApplyForTask", &args, &reply)if !ok {fmt.Printf("call failed!\n")continue}// log.Printf("reply: %v", reply)lastTaskId = reply.TaskIdlastTaskType = reply.TaskTypeswitch reply.TaskType {case "":// log.Println("finished")break loopcase MAP:// log.Printf("worker %d get map task %d", id, reply.TaskId)doMapTask(id, reply.TaskId, reply.InputFile, reply.ReduceNum, mapf)case REDUCE:// log.Printf("worker %d get reduce task %d", id, reply.TaskId)doReduceTask(id, reply.TaskId, reply.MapNum, reducef)}}// uncomment to send the Example RPC to the coordinator.// CallExample()}// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
func call(rpcname string, args interface{}, reply interface{}) bool {// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")sockname := coordinatorSock()c, err := rpc.DialHTTP("unix", sockname)if err != nil {log.Fatal("dialing:", err)}defer c.Close()err = c.Call(rpcname, args, reply)if err == nil {return true}fmt.Println(err)return false
}func doMapTask(id int, taskId int, filename string, reduceNum int, mapf func(string, string) []KeyValue) {file, err := os.Open(filename)if err != nil {log.Fatalf("%s 文件打开失败! ", filename)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("%s 文件内容读取失败! ", filename)}file.Close()kvList := mapf(filename, string(content)) // kv listhashedKvList := make(map[int]ByKey)for _, kv := range kvList {hashedKey := ihash(kv.Key) % reduceNumhashedKvList[hashedKey] = append(hashedKvList[hashedKey], kv)}for i := 0; i < reduceNum; i++ {outFile, err := os.Create(tmpMapResult(id, taskId, i))if err != nil {log.Fatalf("can not create output file: %e", err)return}for _, kv := range hashedKvList[i] {fmt.Fprintf(outFile, "%v\t%v\n", kv.Key, kv.Value)}outFile.Close()}// log.Printf("worker %d finished map task\n", id)
}func doReduceTask(id int, taskId int, mapNum int, reducef func(string, []string) string) {var kvList ByKeyvar lines []stringfor i := 0; i < mapNum; i++ {mapOutFile := finalMapResult(i, taskId)file, err := os.Open(mapOutFile)if err != nil {log.Fatalf("can not open output file %s: %e", mapOutFile, err)return}content, err := io.ReadAll(file)if err != nil {log.Fatalf("file read failed %s: %e", mapOutFile, err)return}lines = append(lines, strings.Split(string(content), "\n")...)}for _, line := range lines {if strings.TrimSpace(line) == "" {continue}split := strings.Split(line, "\t")kvList = append(kvList, KeyValue{Key: split[0], Value: split[1]})}sort.Sort(kvList)outputFile := tmpReduceResult(id, taskId)file, err := os.Create(outputFile)if err != nil {log.Fatalf("can not create output file: %e", err)return}for i := 0; i < len(kvList); {j := i + 1key := kvList[i].Keyvar values []stringfor j < len(kvList) && kvList[j].Key == key {j++}for k := i; k < j; k++ {values = append(values, kvList[k].Value)}res := reducef(key, values)fmt.Fprintf(file, "%v %v\n", key, res)i = j}file.Close()// log.Printf("worker %d finished reduce task", id)
}

coordinator.go

package mrimport ("fmt""log""math""net""net/http""net/rpc""os""sync""time"
)type Task struct {id        intinputFile stringworker    inttaskType  stringdeadLine  time.Time
}type Coordinator struct {// Your definitions here.mtx        sync.MutexinputFile  []stringreduceNum  intmapNum     inttaskStates map[string]TasktodoList   chan Taskstage      string
}// Your code here -- RPC handlers for the worker to call.// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) ApplyForTask(args *ApplyArgs, reply *ReplyArgs) error {// process the last taskif args.LastTaskID != -1 {taskId := createTaskId(args.LastTaskID, args.LastTaskType)c.mtx.Lock()if task, ok := c.taskStates[taskId]; ok && task.worker != -1 { // 排除过期任务// log.Printf("worker %d finish task %d", args.WorkerID, task.id)if args.LastTaskType == MAP {for i := 0; i < c.reduceNum; i++ {err := os.Rename(tmpMapResult(task.worker, task.id, i), finalMapResult(task.id, i))if err != nil {log.Fatalf("can not rename %s: %e", tmpMapResult(task.worker, task.id, i), err)}}} else if args.LastTaskType == REDUCE {err := os.Rename(tmpReduceResult(task.worker, task.id), finalReduceResult(task.id))if err != nil {log.Fatalf("can not rename %s: %e", tmpReduceResult(task.worker, task.id), err)}}delete(c.taskStates, taskId)if len(c.taskStates) == 0 {c.shift()}}c.mtx.Unlock()}// assign the new tasktask, ok := <-c.todoListif !ok {return nil}reply.InputFile = task.inputFilereply.MapNum = c.mapNumreply.ReduceNum = c.reduceNumreply.TaskId = task.idreply.TaskType = task.taskTypetask.worker = args.WorkerIDtask.deadLine = time.Now().Add(10 * time.Second)// log.Printf("assign %s task %d to worker %d", task.taskType, task.id, args.WorkerID)c.mtx.Lock()c.taskStates[createTaskId(task.id, task.taskType)] = taskc.mtx.Unlock()return nil
}// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
// 改变当前的状态
func (c *Coordinator) shift() {// 加锁状态if c.stage == MAP {// log.Printf("Map Task finished")c.stage = REDUCE// 分配reduce taskfor i := 0; i < c.reduceNum; i++ {task := Task{id:       i,worker:   -1,taskType: REDUCE,}c.todoList <- taskc.taskStates[createTaskId(i, REDUCE)] = task}} else if c.stage == REDUCE {close(c.todoList)c.stage = DONE}
}// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
func (c *Coordinator) Done() bool {// Your code here.c.mtx.Lock()defer c.mtx.Unlock()return c.stage == DONE
}// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{mtx:        sync.Mutex{},inputFile:  files,reduceNum:  nReduce,mapNum:     len(files),taskStates: make(map[string]Task),todoList:   make(chan Task, int(math.Max(float64(nReduce), float64(len(files))))),stage:      MAP,}for i, file := range files {task := Task{id:        i,inputFile: file,worker:    -1,taskType:  MAP,}c.todoList <- taskc.taskStates[createTaskId(i, MAP)] = task}// 回收任务go c.collectTask()c.server()return &c
}func createTaskId(id int, taskType string) string {return fmt.Sprintf("%d-%s", id, taskType)
}
// worker执行过期后回收任务
func (c *Coordinator) collectTask() {for {time.Sleep(500 * time.Millisecond)c.mtx.Lock()if c.stage == DONE {c.mtx.Unlock()return}for _, task := range c.taskStates {if task.worker != -1 && time.Now().After(task.deadLine) {// task is expiredtask.worker = -1// log.Printf("task %d is expired", task.id)c.todoList <- task}}c.mtx.Unlock()}
}

运行说明

mrcoordinator

cd src/main/
go build -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run mrcoordinator.go pg-*.txt

mrworker

cd src/main/
go run mrworker.go wc.so

测试结果

bash test-mr.sh

在这里插入图片描述

MIT6.5840 课程Lab完整项目

https://github.com/Joker0x00/MIT-6.5840-Lab/

相关文章:

[MIT6.5840]MapReduce

MapReduce Lab 地址 https://pdos.csail.mit.edu/6.824/labs/lab-mr.html 论文地址 https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf 工作原理 简单来讲&#xff0c;MapReduce是一种分布式框架&#xff0c;可以用来处理…...

【系统架构设计师】计算机组成与体系结构 ⑯ ( 奇偶校验码 | CRC 循环冗余码 | 海明码 | 模 2 除法 )

文章目录 一、校验码1、校验码由来2、奇偶校验码3、CRC 循环冗余码 ( 重点考点 )4、海明码校验 ( 软考不经常考到 ) 二、CRC 循环冗余码 ( 重点考点 )1、模 2 除法概念2、模 2 除法步骤3、模 2 除法示例4、CRC 循环冗余码示例 15、CRC 循环冗余码示例 2 参考之前的博客 : 【计…...

springboot,service 层统一异常抛出时,throws Exception写在接口上还是实现类上

springboot,service 层统一异常抛出时&#xff0c;throws Exception写在实现接口上&#xff0c;不是直接写在实现类上...

深度学习高效性网络

为了减轻Transformer笨重的计算成本&#xff0c;一系列工作重点开发了高效的Vision Transformer&#xff0c;如Swin Transformer、PVT、Twins、CoAtNet和MobileViT。 1、字节TRT-ViT 兼具CNN的速度、Transformer精度的模型 TRT-ViT&#xff08;Transformer-based Vision Tra…...

PyQt ERROR:ModuleNotFoundError: No module named ‘matplotlib‘

Solution:打开cmd输入指令下载malplotlib pip install matplotlib...

Flutter Geolocator插件使用指南:获取和监听地理位置

Flutter Geolocator插件使用指南&#xff1a;获取和监听地理位置 简介 geolocator 是一个Flutter插件&#xff0c;提供了一个简单易用的API来访问特定平台的地理位置服务。它支持获取设备的最后已知位置、当前位置、连续位置更新、检查设备上是否启用了位置服务&#xff0c;以…...

网站基本布局CSS

代码 <!DOCTYPE html> <html> <head><meta charset"utf-8"><meta name"viewport" content"widthdevice-width, initial-scale1"><title></title><style type"text/css">body {margi…...

ssm框架整合,异常处理器和拦截器(纯注解开发)

目录 ssm框架整合 第一步&#xff1a;指定打包方式和导入所需要的依赖 打包方法&#xff1a;war springMVC所需依赖 解析json依赖 mybatis依赖 数据库驱动依赖 druid数据源依赖 junit依赖 第二步&#xff1a;导入tomcat插件 第三步&#xff1a;编写配置类 SpringCon…...

古籍双层PDF制作教程:保姆级古籍数字化教程

在智慧古籍数字化项目中&#xff0c;很多图书馆要求将古籍导出为双层PDF&#xff0c;并且确保输出双层PDF底层文本与上层图片偏移量控制在1毫米以内。那么本教程带你使用古籍数字化平台&#xff0c;3分钟把一个古籍书籍转化为双侧PDF。 第1步&#xff1a;上传古籍 点批量上传…...

Git 删除 远端的分支

要删除 Git 远端的分支&#xff08;例如&#xff1a; V3.2.1.13&#xff09;&#xff1a; 可以执行以下命令 git push origin --delete V3.2.1.13这条命令会向远端的仓库删除名为 V3.2.1.13 的分支。如果这个分支只在远端仓库存在而没有对应的本地分支&#xff0c;那么删除后这…...

PrgogressBar实现原理分析

ProgressBar 是 Android 中用于显示进度条的控件&#xff0c;它可以用来表示任务的完成程度或者加载进度等信息。ProgressBar 有两种主要类型&#xff1a;一种是确定性的&#xff08;determinate&#xff09;&#xff0c;另一种是不确定性的&#xff08;indeterminate&#xff…...

【HarmonyOS】HarmonyOS NEXT学习日记:七、页面与组件的生命周期

【HarmonyOS】HarmonyOS NEXT学习日记&#xff1a;七、页面与组件的生命周期 页面和组件 组件&#xff1a;用Component装饰的代码称为自定义组件页面&#xff1a;Entry装饰的组件即页面的根节点 组件生命周期 aboutToAppear&#xff1a;在创建自定义组件的新实例后&#xf…...

【iOS】——Block循环引用

循环引用原因 如果在Block中使用附有_ _strong修饰符的对象类型自动变量&#xff0c;那么当Block从栈复制到堆时&#xff0c;该对象为Block所持有&#xff0c;这样容易引起循环引用。 HPPerson *person [[HPPerson alloc] init];person.block ^{NSLog("person.age--- …...

shell脚本自动化安装启动各种服务

1、自动化配置dns服务器 A主机&#xff1a;vim dns.sh #!/bin/bash# 自动化部署dns# 1、下载bind# 2、修改配置文件# vim /etc/named.conf # listen-on port 53 { 127.0.0.1;any; }; 修改&#xff08;定位替换&#xff09;# allow-query { localhost;any; }; 修改&am…...

Python - 开源库 ReportLab 库合并 CVS 和图像生成 PDF 文档

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/140281680 免责声明&#xff1a;本文来源于个人知识与公开资料&#xff0c;仅用于学术交流&#xff0c;欢迎讨论&#xff0c;不支持转载。 Report…...

Java编写SIP协议

1、编写Server代码 package com.genersoft.iot.vmp.sip; import javax.sip.*; import javax.sip.message.*; import javax.sip.header.*; import java.util.*;public class SimpleSipServer implements SipListener {private SipFactory sipFactory;private SipStack sipStack…...

大型语言模型LLM的核心概念

本文主要介绍了目前主流的&#xff0c;几个大型语言模型LLM的整个训练过程 通常分为下面的几个阶段 1. 预训练 采用互联网上的大量数据进行训练&#xff0c;这一阶段大模型LLM的主体已定&#xff0c;找出共性并且压缩成一个模型。模型的参数量不是越大越好&#xff0c;遵循合理…...

软件测试---网络基础、HTTP

一、网络基础 &#xff08;1&#xff09;Web和网络知识 网络基础TCP/IP 使用HTTP协议访问Web WWW万维网的诞生 WWW万维网的构成 &#xff08;2&#xff09;IP协议 &#xff08;3&#xff09;可靠传输的TCP和三次握手策略 &#xff08;4&#xff09;域名解析服务DNS &#xff0…...

韩顺平0基础学java——第39天

p820-841 jdbc和连接池 1.JDBC为访问不同的数据库提供了统一的接口&#xff0c;为使用者屏蔽了细节问题。 2.Java程序员使用JDBC&#xff0c;可以连接任何提供了JDBC驱动程序的数据库系统&#xff0c;从而完成对数据库的各种操作。 3.jdbc原理图 JDBC带来的好处 2.JDBC带来的…...

Linux文件恢复

很麻烦 一般还是小心最好 特别恢复的时候 可能不能选择某个文件夹去扫描恢复 所以 删除的时候 用rm -i代替rm 一定小心 以及 探索下linux的垃圾箱机制 注意 一定要恢复到不同文件夹 省的出问题 法1 系统自带工具 debugfs 但是好像不能重启&#xff1f; testdisk 1、安装 …...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

Golang dig框架与GraphQL的完美结合

将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用&#xff0c;可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器&#xff0c;能够帮助开发者更好地管理复杂的依赖关系&#xff0c;而 GraphQL 则是一种用于 API 的查询语言&#xff0c;能够提…...

2.Vue编写一个app

1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

全球首个30米分辨率湿地数据集(2000—2022)

数据简介 今天我们分享的数据是全球30米分辨率湿地数据集&#xff0c;包含8种湿地亚类&#xff0c;该数据以0.5X0.5的瓦片存储&#xff0c;我们整理了所有属于中国的瓦片名称与其对应省份&#xff0c;方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

QT3D学习笔记——圆台、圆锥

类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体&#xff08;对象或容器&#xff09;QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质&#xff08;定义颜色、反光等&#xff09;QFirstPersonC…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...