关于 MR 为何需要排序,以及一个分布式排序的示例

之前去了解了一下关于MapReduce的几个比较疑惑的问题,认为有必要专门记录一下,同时也给定一个分布式排序的示例,这应用到了这里谈到的特性。但学习过程中我实际上没有参考多少权威资料,全凭一孔之见,所以非常大的可能有错误。

一些疑问及我对它的理解

为何 MapReduce 需要排序?

有时候我会想,为啥 MapReduce 中的数据需要排序呢?感觉不排序也没什么问题呀?Map 阶段得到了所需的 KV 对,把 KEY 哈希完后直接发送给对应的 Reducer,等拿到这个 KEY 的所有数据就直接开始 reduce 操作呗?

我认为这里有一个比较主要的原因是,对无序的数据集合进行分组(和 reduce)操作是非常昂贵的。倘若要获取某个 KEY 的所有记录,需要遍历整个集合。这样,我们在 reduce 方法中的那种操作就会很迷惑——对每一个 key,都把所有记录遍历一遍,拿到对应的分组。但考虑到访问特定分组必须是流式的,这是必要之举。

这和在普通应用中的分组操作不同,考虑我们在普通应用中进行分组的方法,我们会去定义一个名为 bucket 的哈希表,key 为分组的键,而 value 为记录的集合,我们将迭代整个记录的集合并将其存放在对应的 bucket 中。这就是问题所在——大数据量大情况下我们是无法在内存中维护这样一个 bucket 的

基于这种限制,我认为,如果要对无序的巨量记录进行流式的分组和 reduce 操作,其时间复杂度将是 n^2——对每一个 key,都遍历整个记录集合。

而对有序的集合呢?那问题就容易了,考虑到有序的集合中,同一个 KEY 的记录总是紧挨着的,我们直接维护一个当前的 KEY,遍历整个集合,如果该 KEY 和当前相等,就说明还是这个 KEY 的数据,向用户的 reduce 方法的 value 迭代器里再添一个,如果不相等,则说明是下一个 KEY,终止当前的 reduce 方法,起一个新的。这样,就能在线性时间复杂度的情况下流式地 reduce 掉整个集合了。

我认为,这就是排序的意义,同时这应该也是 KV 存储的意义——让并行性本就不如 map 阶段的 reduce 阶段能更加轻松。同时,排序也使得可以容易得到易处理,人类可读的数据。

如果事实就是如此,那说数据被分组其实是一个谎言——其实什么都没干,只是顺序地迭代集合罢了,只不过表现出来的行为(接口等)是分组而已。但这种抽象,这种“谎言”是非常有趣的。

顺带一提,Haskell 的列表的 group 行为和这非常相似。事实证明,这种算法甚至能够处理无穷列表,或许一切流式操作都有如此特性吧。

1
2
3
4
5
-- Haskell 代码

duetNat = [1 ..] >>= \i -> [i, i] -- [1, 1, 2, 2, 3, 3, ...]
-- >>> take 10 $ group duetNat
-- [[1,1],[2,2],[3,3],[4,4],[5,5],[6,6],[7,7],[8,8],[9,9],[10,10]]

但参考谷歌的论文对 MR 中进行排序的论述,感觉我可能想太多——

我们确保在给定的分区中,中间 key/value pair 数据的处理顺序是按照 key 值增量顺序处理的。这样的顺 序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按 key 值随机存取的应用非常有意义, 对在排序输出的数据集也很有帮助

We guarantee that within a given partition, the intermediate key/value pairs are processed in increasing key order. This ordering guarantee makes it easy to generate a sorted output file per partition, which is useful when the output file format needs to support efficient random access lookups by key, or users of the output find it convenient to have the data sorted.

另外,JOIN 操作对于有序的集合的复杂度为O(n),这也是排序的一个必要之处。

为何 Partitioner 执行在 Combiner 之前?

太长不看,两个原因——整个集合排序开销大,分区后容易并行 combine

MR 在执行 partitioner 后将每个分组进行排序,再传递给 Combiner,为什么不先 Combine 后再进行排序呢?我怀疑他们是想让 Reducer 和 Combiner 行为一致:接受的 key 总是按顺序的。

