MapReduce,Hive,Spark 的两个示例——Word Count 和 JOIN

在系统学习 Hive 的 SELECT 之前,我们先来进行一些超有趣的事情——分别使用 MapReduce,Hive,Spark 来实现同一个程序,感受一下三者代码的差异,这里选择去实现一下 WordCount,以及一个表连接的示例。

Word Count

Word Count,不从它开始从谁开始呢?首先是 MapReduce,使用 Java 语言,代码是庞然大物。

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NeoWordCount {
public static class NeoWordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
private final LongWritable ONE = new LongWritable(1);
private final Text outputK = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (String s : value.toString().split(" ")) {
outputK.set(s);
context.write(outputK, ONE);
}
}
}
public static class NeoWordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
private final LongWritable outputV = new LongWritable();
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
outputV.set(sum);
context.write(key, outputV);
}
}

public static void main(String[] args) throws Exception {
GenericOptionsParser parser = new GenericOptionsParser(args);
Job job = Job.getInstance(parser.getConfiguration());
args = parser.getRemainingArgs();

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputKeyClass(LongWritable.class);

job.setMapperClass(NeoWordCountMapper.class);
job.setReducerClass(NeoWordCountReducer.class);
job.setCombinerClass(NeoWordCountReducer.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

可以看到,Mapper 和 Reducer 的代码是很显然的,在 Mapper 中,我们将每一行字符串按空格分割,并构造(WORD, 1)的键值对,然后我们通过 Combiner 进行本地聚集,再发送给各个 Reducer,每个 Reducer 对每个 KEY 对应的所有值,进行 sum 操作,得到结果。

但这里却需要定义一堆东西,更别说 Driver 里的许多东西实际上都是重复的。

Spark

我们再来看看 Scala 的 Spark 的代码,Spark 实现 WordCount 其实是对我们这些开发人员最舒服的,但我还没系统学习过 Spark,所以不知道自己的描述是否正确。

1
2
3
4
5
6
7
8
9
// 定义函数只是为了能形式化地引入外部变量
def wordCount(sc : SparkContext, inputPath : String, outputPath : String) : Unit = {
// 不使用链式调用也是为了清晰
val lines = sc.textFile(inputPath) // 读取
val words = lines.flatMap(_.split(" ")) // 把字符串按空格切割并展平
val wordPairs = words..map((_, 1)) // 对每个 word 构造键值对 (WORD, 1)
val resultPairs = wordPairs.foldByKey(0)(_ + _) // 折叠操作,该方法应当在本地也会进行聚集……应当
resultPairs.saveAsTextFile(outputPath) // 保存
}

不要看着 flatMap 就想入非非,RDD 不是 Monad!但虽然 RDD 不是 Monad,它仍旧可以使用 for,但这时它的上下文是列表的上下文——flatMap 函数的函数参数不能返回 RDD,因此我们在 for 里所能做的只有列表能做的。

另外,我不知道为何 Spark 最后得到的结果的 KEY 为何是无序的……按理说经过 shuffle,这里应当是有序的才对,我只能猜测,Spark 利用单进程的方便之处,在折叠时是并行进行的,并输出到同一个文件中,放到 MapReduce 的语境下,就是多个 Reducer 的输出文件为同一个,这样无论如何也不可能得出有序的结果。但这也是符合需求的——我确实没有指定排序。

Hive

然后是 Hive,这里展示了 Hive 从建模,读取数据到写出数据的全流程。

1
2
3
4
5
6
7
8
9
// 按默认设置建表,默认设置好像是按、001 分割字段,按、n 分割记录,所以满足需求
// 这时候每一行都会被当作一个记录看待
CREATE TABLE docs(line STRING);
LOAD DATA INPATH '/hdfs/path/to/the/document' OVERWRITE INTO TABLE docs; -- 将文件读入(覆盖)到表中

CREATE TABLE word_counts AS
SELECT word, COUNT(*) FROM
(SELECT explode(split(line, ' ')) AS word FROM docs) AS w
GROUP BY word;

(SELECT explode(split(line, ' ')) AS word FROM docs)需要特别解释一下,这里使用了所谓的表生成函数(UDTF) explode,即通过一行数据生成一个表(该函数处理一个 Array,生成一个表,而 split 得到的是 Array),这里是 docs 表中每行数据按空格进行切割,并将每个结果展平,最后得到一张表,表中每一个记录都是一个单词,可以认为这是一种 flatMap,反过来说,这种在 Mapper 阶段干 flatMap 的操作,是显然需要使用 UDTF 的。

在得到这个单词表后,我们将其按照单词进行分组,并对每个分组进行 COUNT 聚集,得到结果并用该查询结果创建一张新表。通过SELECT *可以发现其生成的结果和 MapReduce 的版本一致。

JOIN

现在考虑一个传统的案例——现在有一张部门表和一张雇员表,其中雇员属于特定部门,且有自己的工资,现在要求获取每个部门的工资最大的雇员的信息,相关表的定义如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
---- dept.sql ----

CREATE TABLE IF NOT EXISTS dept (
deptno INT,
dname STRING,
loc INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

---- dept.txt ----

10,ACCOUNTING,1700
20,RESEARCH,1800
30,SALES,1900
40,OPERATIONS,1700

---- emp.sql ----

CREATE TABLE IF NOT EXISTS emp (
empno INT,
ename STRING,
job STRING,
mgr INT, -- 上级
hiredate STRING, -- 入职时间
sal DOUBLE, -- 薪水
comm DOUBLE, -- 奖金
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

---- emp.txt ----

7369,SMITH,CLERK,7902,1980-12-17,800.00,,20
7499,ALLEN,SALESMAN,7698,1981-2-20,1600.00,300.00,30
7521,WARD,SALESMAN,7698,1981-2-22,1250.00,500.00,30
7566,JONES,MANAGER,7839,1981-4-2,2975.00,,20
7654,MARTIN,SALESMAN,7698,1981-9-28,1250.00,1400.00,30
7698,BLAKE,MANAGER,7839,1981-5-1,2850,,30
7782,CLARK,MANAGER,7839,1981-6-9,2450.00,,10
7788,SCOTT,ANALYST,7566,1987-4-19,3000.00,,20
7839,KING,PRESIDENT,,1981-11-17,5000.00,,10
7844,TURNER,SALESMAN,7698,1981-9-8,1500.00,0.00,30
7876,ADAMS,CLERK,7788,1987-5-23,1100.00,,20
7900,JAMES,CLERK,7698,1981-12-3,950.00,,30
7902,FORD,ANALYST,7566,1981-12-3,3000.00,,20
7934,MILLER,CLERK,7782,1982-1-23,1300.00,,10

Hive

这个需求和 SQL 非常契合,所以我们先使用 Hive 进行描述,我们显然需要一个内连接来进行此工作。(可能有更简单的方式,但我还没学到)

1
2
3
4
5
6
7
8
SELECT department,employee,salary FROM
(SELECT
dept.dname AS department,
emp.ename AS employee,
emp.sal AS salary,
RANK() OVER (PARTITION BY dept.deptno ORDER BY emp.sal DESC) as rk
FROM dept JOIN emp ON dept.deptno = emp.deptno) AS t
WHERE rk = 1;

这里使用了 RANK 窗口函数,其中根据部门分区,根据工资降序,获取 rank 为 1 的就是工资最高的,这里耗时 28 秒。一个更加清晰的表述使用子查询,但 Hive 不支持子查询,所以这里使用 JOIN 来表述。

1
2
3
4
SELECT dept.dname, emp.ename, emp.sal FROM
dept JOIN emp ON dept.deptno = emp.deptno
JOIN (SELECT deptno, MAX(sal) AS max_sal FROM emp GROUP BY deptno) AS max_t
ON max_t.deptno = emp.deptno AND max_t.max_sal = emp.sal;

这个更加麻烦一些,而且性能更加差——两次 JOIN,要执行两次 MapReduce 任务才行,它耗时 64 秒(Hive 不是会进行 MapJoin 优化吗??)。下面的 MapReduce 只描述第一种。

MapReduce

关于 MapReduce 的编写,考虑到部门和雇员是一对多的关系,且部门的数量一定是较少的,我们本可以使用 map 端 join,但为了通用性,我们使用 reduce 端 join,为此,用于 join 的键必须要在键值对的键中,因此,Mapper 的输出的键中应当包含 deptno,然后在 reduce 的值的集合里,我们要让部门作为第一个元素,因此我们需要为此创建一个虚拟键。

再考虑分区,分组,排序,关于分区,肯定是按 deptno 进行分区;关于分组,我们也是按 deptno 进行分组;关于排序,为了能保证部门在最前面,我们要按 deptno 和虚拟键进行排序,这里也可以把工资也加进来进行排序,以保证第二个元素就能获取到最高的工资,但这其实并无必要。

根据上面的分析,我们规定,Mapper 的输出类型为<(deptno, virtualKey), Text>(懒得定义一个泛类型了,机械操作太多了),按 deptno 进行分区,按 deptno 进行分组,按 deptno、virtualKey 进行排序;同时也可以发现,这里可以有一个 Combiner;Reducer 的输出类型就随意了,人能看就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// 注意这个需要两个输入,第一个输入是 dept,第二个输入是 emp

// Mapper 输出的 key
public static class CombineKey implements WritableComparable<CombineKey> {
private Integer deptno;
private Integer virtualKey;
@Override
public int compareTo(CombineKey o) {
if (!deptno.equals(o.deptno))
return deptno.compareTo(o.deptno);
return virtualKey.compareTo(o.virtualKey);
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(deptno);
out.writeInt(virtualKey);
}

@Override
public void readFields(DataInput in) throws IOException {
deptno = in.readInt();
virtualKey = in.readInt();
}
}

public static class DeptMapper extends Mapper<LongWritable, Text, CombineKey, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] cols = value.toString().split(",");
int deptno = Integer.parseInt(cols[0]);
CombineKey outputK = new CombineKey();
outputK.deptno = deptno;
outputK.virtualKey = 0;
context.write(outputK, value);
}
}

public static class EmpMapper extends Mapper<LongWritable, Text, CombineKey, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] cols = value.toString().split(",");
int deptno = Integer.parseInt(cols[7]);
CombineKey outputK = new CombineKey();
outputK.deptno = deptno;
outputK.virtualKey = 1;
context.write(outputK, value);
}
}

