MapReduce
我将玫瑰藏于身后,风起花落,从此鲜花赠自己,纵马踏花向自由。
我想,如果可以的话,我想成为神级一般的人物~
前言:由于最近找工作战况严峻,不得以只能再掏个去年的项目,顺便温故一下之前学习分布式知识,commit就是最好的证明 ~ PS:希望写这个Lab的不要太功利,bug才是真正提高自己水平的~
那段时光也是最能静下心来,最美好的时光~
前置知识
MapReduce学习请看这篇:MapReduce: Simplified Data Processing on Large Clusters
go-rpc的使用移步这篇:【Golang | RPC】使用包net/rpc实现基于http协议的RPC服务
实验内容
目标
词频统计
描述
实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。
规则
- map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
- worker需要将第X个reduce task结果放到mr-out-X中。
- mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
- main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
- 任务都完成后,worker也得关闭
提示
- 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
- map reduce函数都是通过go插件装载 (.so文件)
- mr/ 文件变了就需要重新build
- 都在一个文件系统,worker天然实现文件共享,先凑合着起步
- 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
- worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
- worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
- mrsequential.go 代码可以借鉴
- coordinator里面的共享数据需要加锁
- worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
- 如果任务重试机制,记得不要生成重复任务
- mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
- 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名
思路说明
摘自:https://juejin.cn/post/7224476641623556154
客户端通过运行一次mrmaster.go
启动一个master
进程,启动时,会进行以下操作:
- 创建一个
Master
结构体 - 根据命令行传入的文件生成所有
map
任务并存放在Master
的MapChannel
中 - 开启一个新的线程不断循环判断有没有过期(分配出去但十秒内未完成)的
map
和reduce
任务 - 开始监听来自
worker
的RPC
调用
客户端通过运行多次mrworker.go
启动多个worker
进程,每个worker
启动时,都会进行以下操作:
- 创建一个
AWorker
结构体 - 只要
master
进程没有通知worker
进程所有任务都已经完成,worker
进程就一直向master
进程要任务
master
接收到worker
的要任务请求后,根据所有任务的完成情况给worker
分配任务,分配规则是:所有map
任务全部完成后才可以去分配reduce
任务
当worker
要到任务后,worker
会首先判断这个任务是map
任务还是reduce
任务,并根据任务类型执行不同的逻辑
- 如果是
map
任务,会执行doMap
逻辑,全部执行完后会在/var/tmp
目录下生成文件名为mr-X-Y
的临时文件,其中X
是当前mapWorker
的ID
,Y
是通过ihash
方法计算出来的 - 如果是
reduce
任务,会执行doReduce
逻辑,worker
会从src/main
中读取所有mr-*-Y
文件,其中Y
是当前reduceWorker
的reduce
任务的编号,最终会在/var/tmp
目录下生成文件名为mr-out-Y
的临时文件
worker
执行完自己的任务后,会执行mapTaskDone
或reduceTaskDone
方法,告诉master
自己的任务完成了,通知完之后会立马继续向master
要新的任务
master
在接收到worker
完成任务的通知后,会先判断这个worker
完成这个任务有没有超时,具体就是判断这个任务的状态是不是Running
,如果是就没有超时;否则,如果状态是Ready
,就说明该任务已经被master
中的循环检测任务过期的线程判定为过期并设为Ready
状态;如果状态是Finished
,说明这个任务被判定为超时后又被分配给其他worker
并被那个worker
按时完成
如果master
判断worker
在规定时间内完成了任务,则:
worker
执行的是map
任务:master
会调用generateMapFile
方法,将worker
在/var/tmp
中生成的临时文件mr-X-Y
复制到src/main
中,作为正式的该map
任务完成后产生的中间文件,表示master
接受了该worker
的成果worker
执行的是reduce
任务:master
会调用generateReduceFile
方法,将worker
在/var/tmp
中生成的临时文件mr-out-Y
复制到src/main
中,作为正式的该reduce
任务完成后产生的最终文件,表示master
接受了该worker
的成果
当master
的MapChannel
中存储的所有任务都被完成后,master
会将任务分配阶段由MapPhase
调整为ReducePhase
,在这之后,master
就会给来请求任务的worker
分配reduce
任务
当master
的ReduceChannel
中存储的所有任务都被完成后,表示所有的map
任务和reduce
任务都已经完成,此时master
会调用finish
方法,准备终止master
进程,并通知所有的master
任务完成了,终止所有worker
关键代码
master程序
mrcoordinator.go
命令:go run mrcoordinator.go pg*.txt
代码会调用:
1 | m := mr.MakeCoordinator(os.Args[1:], 10) |
MakeCoordinator
函数用于初始化一个 Coordinator
实例,包括初始化 Map 任务和 Reduce 任务的状态映射表,并启动 Coordinator 的服务。
1 | func MakeCoordinator(files []string, nReduce int) *Coordinator { |
其中,Coordinator的数据结构如下:
1 | type Coordinator struct { |
接下来开启监听
1 | func (c *Coordinator) server() { |
Map阶段向master发送请求
【这里不给出代码啦,希望看到这篇博客的小伙伴自己实现下,不要太功利了~】算彩蛋吗~
SuportMapTask函数
SuportMapTask
方法是 Coordinator 类的一个方法,用于处理 Worker 请求 Map 任务的逻辑。以下是对该方法的详细解释和总结:
- 加锁:
- 使用互斥锁
Lock
确保并发安全,防止多个 Worker 同时请求任务时发生竞态条件。
- 使用互斥锁
- 初始化回复:
- 初始化回复的标志位
Flag
为false
,表示没有找到可用的 Map 任务。 - 设置回复中的
NumReduce
字段为 Coordinator 的ReduceNum
,表示 Reduce 任务的数量。
- 初始化回复的标志位
- 遍历 Map 任务:
- 遍历所有 Map 任务,检查是否有未完成的任务(状态为
Undo
)。 - 如果找到一个未完成的 Map 任务:
- 设置回复的
Mapfilename
为该任务对应的文件名。 - 设置回复的
Flag
为true
,表示找到了可用的 Map 任务。 - 设置回复的
MapId
为该任务的 ID。 - 将该任务的状态设置为
Doing
,表示任务正在执行。 - 打印日志,记录 Worker 请求到的任务 ID。
- 启动一个协程,调用
CheckTimeout
方法检查任务是否超时。 - 跳出循环,返回任务。
- 设置回复的
- 遍历所有 Map 任务,检查是否有未完成的任务(状态为
- 返回结果:
- 如果没有找到未完成的 Map 任务,回复的
Flag
保持为false
,表示没有可用的 Map 任务。
- 如果没有找到未完成的 Map 任务,回复的
MasterCheckMapFinish函数
Reduce阶段向master发送的请求
这个·和上面这个思路一致。
处理早退的Casew
CheckTimeout
方法是 Coordinator 类的一个方法,用于检查任务是否超时。以下是该方法的详细流程总结:
- 等待超时时间:
- 方法首先等待指定的超时时间(
timeout
秒)。
- 方法首先等待指定的超时时间(
- 加锁:
- 使用互斥锁
Lock
确保并发安全,防止多个协程同时修改任务状态时发生竞态条件。
- 使用互斥锁
- 检查任务类型:
- 根据传入的
TaskType
参数判断任务类型是Map
还是Reduce
。
- 根据传入的
- 检查任务状态:
- 如果任务类型是
Map
:- 检查对应
Map
任务的状态是否为Finish
。 - 如果任务状态不是
Finish
,则将其状态重置为Undo
,表示任务超时未完成。
- 检查对应
- 如果任务类型是
Reduce
:- 检查对应
Reduce
任务的状态是否为Finish
。 - 如果任务状态不是
Finish
,则将其状态重置为Undo
,表示任务超时未完成。
- 检查对应
- 如果任务类型是
- 解锁:
- 释放互斥锁
Lock
,允许其他协程访问任务状态。
- 释放互斥锁
worker程序
原先框架中给出了RPC的示例,照着写就行。
再学习这部分之前,先大致有个思路,那么之后就会更好的理解实现。
Map阶段举例说明:
我们先将txt文件变成input.txt
并返回一个 []KeyValue
数组,其中每个 KeyValue
对表示一个单词和它的出现次数。
1 | kva = [ |
之后根据 Key
的哈希值将键值对分配到不同的 reduce
任务。ihash(tkva.Key)
计算 Key
的哈希值,然后通过取模操作 % cur_reply.NumReduce
来确定应该将键值对放在哪一个桶中。
比如,假设 ihash
函数得出的哈希值如下:
ihash("apple") % 2 = 0
ihash("banana") % 2 = 1
ihash("orange") % 2 = 0
那么 kvaa
的内容将变为:
1 | kvaa[0] = [ |
遍历 NumReduce
,即 2 个桶,并将桶中的 KeyValue
对保存到文件中。文件名格式为 mr-X-Y
,其中 X
是 Map
任务编号(cur_reply.MapId
),Y
是 reduce
任务编号。
-
对于第一个桶(
i=0
),生成文件mr-1-0
,内容是:1
2
3
4{"Key":"apple","Value":1}
{"Key":"apple","Value":1}
{"Key":"orange","Value":1}
{"Key":"orange","Value":1} -
对于第二个桶(
i=1
),生成文件mr-1-1
,内容是:1
2{"Key":"banana","Value":1}
{"Key":"banana","Value":1}
Reduce阶段举例说明:
reduce
任务时只需要读取所有的mr-*-reduce
文件并合并处理就可以实现所有相同的单词被同一个reduce
任务处理的效果,最终经过reduce
任务的处理,产生文件mr-out-i
。
RPC通信
Map阶段与master的rpc函数
- worker向master请求任务
- map告诉master已经完成任务
- reduce告诉master已经完成任务