MapReduce 开发模式 2——排序,二次排序,JOIN

ORDER BY(全局排序)

排序是一个常见的业务需求。我们不止可以用 MR 提供的原生类型进行排序,还能够用自定义的类型进行排序。在 SQL 中,我们使用 ORDER BY 子句进行排序,ORDER BY 子句可以接受一个或多个字段,也可以接受表达式。

MR 支持在两个地方进行排序——首先是 Reducer 接受的各 Mapper 发送的记录会按 KEY 进行归并排序;然后是对每个 KEY,允许在 KEY 对应的 VALUE 集合中进行排序,这个排序的进行是不太明显的。

考虑这样的需求——我们之前的航空公司数据集是按照月份和星期来排序的,现在突然需要将输出的数据按月份升序之后按星期降序去排序。我们要保证所有数据整体有序,且性能足够好

之前学习过 MR 的排序机制,我们知道,如果想要让所有数据全排序并存放在同一个文件里,则只能使用一个 Reducer。否则我们必须定制化 Partitioner 和 KEY,以保证不同结果文件之间有序。

研究一下我们的业务,这里需要按月份去升序,按星期去降序,因此总共的索引数有 12 x 7 = 84 个,它们顺序如下——

  1. 1_7
  2. 1_6
  3. 2_7
  4. 12_2
  5. 12_1

显然,Partitioner 需要保证所有相邻的索引必须分给同一个 Reducer

首先,我们需要定义 KEY 类型(即 MonthDayWeek)以及 VALUE 类型(所需的结果),KEY 类型需要实现 WritableComparable 接口,VALUE 类型需要实现 Writable。

不一定必须要 WritableComparable,也可以从外部指定另外的 Comparator,这个 Comparator 称为 SortComparator。

类型的代码见此,非常繁琐,信息量很小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package mapreduce

import mapreduce.DistributedSortJob.args
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io
import org.apache.hadoop.io.{IntWritable, LongWritable, NullWritable, Text, Writable, WritableComparable}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, Mapper, Partitioner, Reducer}
import org.apache.hadoop.util.GenericOptionsParser

import java.io.{DataInput, DataOutput}
import java.lang

// Scala 定义这玩意可太难受了,不能用类参数…否则 Scala 会创建出带参数的构造器,没法用反射创建了
// 键类型
class MonthDayWeek extends WritableComparable[MonthDayWeek] {
var month : IntWritable = new IntWritable(0)
var dayOfWeek : IntWritable = new IntWritable(0)

override def write(out: DataOutput): Unit = {
month.write(out)
dayOfWeek.write(out)
}

override def readFields(in: DataInput): Unit = {
month.readFields(in)
dayOfWeek.readFields(in)
}

override def compareTo(o: MonthDayWeek): Int = {
if (this.month.get == o.month.get) // 如果月份相等,则逆序比较 dayOfWeek
-1 * dayOfWeek.compareTo(o.dayOfWeek)
else
this.month.compareTo(o.month)
}

// 自动生成
def canEqual(other: Any): Boolean = other.isInstanceOf[MonthDayWeek]

override def equals(other: Any): Boolean = other match {
case that: MonthDayWeek =>
(that canEqual this) &&
month == that.month &&
dayOfWeek == that.dayOfWeek
case _ => false
}
override def hashCode(): Int = {
val state = Seq(month, dayOfWeek)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}

// 值类型
class DelaysWritable extends Writable {
var year = new IntWritable(0)
var month = new IntWritable(0)
var date = new IntWritable(0)
var dayOfWeek = new IntWritable(0)
var arrDelay = new IntWritable(0)
var depDelay = new IntWritable(0)
var originAirportCode = new Text()
var destAirportCode = new Text()
var carrierCode = new Text()

override def write(out: DataOutput): Unit = {
year.write(out)
month.write(out)
date.write(out)
dayOfWeek.write(out)
arrDelay.write(out)
depDelay.write(out)
originAirportCode.write(out)
destAirportCode.write(out)
carrierCode.write(out)
}

override def readFields(in: DataInput): Unit = {
year.readFields(in)
month.readFields(in)
date.readFields(in)
dayOfWeek.readFields(in)
arrDelay.readFields(in)
depDelay.readFields(in)
originAirportCode.readFields(in)
destAirportCode.readFields(in)
carrierCode.readFields(in)
}
override def toString =
s"$year, $month, $date, $dayOfWeek, $arrDelay, $depDelay, $originAirportCode, $destAirportCode, $carrierCode"

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Mapper 只是做一个映射
class MonthDayWeekSortMapper extends Mapper[LongWritable, Text, MonthDayWeek, DelaysWritable] {
val outputK = new MonthDayWeek
val outputV = new DelaysWritable
override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, MonthDayWeek, DelaysWritable]#Context): Unit = {
if (value.toString.startsWith("Year")) return

import AirlineCol._
val colGetter = AirlineCol.build(value.toString.split(","))

outputK.month = new IntWritable(colGetter(Month).toInt)
outputK.dayOfWeek = new IntWritable(colGetter(DayOfWeek).toInt)

outputV.year = new IntWritable(colGetter(Year).toInt)
outputV.month = new IntWritable(colGetter(Month).toInt)
outputV.dayOfWeek = new IntWritable(colGetter(DayOfWeek).toInt)
outputV.date = new IntWritable(colGetter(DayofMonth).toInt)
outputV.arrDelay = new IntWritable(colGetter(ArrDelay).toInt)
outputV.depDelay = new IntWritable(colGetter(DepDelay).toInt)
outputV.destAirportCode = new Text(colGetter(Dest))
outputV.originAirportCode = new Text(colGetter(Origin))
outputV.carrierCode = new Text(colGetter(UniqueCarrier))

context.write(outputK, outputV)
}
}