public static class DeptnoPartitioner extends Partitioner<CombineKey, Text> {
@Override
public int getPartition(CombineKey combineKey, Text text, int numPartitions) {
return (combineKey.deptno.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}

public static class DeptnoGroupComparator extends WritableComparator {
// 永远记着定义 Comparator 的时候别把这茬忘掉了……
public DeptnoGroupComparator() {
super(CombineKey.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
return ((CombineKey) a).deptno.compareTo(((CombineKey) b).deptno);
}
}

// Combiner 的职责是在每个 Mapper 本地聚集
// 对于 DeptMapper,没有聚集的必要,对于 EmpMapper,可以直接找到该 Mapper 该部门最大工资的雇员,只写它就行
public static class SomeCombiner extends Reducer<CombineKey, Text, CombineKey, Text> {
@Override
protected void reduce(CombineKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if (key.virtualKey.equals(0)) {
for (Text value : values) {
context.write(key, value);
}
return;
}

double maxSal = -1;
String maxEmp = null;

for (Text value : values) {
String emp = value.toString();
double sal = Double.parseDouble(emp.split(",")[5]);
if (sal > maxSal) {
maxSal = sal;
maxEmp = emp;
}
}
context.write(key, new Text(maxEmp));
}
}

public static class SomeReducer extends Reducer<CombineKey, Text, NullWritable, Text> {
@Override
protected void reduce(CombineKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 虚拟键不可能是 1,如果是 1,则说明这些雇员根本没有对应的部门
// 考虑当前连接是内连接,这时直接返回即可

if (key.virtualKey.equals(1)) {
return;
}

String[] deptCols = values.iterator().next().toString().split(",");

double maxSal = -1;
String maxEmp = null;
for (Text value : values) {
String emp = value.toString();
double sal = Double.parseDouble(emp.split(",")[5]);
if (sal > maxSal) {
maxSal = sal;
maxEmp = emp;
}
}

// 部门没有任何雇员,这里是内连接,也是直接返回
if (maxEmp == null) {
return;
}

String[] empCols = maxEmp.split(",");

String result = new StringJoiner(",")
.add(deptCols[1])
.add(empCols[1])
.add(empCols[5])
.toString();

context.write(NullWritable.get(), new Text(result));
}
}

MapReduce 编写这种业务代码真的是非常……非常痛苦,底层细节太多了,远不如 SQL 那样直接,更不用说有三个以上的表进行 reduce 端 join 的时候需要使用多个 MR 任务,且需要注意的细节更多,需要定义更多 Bean……想想就痛苦,和 CRUD 一样的痛苦。

Spark

然后是 Spark 版本。Spark 版本怎么写其实我还没有学过,但跟随着直觉,还是挺容易写的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def someComplexJob(sc : SparkContext, deptInputPath : String, empInputPath : String, outputPath : String) : Unit = {
val depts = sc.textFile(deptInputPath)
.map(_.split(","))
.map{cols => (cols(0), cols)}
val emps = sc.textFile(empInputPath)
.map(_.split(","))
.map{cols => (cols(7), cols)}

// 值得注意的是,Spark 进行 join 操作不需要
// 像 MapReduce 那样在 KEY 里给定一个虚拟键,
// 但相信 Spark 的实现也会采取类似的手段
val deptEmps = depts.join(emps)

// 做一个 max 操作
// 需注意,reduceByKey 中的参数拿不到 KEY,这点很好
// 它怎么推断不出来……
deptEmps.reduceByKey{ (a, b) =>
val (pairA@(_, empA), pairB@(_, empB)) = (a, b)
if (empA(5).toDouble > empB(5).toDouble)
pairA
else pairB
}.map{case (_, (dept, emp)) =>
Array(dept(1),emp(1),emp(5)).mkString(",")
}.saveAsTextFile(outputPath)
}

Spark 的代码真是优雅!但一个遗憾的地方是函数参数无法解构,使用 PartialFunction 解构,它居然说类型推断不出来,要我给出类型……i 服了 you。

总结

总结个啥,我只能说 Spark 太香了。上面的 JOIN 代码,如果在工程实践中应当定义 Bean,MapReduce 进行连接操作时应该使用 GenericWritable 作为 VALUE。实际上本还想写个分布式排序示例,但我对 Hive 和 Spark 都还没学到那里,先跳过了。