我将玫瑰藏于身后,风起花落,从此鲜花赠自己,纵马踏花向自由。

我想,如果可以的话,我想成为神级一般的人物~

前言:由于最近找工作战况严峻,不得以只能再掏个去年的项目,顺便温故一下之前学习分布式知识,commit就是最好的证明 ~ PS:希望写这个Lab的不要太功利,bug才是真正提高自己水平的~

那段时光也是最能静下心来,最美好的时光~

image-20240913225720099

image-20240913225626631

image-20240913233108338

image-20240913233117386

image-20240913233127922

image-20240913233150876

image-20240913233139291

前置知识

MapReduce学习请看这篇:MapReduce: Simplified Data Processing on Large Clusters

go-rpc的使用移步这篇:【Golang | RPC】使用包net/rpc实现基于http协议的RPC服务

实验内容

目标

词频统计

描述

实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。

规则

  • map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
  • worker需要将第X个reduce task结果放到mr-out-X中。
  • mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
  • main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
  • 任务都完成后,worker也得关闭

提示

  • 一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
  • map reduce函数都是通过go插件装载 (.so文件)
  • mr/ 文件变了就需要重新build
  • 都在一个文件系统,worker天然实现文件共享,先凑合着起步
  • 中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
  • worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
  • worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
  • mrsequential.go 代码可以借鉴
  • coordinator里面的共享数据需要加锁
  • worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
  • 如果任务重试机制,记得不要生成重复任务
  • mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
  • 确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名

思路说明

摘自:https://juejin.cn/post/7224476641623556154

image-20240913230116525

客户端通过运行一次mrmaster.go启动一个master进程,启动时,会进行以下操作:

  • 创建一个Master结构体
  • 根据命令行传入的文件生成所有map任务并存放在MasterMapChannel
  • 开启一个新的线程不断循环判断有没有过期(分配出去但十秒内未完成)的mapreduce任务
  • 开始监听来自workerRPC调用

客户端通过运行多次mrworker.go启动多个worker进程,每个worker启动时,都会进行以下操作:

  • 创建一个AWorker结构体
  • 只要master进程没有通知worker进程所有任务都已经完成,worker进程就一直向master进程要任务

master接收到worker的要任务请求后,根据所有任务的完成情况给worker分配任务,分配规则是:所有map任务全部完成后才可以去分配reduce任务

worker要到任务后,worker会首先判断这个任务是map任务还是reduce任务,并根据任务类型执行不同的逻辑

  • 如果是map任务,会执行doMap逻辑,全部执行完后会在/var/tmp目录下生成文件名为mr-X-Y的临时文件,其中X是当前mapWorkerIDY是通过ihash方法计算出来的
  • 如果是reduce任务,会执行doReduce逻辑,worker会从 src/main中读取所有mr-*-Y文件,其中Y是当前reduceWorkerreduce任务的编号,最终会在/var/tmp目录下生成文件名为mr-out-Y的临时文件

worker执行完自己的任务后,会执行mapTaskDonereduceTaskDone方法,告诉master自己的任务完成了,通知完之后会立马继续向master要新的任务

master在接收到worker完成任务的通知后,会先判断这个worker完成这个任务有没有超时,具体就是判断这个任务的状态是不是Running,如果是就没有超时;否则,如果状态是Ready,就说明该任务已经被master中的循环检测任务过期的线程判定为过期并设为Ready状态;如果状态是Finished,说明这个任务被判定为超时后又被分配给其他worker并被那个worker按时完成

如果master判断worker在规定时间内完成了任务,则:

  • worker执行的是map任务:master会调用generateMapFile方法,将worker/var/tmp中生成的临时文件mr-X-Y复制到 src/main中,作为正式的该map任务完成后产生的中间文件,表示master接受了该worker的成果
  • worker执行的是reduce任务:master会调用generateReduceFile方法,将worker/var/tmp中生成的临时文件mr-out-Y复制到 src/main中,作为正式的该reduce任务完成后产生的最终文件,表示master接受了该worker的成果