// Reducer 更是啥也不干
class MonthDayWeekSortReducer extends Reducer[MonthDayWeek, DelaysWritable, DelaysWritable, NullWritable] {
override def reduce(key: MonthDayWeek, values: lang.Iterable[DelaysWritable], context: Reducer[MonthDayWeek, DelaysWritable, DelaysWritable, NullWritable]#Context): Unit = {
values.forEach(context.write(_, NullWritable.get))
}
}

// 继承 Configurable 接口后,就能够拿到配置信息了,通过 key.range 指定配置范围
// 用户给定一个 key.range,表示每个 Reducer 将要接受的 KEY 的数量,如果超出,则最后一个 reducer 照单全收;如果 Reducer 设置太多,后来的 Reducer 将会得不到数据。
// 比如,设置 key.range = 24,设置 Reducer 为 3 个,则第一个 Reducer 处理 0-23,第二个处理 24-48,第三个处理 49-83
class MonthDayWeekSortJobPartitioner extends Partitioner[MonthDayWeek, DelaysWritable] with Configurable {
var indexRange : Int = 84 / 7;
var config : Configuration = new Configuration();
override def setConf(conf: Configuration): Unit = {
config = conf
}
override def getConf: Configuration = config
override def getPartition(key: MonthDayWeek, value: DelaysWritable, numPartitions: Int): Int = {
val index = ((key.month.get - 1) * 7 + (key.dayOfWeek.get - 1)) / config.getInt("key.range", 84)

if (index < numPartitions)
index
else
numPartitions - 1
}
}

这里有两个参数需要配置——mapreduce.job.reduces,标识 Reducer 的数量,key.range,标识每个 Reducer 处理 key 的数量,默认为 84,即所有归一个 Reducer 来处理。将 Reducer 数量设置成 12,key 范围设置成 7,则得到每个文件保存一个月数据的结果。

