MapReduce,Hive,Spark 的两个示例——Word Count 和 JOIN
在系统学习 Hive 的 SELECT 之前,我们先来进行一些超有趣的事情——分别使用 MapReduce,Hive,Spark 来实现同一个程序,感受一下三者代码的差异,这里选择去实现一下 WordCount,以及一个表连接的示例。
Word Count
Word Count,不从它开始从谁开始呢?首先是 MapReduce,使用 Java 语言,代码是庞然大物。
MapReduce
1 |
|
可以看到,Mapper 和 Reducer 的代码是很显然的,在 Mapper 中,我们将每一行字符串按空格分割,并构造(WORD, 1)
的键值对,然后我们通过 Combiner 进行本地聚集,再发送给各个 Reducer,每个 Reducer 对每个 KEY 对应的所有值,进行 sum 操作,得到结果。
但这里却需要定义一堆东西,更别说 Driver 里的许多东西实际上都是重复的。
Spark
我们再来看看 Scala 的 Spark 的代码,Spark 实现 WordCount 其实是对我们这些开发人员最舒服的,但我还没系统学习过 Spark,所以不知道自己的描述是否正确。
1 |
|
不要看着 flatMap 就想入非非,RDD 不是 Monad!但虽然 RDD 不是 Monad,它仍旧可以使用 for,但这时它的上下文是列表的上下文——flatMap 函数的函数参数不能返回 RDD,因此我们在 for 里所能做的只有列表能做的。
另外,我不知道为何 Spark 最后得到的结果的 KEY 为何是无序的……按理说经过 shuffle,这里应当是有序的才对,我只能猜测,Spark 利用单进程的方便之处,在折叠时是并行进行的,并输出到同一个文件中,放到 MapReduce 的语境下,就是多个 Reducer 的输出文件为同一个,这样无论如何也不可能得出有序的结果。但这也是符合需求的——我确实没有指定排序。
Hive
然后是 Hive,这里展示了 Hive 从建模,读取数据到写出数据的全流程。
1 |
|
(SELECT explode(split(line, ' ')) AS word FROM docs)
需要特别解释一下,这里使用了所谓的表生成函数(UDTF) explode
,即通过一行数据生成一个表(该函数处理一个 Array,生成一个表,而 split 得到的是 Array),这里是 docs 表中每行数据按空格进行切割,并将每个结果展平,最后得到一张表,表中每一个记录都是一个单词,可以认为这是一种 flatMap,反过来说,这种在 Mapper 阶段干 flatMap 的操作,是显然需要使用 UDTF 的。
在得到这个单词表后,我们将其按照单词进行分组,并对每个分组进行 COUNT 聚集,得到结果并用该查询结果创建一张新表。通过SELECT *
可以发现其生成的结果和 MapReduce 的版本一致。
JOIN
现在考虑一个传统的案例——现在有一张部门表和一张雇员表,其中雇员属于特定部门,且有自己的工资,现在要求获取每个部门的工资最大的雇员的信息,相关表的定义如下。
1 |
|
Hive
这个需求和 SQL 非常契合,所以我们先使用 Hive 进行描述,我们显然需要一个内连接来进行此工作。(可能有更简单的方式,但我还没学到)
1 |
|
这里使用了 RANK 窗口函数,其中根据部门分区,根据工资降序,获取 rank 为 1 的就是工资最高的,这里耗时 28 秒。一个更加清晰的表述使用子查询,但 Hive 不支持子查询,所以这里使用 JOIN 来表述。
1 |
|
这个更加麻烦一些,而且性能更加差——两次 JOIN,要执行两次 MapReduce 任务才行,它耗时 64 秒(Hive 不是会进行 MapJoin 优化吗??)。下面的 MapReduce 只描述第一种。
MapReduce
关于 MapReduce 的编写,考虑到部门和雇员是一对多的关系,且部门的数量一定是较少的,我们本可以使用 map 端 join,但为了通用性,我们使用 reduce 端 join,为此,用于 join 的键必须要在键值对的键中,因此,Mapper 的输出的键中应当包含 deptno,然后在 reduce 的值的集合里,我们要让部门作为第一个元素,因此我们需要为此创建一个虚拟键。
再考虑分区,分组,排序,关于分区,肯定是按 deptno 进行分区;关于分组,我们也是按 deptno 进行分组;关于排序,为了能保证部门在最前面,我们要按 deptno 和虚拟键进行排序,这里也可以把工资也加进来进行排序,以保证第二个元素就能获取到最高的工资,但这其实并无必要。
根据上面的分析,我们规定,Mapper 的输出类型为<(deptno, virtualKey), Text>
(懒得定义一个泛类型了,机械操作太多了),按 deptno 进行分区,按 deptno 进行分组,按 deptno、virtualKey 进行排序;同时也可以发现,这里可以有一个 Combiner;Reducer 的输出类型就随意了,人能看就行。
1 |
|
MapReduce 编写这种业务代码真的是非常……非常痛苦,底层细节太多了,远不如 SQL 那样直接,更不用说有三个以上的表进行 reduce 端 join 的时候需要使用多个 MR 任务,且需要注意的细节更多,需要定义更多 Bean……想想就痛苦,和 CRUD 一样的痛苦。
Spark
然后是 Spark 版本。Spark 版本怎么写其实我还没有学过,但跟随着直觉,还是挺容易写的。
1 |
|
Spark 的代码真是优雅!但一个遗憾的地方是函数参数无法解构,使用 PartialFunction 解构,它居然说类型推断不出来,要我给出类型……i 服了 you。
总结
总结个啥,我只能说 Spark 太香了。上面的 JOIN 代码,如果在工程实践中应当定义 Bean,MapReduce 进行连接操作时应该使用 GenericWritable 作为 VALUE。实际上本还想写个分布式排序示例,但我对 Hive 和 Spark 都还没学到那里,先跳过了。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 协议 ,转载请注明出处!