masterMapChannel中存储的所有任务都被完成后,master会将任务分配阶段由MapPhase调整为ReducePhase,在这之后,master就会给来请求任务的worker分配reduce任务

masterReduceChannel中存储的所有任务都被完成后,表示所有的map任务和reduce任务都已经完成,此时master会调用finish方法,准备终止master进程,并通知所有的master任务完成了,终止所有worker

关键代码

master程序

mrcoordinator.go

命令:go run mrcoordinator.go pg*.txt

代码会调用:

1
m := mr.MakeCoordinator(os.Args[1:], 10)

MakeCoordinator 函数用于初始化一个 Coordinator 实例,包括初始化 Map 任务和 Reduce 任务的状态映射表,并启动 Coordinator 的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func MakeCoordinator(files []string, nReduce int) *Coordinator {
// 创建一个 Coordinator 实例
c := Coordinator{}

// 初始化 Coordinator 的各个字段

// 初始化 Map 任务的总数量
c.MapTotNum = len(files)
fmt.Println("总文件数量:", len(files))

// 初始化 Map 任务的状态映射表
c.MapFunc = make(map[int]int)

// 初始化 Map 任务的 ID 计数器
c.MapId = 0

// 遍历所有文件,将文件名添加到 Map 任务的文件名列表中,并初始化每个 Map 任务的状态为 Undo
for map_id, filename := range files {
c.MapFileName = append(c.MapFileName, filename)
c.MapFunc[map_id] = Undo
}

// 初始化 Reduce 任务的状态映射表
c.ReduceFunc = make(map[int]int)

// 初始化 Reduce 任务的总数量
c.ReduceNum = nReduce

// 遍历所有 Reduce 任务,初始化每个 Reduce 任务的状态为 Undo
for i := 0; i < nReduce; i++ {
c.ReduceFunc[i] = Undo
}

// 启动 Coordinator 的服务
c.server()

// 返回初始化后的 Coordinator 实例
return &c
}

其中,Coordinator的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Coordinator struct {

// Your definitions here.
//map相关定义
MapTotNum int // 记录总共有多少输入文件
MapFileName []string // 文件名称
// MapContent chan string // 文件内容
MapId int // map的编号
Lock sync.RWMutex // 锁
MapFunc map[int]int // 标记状态
// reduce相关定义
ReduceNum int
// ReduceFileName []string
// ReduceContent chan string
ReduceId []int
ReduceFunc map[int]int
}

接下来开启监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (c *Coordinator) server() {
// 注册 Coordinator 实例为 RPC 服务对象
rpc.Register(c)

// 设置 RPC 服务使用 HTTP 协议进行通信
rpc.HandleHTTP()

// 获取 Coordinator 的 Unix 域套接字文件名
sockname := coordinatorSock()

// 删除已存在的 Unix 域套接字文件(如果存在)
os.Remove(sockname)

// 打印 Unix 域套接字文件名
fmt.Println("coordinator:sockname", sockname)

// 创建一个 Unix 域套接字监听器
l, e := net.Listen("unix", sockname)
if e != nil {
// 如果监听失败,记录错误并终止程序
log.Fatal("listen error:", e)
}

// 启动一个 HTTP 服务器,监听 Unix 域套接字
go http.Serve(l, nil)
}

Map阶段向master发送请求

【这里不给出代码啦,希望看到这篇博客的小伙伴自己实现下,不要太功利了~】算彩蛋吗~

SuportMapTask函数

SuportMapTask 方法是 Coordinator 类的一个方法,用于处理 Worker 请求 Map 任务的逻辑。以下是对该方法的详细解释和总结:

  1. 加锁
    • 使用互斥锁 Lock 确保并发安全,防止多个 Worker 同时请求任务时发生竞态条件。
  2. 初始化回复
    • 初始化回复的标志位 Flagfalse,表示没有找到可用的 Map 任务。
    • 设置回复中的 NumReduce 字段为 Coordinator 的 ReduceNum,表示 Reduce 任务的数量。
  3. 遍历 Map 任务
    • 遍历所有 Map 任务,检查是否有未完成的任务(状态为 Undo)。
    • 如果找到一个未完成的 Map 任务:
      • 设置回复的 Mapfilename 为该任务对应的文件名。
      • 设置回复的 Flagtrue,表示找到了可用的 Map 任务。
      • 设置回复的 MapId 为该任务的 ID。
      • 将该任务的状态设置为 Doing,表示任务正在执行。
      • 打印日志,记录 Worker 请求到的任务 ID。
      • 启动一个协程,调用 CheckTimeout 方法检查任务是否超时。
      • 跳出循环,返回任务。
  4. 返回结果
    • 如果没有找到未完成的 Map 任务,回复的 Flag 保持为 false,表示没有可用的 Map 任务。