可以看到,按 12,7 来设置时,part-r-00000的头部和part-r-00011的尾部是符合需求的——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ head part-r-00000
2007, 1, 28, 7, 24, -3, PHL, PBI, US
2007, 1, 28, 7, 6, 11, DFW, BNA, AA
2007, 1, 7, 7, 2, -1, ONT, PHX, US
2007, 1, 7, 7, -12, 1, CLE, PHX, CO
2007, 1, 7, 7, 30, 0, ONT, LAS, US
2007, 1, 7, 7, -17, -1, OMA, PHX, US
2007, 1, 28, 7, -4, -15, MOT, MSP, NW
2007, 1, 7, 7, -23, 6, OMA, PHX, US
2007, 1, 7, 7, -20, 13, OMA, PHX, US
2007, 1, 7, 7, -8, -9, RDU, EWR, MQ

$ tail part-r-00011
2007, 12, 31, 1, -2, 2, XNA, DFW, MQ
2007, 12, 24, 1, -12, -1, ATL, TYS, DL
2007, 12, 3, 1, 50, 39, DFW, XNA, MQ
2007, 12, 17, 1, -1, 1, MDW, STL, WN
2007, 12, 24, 1, -18, -3, LGA, MCO, DL
2007, 12, 24, 1, 31, 54, MCO, LGA, DL
2007, 12, 10, 1, 6, 0, DFW, XNA, MQ
2007, 12, 10, 1, 66, 45, DFW, HSV, AA
2007, 12, 24, 1, -22, -12, RSW, BOS, DL
2007, 12, 24, 1, 1, -3, BOS, RSW, DL

至于如何进行进一步的优化呢?一个通常的操作是对输入数据进行采样,获取键的分布情况,并根据该情况特定地去规划分区并分发给相应 Reducer,保证每个 Reducer 的负载均衡。

可以料想,这种模式(ORDER BY)的实现是绝对能进行抽象的,

二次排序

二次排序其实就是使用组合键进行排序,这引入了所谓的 GroupComparator 和 SortComparator。

MR 默认的行为会按 KEY 进行排序,但是同一个 KEY 内的记录之间的顺序仍旧是无法保证的,这些记录来自各个 Mapper,因此即使原数据有序,被 Mapper 处理后的数据仍有序,但在这里将仍旧是无序的。

但 MR 同样也允许对 KEY 内的集合进行排序,这就是所谓的二次排序或辅助排序,二次排序是一个模式而非角色它并非是直接对 VALUE 集合进行排序,而是将想再次排序的字段放在 KEY 里,使第一次排序时直接排出需要的顺序,因此二次排序这个名字实际上只描述了现象,没有点明实质。二次排序的一个难以理解的地方在于,在 Reducer 阶段,对 KEY 进行归并(即合并各个 Partition)和对 KEY 进行分组所使用的 Comparator 可以不是同一个 Comparator

从一个示例看起,现在假设有一张网站浏览记录表,有如下字段——

1
2
3
4
5
website     IP               country  click_time
---------------------------------------------------------
hoogle.com 123.123.123.123 UK 2022-02-02 12:23:34
hoogle.com 123.123.84.94 CA 2022-02-01 12:23:34
.....

现在我们想要获取每个网站的每个国家的最近 5 次的访问时间。显然,我们可以以网站,国家作为 KEY 进行分组,然后维护一个大小为 5 的大顶堆,遍历整个集合得到结果,在 cleanup 方法中将结果写出。

但是这显然有一定的限制——要是我们要求获取最近 500 亿次的访问时间呢?这时,我们就必须考虑一些别的方案,比如,我们可以先把整个表按网站,国家,时间进行排序,再按照网站,国家进行分组(意识到这顺序和 SQL 中的不对应)即可,在 reduce 中只需要维护一个计数器。

但是我们之前所学的东西并不支持这种操作——在我们眼里,KEY 用于排序和分组时,使用的比较器是一样的,就是我们在 KEY 类型中实现的WritableComparable接口。因此,这里需要有一点新东西,被引入的就是所谓的SortComparatorGroupComparator

SortComparator 用于对 Partitioner 的分区结果进行排序和 Reducer 归并 Mapper 结果时,默认的 SortComparator 就是 KEY 实现的 WritableComparable 接口;GroupComparator 用于 Reducer 进行分组并调用 reduce 方法时。GroupComparator 其实只用来比较是否相等——如果返回 0,则相等,传给用户的 reduce 方法;如果不返回 0,则终止当前的 reduce 方法,另起一个。