而倘若先进行 Combine,则 Combine 之前必须进行排序,而这样开销更大,假设原记录数为 n,则时间复杂度为 nlogn,而假设分了 k 个区,对每个区进行排序,总时间复杂度为 k * (n/k) * (log n / k),即 n * (log n / k),这还是没有考虑到分过区后每个区可以并行地排序的情况!因此,显然分区后对每个分区进行局部排序开销更小。况且,如果两个 KEY 本就该分给不同的 Reducer,那么对它们进行排序有什么意义呢?

而且,分过区后,可以非常容易地对每个分区进行并行的 combine 操作(当然,不分区也能进行并行的 combine,就像 java8-stream 的 reduce 方法一样)。

但这些猜测仍旧无法彻底让我信服,还得之后再研究。

分布式排序

当只有一个 Reducer 时,MR 可以用于进行分布式排序。它的原理利用了 MR 的执行流程。

每个 Mapper 处理完自己的所有记录集合后,会传递给 Partitioner 进行分区操作,Partitioner 分完区后,每个分区的结果会按照 KEY 进行排序,经过 Combiner 后发送给对应的 Reducer。

每个 Reducer 都将接受每一个 Mapper 传递过来的记录集合,每一个 Mapper 的集合都是有序的,但所有 Mapper(传递给它)的集合总的来说是无序的,因此,Reducer 会进行一种所谓的多路归并算法,将每一个 Mapper 传递的局部有序的集合归并在一起,得到一个整体有序的最终集合,并用于分组。

当只有一个 Reducer 时,所有 Mapper 的有序的集合都将发送给它,并由它进行归并,排序后传递给用户的 reduce 方法。因此,用户接受到的 reduce 方法,它看到的 KEY 是按顺序的,此时若直接把 KEY 输出,最终得到的结果文件里就是排完序的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 在特定文件夹里创建 16 个文件,每个文件里包含 500 万个数
// 总共约 1520M 大小
object CreateInputFiles extends App {
val path = "/路径/到/输入路径"
(0 until 16).foreach {i =>
val writer = new FileWriter(s"$path/$i.txt")
(0 until 5000000).foreach(_ => writer.write(Math.abs(Random.nextLong()) + "\n"))
writer.close()
}
}

class DistributedSortMapper extends Mapper[LongWritable, Text, LongWritable, NullWritable] {
val outputK = new LongWritable()
override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, LongWritable, NullWritable]#Context): Unit = {
value.toString.toLongOption match {
case None => {}
case Some(longValue) => {
outputK.set(longValue)
context.write(outputK, NullWritable.get)
}
}
}
}

class DistributedSortReducer extends Reducer[LongWritable, NullWritable, LongWritable, NullWritable] {
override def reduce(key: LongWritable, values: lang.Iterable[NullWritable], context: Reducer[LongWritable, NullWritable, LongWritable, NullWritable]#Context): Unit = {
// 为什么要调用 forEach 呢?因为可能有多个值相等,这时候它们会被分在一个组里
// 直接打印到结果文件里,那就只留下了不重复的了
// 当然,在这里没必要,在 Long 这么大的范围,8000 万条数据还真的不容易撞
values.forEach(_ => context.write(key, NullWritable.get))
}
}

object DistributedSortJob extends App {
val parser = new GenericOptionsParser(args)
val job = Job.getInstance(parser.getConfiguration)
val inputPath +: outputPath +: _ = parser.getRemainingArgs.toSeq.map(new Path(_))

job.setInputFormatClass(classOf[TextInputFormat])
job.setOutputKeyClass(classOf[TextOutputFormat[_,_]])
job.setMapOutputKeyClass(classOf[LongWritable])
job.setMapOutputValueClass(classOf[NullWritable])
job.setOutputKeyClass(classOf[LongWritable])
job.setOutputValueClass(classOf[NullWritable])

job.setMapperClass(classOf[DistributedSortMapper])
job.setReducerClass(classOf[DistributedSortReducer])
FileInputFormat.setInputPaths(job, inputPath)
FileOutputFormat.setOutputPath(job, outputPath)

System.exit(if (job.waitForCompletion(true)) 0 else 1)
}

这种分布式排序的缺陷之一在于,只有一个 Reducer,因而限制了该过程的并行性,在性能上不太够。一个解决方案是自定义 Partitioner,使每个 Reducer 之间接受的 KEY 是有序的,既然 Reducer 之间是有序的,每个 Reducer 内部都是有序的,则整体就是有序的。Spark 似乎使用类似的分区方式。但这种分区方式是和业务耦合的,需要自定义。