MasterCheckMapFinish函数

Reduce阶段向master发送的请求

这个·和上面这个思路一致。

处理早退的Casew

CheckTimeout 方法是 Coordinator 类的一个方法,用于检查任务是否超时。以下是该方法的详细流程总结:

  1. 等待超时时间
    • 方法首先等待指定的超时时间(timeout 秒)。
  2. 加锁
    • 使用互斥锁 Lock 确保并发安全,防止多个协程同时修改任务状态时发生竞态条件。
  3. 检查任务类型
    • 根据传入的 TaskType 参数判断任务类型是 Map 还是 Reduce
  4. 检查任务状态
    • 如果任务类型是 Map
      • 检查对应 Map 任务的状态是否为 Finish
      • 如果任务状态不是 Finish,则将其状态重置为 Undo,表示任务超时未完成。
    • 如果任务类型是 Reduce
      • 检查对应 Reduce 任务的状态是否为 Finish
      • 如果任务状态不是 Finish,则将其状态重置为 Undo,表示任务超时未完成。
  5. 解锁
    • 释放互斥锁 Lock,允许其他协程访问任务状态。

worker程序

原先框架中给出了RPC的示例,照着写就行。

再学习这部分之前,先大致有个思路,那么之后就会更好的理解实现。

image-20240913232202672

Map阶段举例说明:

我们先将txt文件变成input.txt 并返回一个 []KeyValue 数组,其中每个 KeyValue 对表示一个单词和它的出现次数。

1
2
3
4
5
6
7
8
kva = [
{"apple", 1},
{"banana", 1},
{"apple", 1},
{"orange", 1},
{"banana", 1},
{"orange", 1}
]

之后根据 Key 的哈希值将键值对分配到不同的 reduce 任务。ihash(tkva.Key) 计算 Key 的哈希值,然后通过取模操作 % cur_reply.NumReduce 来确定应该将键值对放在哪一个桶中。

比如,假设 ihash 函数得出的哈希值如下:

  • ihash("apple") % 2 = 0
  • ihash("banana") % 2 = 1
  • ihash("orange") % 2 = 0

那么 kvaa 的内容将变为:

1
2
3
4
5
6
7
8
9
10
kvaa[0] = [
{"apple", 1},
{"apple", 1},
{"orange", 1},
{"orange", 1}
]
kvaa[1] = [
{"banana", 1},
{"banana", 1}
]

遍历 NumReduce,即 2 个桶,并将桶中的 KeyValue 对保存到文件中。文件名格式为 mr-X-Y,其中 XMap 任务编号(cur_reply.MapId),Yreduce 任务编号。

  • 对于第一个桶(i=0),生成文件 mr-1-0,内容是:

    1
    2
    3
    4
    {"Key":"apple","Value":1}
    {"Key":"apple","Value":1}
    {"Key":"orange","Value":1}
    {"Key":"orange","Value":1}
  • 对于第二个桶(i=1),生成文件 mr-1-1,内容是:

    1
    2
    {"Key":"banana","Value":1}
    {"Key":"banana","Value":1}

Reduce阶段举例说明:

reduce任务时只需要读取所有的mr-*-reduce文件并合并处理就可以实现所有相同的单词被同一个reduce任务处理的效果,最终经过reduce任务的处理,产生文件mr-out-i

RPC通信

Map阶段与master的rpc函数

  • worker向master请求任务
  • map告诉master已经完成任务
  • reduce告诉master已经完成任务