在这里,有三个组件需要自定义——首先是 Partitioner,要保证所有网站被分在一个文件里(不然这排序就没必要了);然后是 SortComparator 和 GroupComparator,代码见下——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class WebsiteKey extends WritableComparable[WebsiteKey] {
var website : String = ""
var country : String = ""
var click_time : String = ""
override def write(out: DataOutput): Unit = {
// ...
}

override def readFields(in: DataInput): Unit = {
// ...
}

// 作为 SortComparator
override def compareTo(o: WebsiteKey): Int = {
if (website != o.website)
website.compareTo(o.website)
else if (country != o.country)
country.compareTo(o.country)
else -1 * click_time.compareTo(o.click_time)
}
}

class WebsitePartitioner extends Partitioner[WebsiteKey, Any] {
override def getPartition(key: WebsiteKey, value: Any, numPartitions: Int): Int = ???
}

class WebsiteKeyGroupComparator extends WritableComparator {
override def compare(a: WritableComparable[_], b: WritableComparable[_]): Int = {
try {
val l : WebsiteKey = a.asInstanceOf[WebsiteKey]
val r : WebsiteKey = b.asInstanceOf[WebsiteKey]

if (l.website != r.website)
l.website .compareTo(r.website)
else
l.country.compareTo(r.country)
} catch {case _: Throwable => -1 }
}
}

Mapper 和 Reducer 不表,Mapper 除了抽取出 KEY 什么都不做,Reducer 直接拿到集合的前五个元素。

这里的组合键中有三个字段,其中 GroupComparator 中所使用的称为自然键,其它的称为自然值, 自然键和自然值组成的组合键被 SortComparator 所使用,可以简单地认为 GroupComparator 就是 GROUP BY 中所使用的字段,ORDER BY 就是 SortComparator 所使用的字段。排序应当使用整个 KEY,分组应当使用排序使用的 KEY 的子集,否则可能会发生不可预料的结果。

但这在 SQL 上好像没有对应的语句——ORDER BY 在 SQL 里是最后才调用的,而 GROUP BY 在此之前执行,如果硬要放到 SQL 的语义中的话,可以认为是在 FROM 的表里进行了排序。可见,在 SQL 上的对应其实是不太明显的。SQL 这样设计的原因我猜测是因为聚集函数不关心列的顺序。

验证

二次排序使用的场景在于,需要在同一个分组里的记录有序,且各个记录组(即这里的“有序”区分出来的组)之间有相互依赖,如果没有后者,则组合键和自然键是一致的,就没必要再自己定义一个 GroupComparator。

下面对上面的说法进行测试,我们建立一个非常简单的实体,以及规定相应的 Mapper,Reducer,Reducer 仅用于在每一组之前进行一次打印——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class TupleWritable extends WritableComparable[TupleWritable] {
var _1 : Int = 0
var _2 : Int = 0
var _3 : Int = 0
// ...
}

class SecondarySortMapper extends Mapper[LongWritable, Text, IntTripleWritable, NullWritable] {
override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, IntTripleWritable, NullWritable]#Context): Unit = {
val Array(_1,_2,_3) = value.toString.split(" ").map(_.toInt)
val triple = new IntTripleWritable
triple._1 = _1
triple._2 = _2
triple._3 = _3
context.write(triple, NullWritable.get)
}
}

class SecondarySortReducer extends Reducer[IntTripleWritable, NullWritable, Text, NullWritable] {
override def reduce(key: IntTripleWritable, values: lang.Iterable[NullWritable], context: Reducer[IntTripleWritable, NullWritable, Text, NullWritable]#Context): Unit = {
context.write(new Text("分组,key:" + key.toString), NullWritable.get)
values.forEach(_=>context.write(new Text(key.toString), NullWritable.get))
}
}

输入数据如下,为 000 到 111 进行洗牌的结果——

