MapReduce
我将玫瑰藏于身后,风起花落,从此鲜花赠自己,纵马踏花向自由。
我想,如果可以的话,我想成为神级一般的人物~
前言:由于最近找工作战况严峻,不得以只能再掏个去年的项目,顺便温故一下之前学习分布式知识,commit就是最好的证明 ~ PS:希望写这个Lab的不要太功利,bug才是真正提高自己水平的~
那段时光也是最能静下心来,最美好的时光~
前置知识
MapReduce学习请看这篇:MapReduce: Simplified Data Processing on Large Clusters
go-rpc的使用移步这篇:【Golang | RPC】使用包net/rpc实现基于http协议的RPC服务
实验内容
目标
词频统计
描述
实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。
规则
- map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
- worker需要将第X个reduce task结果放到mr-out-X中。
- mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
- main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
- 任务都完成后,worker也得关闭
提示
- 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
- map reduce函数都是通过go插件装载 (.so文件)
- mr/ 文件变了就需要重新build
- 都在一个文件系统,worker天然实现文件共享,先凑合着起步
- 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
- worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
- worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
- mrsequential.go 代码可以借鉴
- coordinator里面的共享数据需要加锁
- worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
- 如果任务重试机制,记得不要生成重复任务
- mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
- 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名
思路说明
摘自:https://juejin.cn/post/7224476641623556154
客户端通过运行一次mrmaster.go启动一个master进程,启动时,会进行以下操作:
- 创建一个
Master结构体 - 根据命令行传入的文件生成所有
map任务并存放在Master的MapChannel中 - 开启一个新的线程不断循环判断有没有过期(分配出去但十秒内未完成)的
map和reduce任务 - 开始监听来自
worker的RPC调用
客户端通过运行多次mrworker.go启动多个worker进程,每个worker启动时,都会进行以下操作:
- 创建一个
AWorker结构体 - 只要
master进程没有通知worker进程所有任务都已经完成,worker进程就一直向master进程要任务
master接收到worker的要任务请求后,根据所有任务的完成情况给worker分配任务,分配规则是:所有map任务全部完成后才可以去分配reduce任务
当worker要到任务后,worker会首先判断这个任务是map任务还是reduce任务,并根据任务类型执行不同的逻辑
- 如果是
map任务,会执行doMap逻辑,全部执行完后会在/var/tmp目录下生成文件名为mr-X-Y的临时文件,其中X是当前mapWorker的ID,Y是通过ihash方法计算出来的 - 如果是
reduce任务,会执行doReduce逻辑,worker会从src/main中读取所有mr-*-Y文件,其中Y是当前reduceWorker的reduce任务的编号,最终会在/var/tmp目录下生成文件名为mr-out-Y的临时文件
worker执行完自己的任务后,会执行mapTaskDone或reduceTaskDone方法,告诉master自己的任务完成了,通知完之后会立马继续向master要新的任务
master在接收到worker完成任务的通知后,会先判断这个worker完成这个任务有没有超时,具体就是判断这个任务的状态是不是Running,如果是就没有超时;否则,如果状态是Ready,就说明该任务已经被master中的循环检测任务过期的线程判定为过期并设为Ready状态;如果状态是Finished,说明这个任务被判定为超时后又被分配给其他worker并被那个worker按时完成
如果master判断worker在规定时间内完成了任务,则:
worker执行的是map任务:master会调用generateMapFile方法,将worker在/var/tmp中生成的临时文件mr-X-Y复制到src/main中,作为正式的该map任务完成后产生的中间文件,表示master接受了该worker的成果worker执行的是reduce任务:master会调用generateReduceFile方法,将worker在/var/tmp中生成的临时文件mr-out-Y复制到src/main中,作为正式的该reduce任务完成后产生的最终文件,表示master接受了该worker的成果
当master的MapChannel中存储的所有任务都被完成后,master会将任务分配阶段由MapPhase调整为ReducePhase,在这之后,master就会给来请求任务的worker分配reduce任务
当master的ReduceChannel中存储的所有任务都被完成后,表示所有的map任务和reduce任务都已经完成,此时master会调用finish方法,准备终止master进程,并通知所有的master任务完成了,终止所有worker
关键代码
master程序
mrcoordinator.go
命令:go run mrcoordinator.go pg*.txt
代码会调用:
1 | m := mr.MakeCoordinator(os.Args[1:], 10) |
MakeCoordinator 函数用于初始化一个 Coordinator 实例,包括初始化 Map 任务和 Reduce 任务的状态映射表,并启动 Coordinator 的服务。
1 | func MakeCoordinator(files []string, nReduce int) *Coordinator { |
其中,Coordinator的数据结构如下:
1 | type Coordinator struct { |
接下来开启监听
1 | func (c *Coordinator) server() { |
Map阶段向master发送请求
【这里不给出代码啦,希望看到这篇博客的小伙伴自己实现下,不要太功利了~】算彩蛋吗~
SuportMapTask函数
SuportMapTask 方法是 Coordinator 类的一个方法,用于处理 Worker 请求 Map 任务的逻辑。以下是对该方法的详细解释和总结:
- 加锁:
- 使用互斥锁
Lock确保并发安全,防止多个 Worker 同时请求任务时发生竞态条件。
- 使用互斥锁
- 初始化回复:
- 初始化回复的标志位
Flag为false,表示没有找到可用的 Map 任务。 - 设置回复中的
NumReduce字段为 Coordinator 的ReduceNum,表示 Reduce 任务的数量。
- 初始化回复的标志位
- 遍历 Map 任务:
- 遍历所有 Map 任务,检查是否有未完成的任务(状态为
Undo)。 - 如果找到一个未完成的 Map 任务:
- 设置回复的
Mapfilename为该任务对应的文件名。 - 设置回复的
Flag为true,表示找到了可用的 Map 任务。 - 设置回复的
MapId为该任务的 ID。 - 将该任务的状态设置为
Doing,表示任务正在执行。 - 打印日志,记录 Worker 请求到的任务 ID。
- 启动一个协程,调用
CheckTimeout方法检查任务是否超时。 - 跳出循环,返回任务。
- 设置回复的
- 遍历所有 Map 任务,检查是否有未完成的任务(状态为
- 返回结果:
- 如果没有找到未完成的 Map 任务,回复的
Flag保持为false,表示没有可用的 Map 任务。
- 如果没有找到未完成的 Map 任务,回复的
MasterCheckMapFinish函数
Reduce阶段向master发送的请求
这个·和上面这个思路一致。
处理早退的Casew
CheckTimeout 方法是 Coordinator 类的一个方法,用于检查任务是否超时。以下是该方法的详细流程总结:
- 等待超时时间:
- 方法首先等待指定的超时时间(
timeout秒)。
- 方法首先等待指定的超时时间(
- 加锁:
- 使用互斥锁
Lock确保并发安全,防止多个协程同时修改任务状态时发生竞态条件。
- 使用互斥锁
- 检查任务类型:
- 根据传入的
TaskType参数判断任务类型是Map还是Reduce。
- 根据传入的
- 检查任务状态:
- 如果任务类型是
Map:- 检查对应
Map任务的状态是否为Finish。 - 如果任务状态不是
Finish,则将其状态重置为Undo,表示任务超时未完成。
- 检查对应
- 如果任务类型是
Reduce:- 检查对应
Reduce任务的状态是否为Finish。 - 如果任务状态不是
Finish,则将其状态重置为Undo,表示任务超时未完成。
- 检查对应
- 如果任务类型是
- 解锁:
- 释放互斥锁
Lock,允许其他协程访问任务状态。
- 释放互斥锁
worker程序
原先框架中给出了RPC的示例,照着写就行。
再学习这部分之前,先大致有个思路,那么之后就会更好的理解实现。
Map阶段举例说明:
我们先将txt文件变成input.txt 并返回一个 []KeyValue 数组,其中每个 KeyValue 对表示一个单词和它的出现次数。
1 | kva = [ |
之后根据 Key 的哈希值将键值对分配到不同的 reduce 任务。ihash(tkva.Key) 计算 Key 的哈希值,然后通过取模操作 % cur_reply.NumReduce 来确定应该将键值对放在哪一个桶中。
比如,假设 ihash 函数得出的哈希值如下:
ihash("apple") % 2 = 0ihash("banana") % 2 = 1ihash("orange") % 2 = 0
那么 kvaa 的内容将变为:
1 | kvaa[0] = [ |
遍历 NumReduce,即 2 个桶,并将桶中的 KeyValue 对保存到文件中。文件名格式为 mr-X-Y,其中 X 是 Map 任务编号(cur_reply.MapId),Y 是 reduce 任务编号。
-
对于第一个桶(
i=0),生成文件mr-1-0,内容是:1
2
3
4{"Key":"apple","Value":1}
{"Key":"apple","Value":1}
{"Key":"orange","Value":1}
{"Key":"orange","Value":1} -
对于第二个桶(
i=1),生成文件mr-1-1,内容是:1
2{"Key":"banana","Value":1}
{"Key":"banana","Value":1}
Reduce阶段举例说明:
reduce任务时只需要读取所有的mr-*-reduce文件并合并处理就可以实现所有相同的单词被同一个reduce任务处理的效果,最终经过reduce任务的处理,产生文件mr-out-i。
RPC通信
Map阶段与master的rpc函数
- worker向master请求任务
- map告诉master已经完成任务
- reduce告诉master已经完成任务