1
2
3
4
5
6
7
8
1 0 1
1 1 1
0 0 0
0 0 1
0 1 0
0 1 1
1 0 0
1 1 0
  1. 排序和分组一致

按 2 个字段排序,2 个字段分组时,得到——

1
2
3
4
5
6
7
8
9
10
11
12
分组,key:0, 0, 1
0, 0, 1
0, 0, 0
分组,key:0, 1, 1
0, 1, 1
0, 1, 0
分组,key:1, 0, 0
1, 0, 0
1, 0, 1
分组,key:1, 1, 0
1, 1, 0
1, 1, 1

这和预期比较一致。

  1. 按_1 和_2 排序,按_1 分组
1
2
3
4
5
6
7
8
9
10
分组,key:0, 0, 1
0, 0, 1
0, 0, 0
0, 1, 1
0, 1, 0
分组,key:1, 0, 0
1, 0, 0
1, 0, 1
1, 1, 0
1, 1, 1

这相当于是把_2 当作了要排序的自然值,可以看到每个组的_2 都是有序的。

  1. 全排序,按_1 分组
1
2
3
4
5
6
7
8
9
10
分组,key:0, 0, 0
0, 0, 0
0, 0, 1
0, 1, 0
0, 1, 1
分组,key:1, 0, 0
1, 0, 0
1, 0, 1
1, 1, 0
1, 1, 1

这相当于是把_2,_3 当作了要排序的自然值

  1. 按_1 排序,按_1,_2 分组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
分组,key:0, 1, 1
0, 1, 1
0, 1, 0
分组,key:0, 0, 1
0, 0, 1
0, 0, 0
分组,key:1, 1, 0
1, 1, 0
分组,key:1, 0, 0
1, 0, 0
分组,key:1, 1, 1
1, 1, 1
分组,key:1, 0, 1
1, 0, 1

这是一个非法情况,相当于是在分组的时候拿到的数据不是有序的,分组的 KEY 必须为排序的 KEY 的子集!

同时,通过上面的几个测试,可以看到 GroupComparator 是不会改变 SortComparator 排序的结果的,它只负责分组,也就是说只看当前记录是否等于上一条记录,如果 SortComparator 给出了就它看来是错误(无序)的结果,则它也会返回错误的结果。

一个实际的实例

再考虑一个实际的实例,还是使用那张航班表,现在考虑获取每个月每个航班的最长的 10 次航行的时间。

根据需求,我们需要将数据按(月,航班)分组,按(月,航班,总飞行时间(降序))排序,并在每次 reduce 里取得前十条。注意这里使用的组合键,其 hashCode 仅使用 month 和 tailNum,这是为了能在分区的时候分到同一个 Reducer,这样才好分组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class CombineKey extends WritableComparable[CombineKey] {
var month : Int = 0
var tailNum : String = ""
var allDelay : Int = 0

override def write(out: DataOutput): Unit = {
out.writeInt(month)
out.writeUTF(tailNum)
out.writeInt(allDelay)
}

override def readFields(in: DataInput): Unit = {
month = in.readInt()
tailNum = in.readUTF()
allDelay = in.readInt()
}

override def compareTo(o: CombineKey): Int = {
if (month != o.month)
month.compareTo(o.month)
else if (tailNum != o.tailNum)
tailNum.compareTo(o.tailNum)
else -1 * allDelay.compareTo(o.allDelay)
}
override def toString = s"month=$month, tailNum=$tailNum, allDelay =$allDelay"

def canEqual(other: Any): Boolean = other.isInstanceOf[CombineKey]

override def equals(other: Any): Boolean = other match {
case that: CombineKey =>
(that canEqual this) &&
month == that.month &&
tailNum == that.tailNum
case _ => false
}

override def hashCode(): Int = {
val state = Seq(month, tailNum)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}

class LongestAirtimePerMonthMapper extends Mapper[LongWritable, Text, CombineKey, Text] {
override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, CombineKey, Text]#Context): Unit = Try {
if (value.toString.startsWith("Year")) return

val colGetter = AirlineCol.build(value.toString.split(","))
val combineKey = new CombineKey

combineKey.month = colGetter(AirlineCol.Month).toInt
combineKey.tailNum = colGetter(AirlineCol.TailNum)
combineKey.allDelay =
colGetter(AirlineCol.ArrDelay).toIntOption.getOrElse(0)
+ colGetter(AirlineCol.DepDelay).toIntOption.getOrElse(0)
context.write(combineKey, value)
}
}

class LongestAirtimePerMonthReducer extends Reducer[CombineKey, Text, Text, Text] {
override def reduce(key: CombineKey, values: lang.Iterable[Text], context: Reducer[CombineKey, Text, Text, Text]#Context) = {
import scala.jdk.CollectionConverters._
context.write(new Text("新分组"), new Text(""))
// 在这里使用 take 函数会得到奇怪的结果,不知道底层是啥机制,只能使用最原始的迭代器了。
var counter = 0
val iter = values.iterator()
while (counter < 10 && iter.hasNext) {
counter += 1
context.write(new Text(key.toString), iter.next())
}
}
}

class LongestAirtimePerMonthGroupComparator extends WritableComparator(classOf[CombineKey], true) {
override def compare(a: WritableComparable[_], b: WritableComparable[_]): Int = Try {
val l = a.asInstanceOf[CombineKey]
val r = b.asInstanceOf[CombineKey]

if (l.month != r.month)
l.month.compareTo(r.month)
else l.tailNum.compareTo(r.tailNum)
}.getOrElse(-1)
}

总结

二次排序看上去是相当的复杂,但其实原理就是那么简单——把要进一步排序的字段放到 KEY 里,同时确认哪些字段用于排序(一般是全部),哪些字段用于分组,BINGO

二次排序能够解决许多原本需要在 Reducer 实例中使用实例变量来维护一些状态才能解决的问题,比如前面说的求最近几次的访问时间,这就需要维护一个大顶堆。Hadoop 本就挺复杂了,再引入带副作用的 map 方法 reduce 方法……对脑袋是一个负担。

同时,二次排序必须结合对 Partitioner 的重写。在很多情况下我们需要让一系列数据在一个 reducer 中进行处理,比如 Hadoop 权威指南的那个例子,我们要求每年的最高气温,这时候我们按年,气温进行排序,按年进行分组,这时如果按照默认的 HashPartitioner(它按 KEY 的 hashCode 进行分区),则同年的数据可能分到不同的 Reducer 中,我们就会得到错误结果。

感觉分区使用的字段好像很多时候和分组使用的字段相同。

JOIN

连接操作应该是 MR 下的最后一个开发模式了,再之后就是去学习原理,输入输出等,然后是 Hive,学习 Hive 时要再次回来联想 MR 的模式。

连接操作就是 SQL 中的 JOIN,将两个数据集的记录做叉积并筛选出其中匹配的部分。

在 MR 中,连接操作的实现需要使用一些特定模式来实现,事实上,Hadoop 权威指南也承认 MR 编写连接操作比较棘手,建议使用 Pig,Hive,Spark 等抽象程度更高的框架。且根据两个数据集的大小以及分区方式的不同,具体的处理方式也会不同。比如,如果一个数据集非常大,而一个数据集非常小,则可以把较小的数据集直接维护在每一个节点的内存里并进行使用(使用名为“Side Data”的方式)。这和 SQL 简简单单一个 JOIN 一个 ON 就解决问题形成了鲜明对比,但这是数据集的巨大导致的不得已而为之。

连接操作如果在 Mapper 处进行,则称为 map 端连接,如果在 Reducer 处执行,则称为 reduce 端连接。数据的组织方式决定了究竟使用map端连接或reduce端连接,这两种连接方式都能够处理两个数据集规模均大的情况。

Hadoop权威指南没有展开讲map连接,所以我也跳过map连接了。

reduce连接

reduce连接虽然性能较差,但适用范围更广,因此更常用,先去学习它。

reduce连接的思路使用一句话来描述,就是将要连接的键放入Mapper输出的KEY之中,而要连接的两个数据集都作为Mpper输出的VALUE,这样,在同一个迭代集合里我们就能够获取到两个数据集的值。显然,我们需要妥善处理它们的顺序(为此需要使用辅助排序),如果是一对多的关系,我们可以让第一个元素作为那个“一”,从而能够在后续的折叠过程中维护该记录;如果是多对多的关系……那就得具体问题具体分析了。

进行这种操作时,reduce的逻辑可能是这样,当作伪代码看吧——

1
2
3
4
5
6
7
8
9
10
11
12
13
// 不想标注类型
def reduce = (key, values, context) => {
val iter = values.iterator()
Try(iter.next)
.map(parse)
.filter(_.type == 'Station') // 如果第一个值不是Station,则说明没有匹配的气象站,这时候的处理方式估计就对应内连接外连接了
.foreach { elem =>
while(iter.hasNext) {
// 这时候elem就是第一个元素了,现在对iter进行迭代,执行业务操作
// ...
}
}
}

从示例开始

比如,我们现在有两个表,一个是气象站表,一个是气象记录表,它们的示例如下,我们想将它们通过气象站ID进行连接——

Stations表:

StationID StationName
1199 SIHCCAJAVRI
1265 TYNSET-HANSMOEN

Records表:

StationID Timestamp Temperature
1265 194903241200 111
1265 194903241800 78
1199 195005150700 0
1199 195005151200 22
1199 195005151800 -11

连接后的表:

StationID StationName Timestamp Temperature
1199 SIHCCAJAVRI 195005150700 0
1199 SIHCCAJAVRI 195005151200 22
1199 SIHCCAJAVRI 195005151800 -11
1265 TYNSET-HANSMOEN 194903241200 111
1265 TYNSET-HANSMOEN 194903241800 78

这时候,我们就以StationID以ID,将两个表的文件直接全都读到Mapper中。

那么如何在Mapper中处理多种形式的文件呢?我们可以在同一个Mapper中处理这两种数据,但也可以使用所谓的MultipleInputs类,使用多种Mapper,从多个数据源中读取数据,但各个Mapper的输出类型需要一致,这和我们的需求是吻合的。

但是在这么个简单的例子中,我们干脆只使用一个Mapper,在逻辑中区分两个表以避免引入额外的复杂度。

首先先假设输入文件,我们有两个输入文件分别作为气象站数据和天气数据,都使用CSV格式——

1
2
3
4
5
6
7
8
9
10
-- Station.csv
1199,SIHCCAJAVRI
1265,TYNSET-HANSMOEN

-- Records.csv
1265,194903241200,111
1265,194903241800,78
1199,195005150700,0
1199,195005151200,22
1199,195005151800,-11

只需要检测其中,的数量即可判定数据究竟是气象站还是记录。

然后是Mapper的输出数据,我们输出的VALUE直接使用原数据,而KEY需要额外的设计——必须保证气象站的数据放在最前面,这时候我们就想到了我们的老朋友——二次排序,我们需要创建一个虚拟的键用来排序,而为了让气象站成为“一”,我们需要给它一个零,因为默认排序是按升序排序的,给它一个0保证它作为第一位,当然这需要其它的记录都是1。

为此,我们定义一个Pair,其中第一个字段是StationID,第二个字段是虚拟键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Scala可以自定义自己的构造器……我之前好像没学过这个
// 但是为了能够使用这鬼畜的序列化、反序列化框架,必须使用var定义,我麻了好嘛
// 想要不使用var的话就必须使用状态模式了,更麻
case class KeyPair(var stationID : String, var virtualKey : Int) extends WritableComparable[KeyPair] {
def this() = this("", -1) // 供反射使用
override def compareTo(o: KeyPair): Int = {
if (stationID != o.stationID)
stationID.compareTo(o.stationID)
else virtualKey.compareTo(o.virtualKey)
}

override def write(out: DataOutput): Unit = {
out.writeUTF(stationID)
out.writeInt(virtualKey)
}

override def readFields(in: DataInput): Unit = {
stationID = in.readUTF()
virtualKey = in.readInt()
}
}

整个Pair都将被用来排序,其中先按气象站ID进行排序,再按虚拟键进行排序,均为升序。

然后是GroupComparator,这里只按照气象站ID进行分组。如果定义Partitioner,也应只使用StationID的哈希值,这里没有定义Partitioner。

1
2
3
4
5
6
7
8
9
10
// 考虑到是使用整个KEY进行排序,这里不用自定义SortComparator,让它调用KEY里定义的compareTo方法即可
// 顺便,记得调用指定的构造器,使用WritableComparator的空参构造器一定会抛出空指针异常
class JoinClauseGroupComparator extends WritableComparator(classOf[KeyPair], true) {
override def compare(a: WritableComparable[_], b: WritableComparable[_]): Int = {
// 犯了非常愚蠢的错误……变量名写错了,debug了半个小时
val l = a.asInstanceOf[KeyPair]
val r = b.asInstanceOf[KeyPair]
l.stationID.compareTo(r.stationID)
}
}

Mapper的定义同样简单,根据当前处理的数据的类型构造KEY即可。

1
2
3
4
5
6
7
class JoinClauseMapper extends Mapper[LongWritable, Text, KeyPair, Text] {
override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, KeyPair, Text]#Context): Unit = {
val cols = value.toString.split(",")
// 如果有两列,说明是Station表,虚拟键为0,否则是记录表,虚拟键为1
context.write(KeyPair(cols(0), if (cols.length == 2) 0 else 1), value)
}
}

最麻烦的是Reducer,Reducer首先要检查第一个元素是否是气象站,如果不是说明这个气象站ID对应的气象站不存在,直接退出执行;如果的确是气象站的话,对之后的每个元素同第一个元素都进行连接操作并写出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class JoinClauseReducer extends Reducer[KeyPair, Text, Text, NullWritable] {
override def reduce(key: KeyPair, values: lang.Iterable[Text], context: Reducer[KeyPair, Text, Text, NullWritable]#Context): Unit = {
val iter = values.iterator()

// 使用Try的上下文——如果第一列不是站点(模式匹配会失败),则直接抛异常跑路
for {
firstElem <- Try(iter.next())
Array(stationId, stationName) = firstElem.toString.split(",")
} yield iter.forEachRemaining { elem =>
val Array(_, timeStamp, temperature) = elem.toString.split(",")
context.write(
new Text(
Array(stationId,
stationName,
timeStamp,
temperature).mkString(",")),
NullWritable.get)
}
}
}

可以看到,Scala处理这种业务问题相当优美,但需要明确for的使用方式以及如何还原成原生Scala代码。需要记住,for代码体是不在Try的上下文之中的,这里如果抛出异常不会被捕获,除非用yield。

使用上面的csv文件作为输入进行执行,得到下面的结果,BINGO~

1
2
3
4
5
1199,SIHCCAJAVRI,195005151800,-11
1199,SIHCCAJAVRI,195005151200,22
1199,SIHCCAJAVRI,195005150700,0
1265,TYNSET-HANSMOEN,194903241800,78
1265,TYNSET-HANSMOEN,194903241200,111

如果想要使用多个Reducer使能够并行执行,则必须自定义Partitioner。

结论

使用MR进行分组表面上看起来还是比较简单的——把要合并的两个数据集放在同一个values集合里并特别地去规划它们的顺序即可,ON操作则是交给了二次排序;可是这种假想一遇到真实的业务需求就要抓瞎了——如果连接三个表,四个表,五个表怎么办?如果不使用等于运算符进行连接怎么办?如果连接的键不是主键,从而让这里不是一对多,而是多对多的情况该怎么办?这只能具体情况具体分析了,如果要真正地让这玩意成为生产力,更抽象的技术是必须的。


本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 协议 ,转载请注明出处!