ORDER BY(全局排序) 排序是一个常见的业务需求。我们不止可以用 MR 提供的原生类型进行排序,还能够用自定义的类型进行排序。在 SQL 中,我们使用 ORDER BY 子句进行排序,ORDER BY 子句可以接受一个或多个字段,也可以接受表达式。
MR 支持在两个地方进行排序——首先是 Reducer 接受的各 Mapper 发送的记录会按 KEY 进行归并排序;然后是对每个 KEY,允许在 KEY 对应的 VALUE 集合中进行排序,这个排序的进行是不太明显的。
考虑这样的需求——我们之前的航空公司数据集是按照月份和星期来排序的,现在突然需要将输出的数据按月份升序之后按星期降序去排序。我们要保证所有数据整体有序,且性能足够好 。
之前学习过 MR 的排序机制,我们知道,如果想要让所有数据全排序并存放在同一个文件里,则只能使用一个 Reducer。否则我们必须定制化 Partitioner 和 KEY,以保证不同结果文件之间有序。
研究一下我们的业务,这里需要按月份去升序,按星期去降序,因此总共的索引数有 12 x 7 = 84 个,它们顺序如下——
1_7
1_6
…
2_7
…
12_2
12_1
显然,Partitioner 需要保证所有相邻的索引必须分给同一个 Reducer 。
首先,我们需要定义 KEY 类型(即 MonthDayWeek)以及 VALUE 类型(所需的结果),KEY 类型需要实现 WritableComparable 接口,VALUE 类型需要实现 Writable。
不一定必须要 WritableComparable,也可以从外部指定另外的 Comparator,这个 Comparator 称为 SortComparator。
类型的代码见此,非常繁琐,信息量很小。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 package mapreduceimport mapreduce.DistributedSortJob .argsimport org.apache.hadoop.conf.{Configurable , Configuration }import org.apache.hadoop.fs.Path import org.apache.hadoop.ioimport org.apache.hadoop.io.{IntWritable , LongWritable , NullWritable , Text , Writable , WritableComparable }import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat , TextInputFormat }import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat , TextOutputFormat }import org.apache.hadoop.mapreduce.{Job , Mapper , Partitioner , Reducer }import org.apache.hadoop.util.GenericOptionsParser import java.io.{DataInput , DataOutput }import java.langclass MonthDayWeek extends WritableComparable [MonthDayWeek ] { var month : IntWritable = new IntWritable (0 ) var dayOfWeek : IntWritable = new IntWritable (0 ) override def write (out: DataOutput ): Unit = { month.write(out) dayOfWeek.write(out) } override def readFields (in: DataInput ): Unit = { month.readFields(in) dayOfWeek.readFields(in) } override def compareTo (o: MonthDayWeek ): Int = { if (this .month.get == o.month.get) -1 * dayOfWeek.compareTo(o.dayOfWeek) else this .month.compareTo(o.month) } def canEqual (other: Any ): Boolean = other.isInstanceOf[MonthDayWeek ] override def equals (other: Any ): Boolean = other match { case that: MonthDayWeek => (that canEqual this ) && month == that.month && dayOfWeek == that.dayOfWeek case _ => false } override def hashCode (): Int = { val state = Seq (month, dayOfWeek) state.map(_.hashCode()).foldLeft(0 )((a, b) => 31 * a + b) } }class DelaysWritable extends Writable { var year = new IntWritable (0 ) var month = new IntWritable (0 ) var date = new IntWritable (0 ) var dayOfWeek = new IntWritable (0 ) var arrDelay = new IntWritable (0 ) var depDelay = new IntWritable (0 ) var originAirportCode = new Text () var destAirportCode = new Text () var carrierCode = new Text () override def write (out: DataOutput ): Unit = { year.write(out) month.write(out) date.write(out) dayOfWeek.write(out) arrDelay.write(out) depDelay.write(out) originAirportCode.write(out) destAirportCode.write(out) carrierCode.write(out) } override def readFields (in: DataInput ): Unit = { year.readFields(in) month.readFields(in) date.readFields(in) dayOfWeek.readFields(in) arrDelay.readFields(in) depDelay.readFields(in) originAirportCode.readFields(in) destAirportCode.readFields(in) carrierCode.readFields(in) } override def toString = s"$year , $month , $date , $dayOfWeek , $arrDelay , $depDelay , $originAirportCode , $destAirportCode , $carrierCode " }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class MonthDayWeekSortMapper extends Mapper [LongWritable , Text , MonthDayWeek , DelaysWritable ] { val outputK = new MonthDayWeek val outputV = new DelaysWritable override def map (key: LongWritable , value: Text , context: Mapper [LongWritable , Text , MonthDayWeek , DelaysWritable ]#Context ): Unit = { if (value.toString.startsWith("Year" )) return import AirlineCol ._ val colGetter = AirlineCol .build(value.toString.split("," )) outputK.month = new IntWritable (colGetter(Month ).toInt) outputK.dayOfWeek = new IntWritable (colGetter(DayOfWeek ).toInt) outputV.year = new IntWritable (colGetter(Year ).toInt) outputV.month = new IntWritable (colGetter(Month ).toInt) outputV.dayOfWeek = new IntWritable (colGetter(DayOfWeek ).toInt) outputV.date = new IntWritable (colGetter(DayofMonth ).toInt) outputV.arrDelay = new IntWritable (colGetter(ArrDelay ).toInt) outputV.depDelay = new IntWritable (colGetter(DepDelay ).toInt) outputV.destAirportCode = new Text (colGetter(Dest )) outputV.originAirportCode = new Text (colGetter(Origin )) outputV.carrierCode = new Text (colGetter(UniqueCarrier )) context.write(outputK, outputV) } }class MonthDayWeekSortReducer extends Reducer [MonthDayWeek , DelaysWritable , DelaysWritable , NullWritable ] { override def reduce (key: MonthDayWeek , values: lang.Iterable [DelaysWritable ], context: Reducer [MonthDayWeek , DelaysWritable , DelaysWritable , NullWritable ]#Context ): Unit = { values.forEach(context.write(_, NullWritable .get)) } }class MonthDayWeekSortJobPartitioner extends Partitioner [MonthDayWeek , DelaysWritable ] with Configurable { var indexRange : Int = 84 / 7 ; var config : Configuration = new Configuration (); override def setConf (conf: Configuration ): Unit = { config = conf } override def getConf : Configuration = config override def getPartition (key: MonthDayWeek , value: DelaysWritable , numPartitions: Int ): Int = { val index = ((key.month.get - 1 ) * 7 + (key.dayOfWeek.get - 1 )) / config.getInt("key.range" , 84 ) if (index < numPartitions) index else numPartitions - 1 } }
这里有两个参数需要配置——mapreduce.job.reduces
,标识 Reducer 的数量,key.range
,标识每个 Reducer 处理 key 的数量,默认为 84,即所有归一个 Reducer 来处理。将 Reducer 数量设置成 12,key 范围设置成 7,则得到每个文件保存一个月数据的结果。
可以看到,按 12,7 来设置时,part-r-00000
的头部和part-r-00011
的尾部是符合需求的——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 $ head part-r-00000 2007, 1, 28, 7, 24, -3 , PHL, PBI, US 2007, 1, 28, 7, 6, 11, DFW, BNA, AA 2007, 1, 7, 7, 2, -1 , ONT, PHX, US 2007, 1, 7, 7, -12 , 1, CLE, PHX, CO 2007, 1, 7, 7, 30, 0, ONT, LAS, US 2007, 1, 7, 7, -17 , -1 , OMA, PHX, US 2007, 1, 28, 7, -4 , -15 , MOT, MSP, NW 2007, 1, 7, 7, -23 , 6, OMA, PHX, US 2007, 1, 7, 7, -20 , 13, OMA, PHX, US 2007, 1, 7, 7, -8 , -9 , RDU, EWR, MQ $ tail part-r-00011 2007, 12, 31, 1, -2 , 2, XNA, DFW, MQ 2007, 12, 24, 1, -12 , -1 , ATL, TYS, DL 2007, 12, 3, 1, 50, 39, DFW, XNA, MQ 2007, 12, 17, 1, -1 , 1, MDW, STL, WN 2007, 12, 24, 1, -18 , -3 , LGA, MCO, DL 2007, 12, 24, 1, 31, 54, MCO, LGA, DL 2007, 12, 10, 1, 6, 0, DFW, XNA, MQ 2007, 12, 10, 1, 66, 45, DFW, HSV, AA 2007, 12, 24, 1, -22 , -12 , RSW, BOS, DL 2007, 12, 24, 1, 1, -3 , BOS, RSW, DL
至于如何进行进一步的优化呢?一个通常的操作是对输入数据进行采样,获取键的分布情况,并根据该情况特定地去规划分区并分发给相应 Reducer,保证每个 Reducer 的负载均衡。
可以料想,这种模式(ORDER BY)的实现是绝对能进行抽象的,
二次排序
二次排序其实就是使用组合键进行排序,这引入了所谓的 GroupComparator 和 SortComparator。
MR 默认的行为会按 KEY 进行排序,但是同一个 KEY 内的记录之间的顺序仍旧是无法保证的,这些记录来自各个 Mapper,因此即使原数据有序,被 Mapper 处理后的数据仍有序,但在这里将仍旧是无序的。
但 MR 同样也允许对 KEY 内的集合进行排序,这就是所谓的二次排序或辅助排序,二次排序是一个模式而非角色 ,它并非是直接对 VALUE 集合进行排序,而是将想再次排序的字段放在 KEY 里,使第一次排序时直接排出需要的顺序 ,因此二次排序这个名字实际上只描述了现象,没有点明实质。二次排序的一个难以理解的地方在于,在 Reducer 阶段,对 KEY 进行归并(即合并各个 Partition)和对 KEY 进行分组所使用的 Comparator 可以不是同一个 Comparator 。
从一个示例看起,现在假设有一张网站浏览记录表,有如下字段——
website IP country click_time --------------------------------------------------------- hoogle.com 123.123.123.123 UK 2022-02-02 12:23:34 hoogle.com 123.123.84.94 CA 2022-02-01 12:23:34 .....
现在我们想要获取每个网站的每个国家的最近 5 次的访问时间 。显然,我们可以以网站,国家作为 KEY 进行分组,然后维护一个大小为 5 的大顶堆,遍历整个集合得到结果,在 cleanup 方法中将结果写出。
但是这显然有一定的限制——要是我们要求获取最近 500 亿次的访问时间呢?这时,我们就必须考虑一些别的方案,比如,我们可以先把整个表按网站,国家,时间进行排序,再按照网站,国家进行分组 (意识到这顺序和 SQL 中的不对应)即可,在 reduce 中只需要维护一个计数器。
但是我们之前所学的东西并不支持这种操作——在我们眼里,KEY 用于排序和分组时,使用的比较器是一样的,就是我们在 KEY 类型中实现的WritableComparable
接口。因此,这里需要有一点新东西,被引入的就是所谓的SortComparator
和GroupComparator
。
SortComparator 用于对 Partitioner 的分区结果进行排序和 Reducer 归并 Mapper 结果时,默认的 SortComparator 就是 KEY 实现的 WritableComparable 接口;GroupComparator 用于 Reducer 进行分组并调用 reduce 方法时。GroupComparator 其实只用来比较是否相等——如果返回 0,则相等,传给用户的 reduce 方法;如果不返回 0,则终止当前的 reduce 方法,另起一个。
在这里,有三个组件需要自定义——首先是 Partitioner,要保证所有网站被分在一个文件里(不然这排序就没必要了);然后是 SortComparator 和 GroupComparator,代码见下——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class WebsiteKey extends WritableComparable [WebsiteKey ] { var website : String = "" var country : String = "" var click_time : String = "" override def write (out: DataOutput ): Unit = { } override def readFields (in: DataInput ): Unit = { } override def compareTo (o: WebsiteKey ): Int = { if (website != o.website) website.compareTo(o.website) else if (country != o.country) country.compareTo(o.country) else -1 * click_time.compareTo(o.click_time) } }class WebsitePartitioner extends Partitioner [WebsiteKey , Any ] { override def getPartition (key: WebsiteKey , value: Any , numPartitions: Int ): Int = ??? }class WebsiteKeyGroupComparator extends WritableComparator { override def compare (a: WritableComparable [_], b: WritableComparable [_]): Int = { try { val l : WebsiteKey = a.asInstanceOf[WebsiteKey ] val r : WebsiteKey = b.asInstanceOf[WebsiteKey ] if (l.website != r.website) l.website .compareTo(r.website) else l.country.compareTo(r.country) } catch {case _: Throwable => -1 } } }
Mapper 和 Reducer 不表,Mapper 除了抽取出 KEY 什么都不做,Reducer 直接拿到集合的前五个元素。
这里的组合键中有三个字段,其中 GroupComparator 中所使用的称为自然键 ,其它的称为自然值 , 自然键和自然值组成的组合键被 SortComparator 所使用,可以简单地认为 GroupComparator 就是 GROUP BY 中所使用的字段,ORDER BY 就是 SortComparator 所使用的字段。排序应当使用整个 KEY,分组应当使用排序使用的 KEY 的子集,否则可能会发生不可预料的结果。
但这在 SQL 上好像没有对应的语句——ORDER BY 在 SQL 里是最后才调用的,而 GROUP BY 在此之前执行,如果硬要放到 SQL 的语义中的话,可以认为是在 FROM 的表里进行了排序。可见,在 SQL 上的对应其实是不太明显的。SQL 这样设计的原因我猜测是因为聚集函数不关心列的顺序。
验证 二次排序使用的场景在于,需要在同一个分组里的记录有序,且各个记录组(即这里的“有序”区分出来的组)之间有相互依赖 ,如果没有后者,则组合键和自然键是一致的,就没必要再自己定义一个 GroupComparator。
下面对上面的说法进行测试,我们建立一个非常简单的实体,以及规定相应的 Mapper,Reducer,Reducer 仅用于在每一组之前进行一次打印——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class TupleWritable extends WritableComparable [TupleWritable ] { var _1 : Int = 0 var _2 : Int = 0 var _3 : Int = 0 }class SecondarySortMapper extends Mapper [LongWritable , Text , IntTripleWritable , NullWritable ] { override def map (key: LongWritable , value: Text , context: Mapper [LongWritable , Text , IntTripleWritable , NullWritable ]#Context ): Unit = { val Array (_1,_2,_3) = value.toString.split(" " ).map(_.toInt) val triple = new IntTripleWritable triple._1 = _1 triple._2 = _2 triple._3 = _3 context.write(triple, NullWritable .get) } }class SecondarySortReducer extends Reducer [IntTripleWritable , NullWritable , Text , NullWritable ] { override def reduce (key: IntTripleWritable , values: lang.Iterable [NullWritable ], context: Reducer [IntTripleWritable , NullWritable , Text , NullWritable ]#Context ): Unit = { context.write(new Text ("分组,key:" + key.toString), NullWritable .get) values.forEach(_=>context.write(new Text (key.toString), NullWritable .get)) } }
输入数据如下,为 000 到 111 进行洗牌的结果——
1 0 1 1 1 1 0 0 0 0 0 1 0 1 0 0 1 1 1 0 0 1 1 0
排序和分组一致
按 2 个字段排序,2 个字段分组时,得到——
分组,key :0 , 0 , 1 0 , 0 , 1 0 , 0 , 0 分组,key :0 , 1 , 1 0 , 1 , 1 0 , 1 , 0 分组,key :1 , 0 , 0 1 , 0 , 0 1 , 0 , 1 分组,key :1 , 1 , 0 1 , 1 , 0 1 , 1 , 1
这和预期比较一致。
按_1 和_2 排序,按_1 分组
分组,key :0 , 0 , 1 0 , 0 , 1 0 , 0 , 0 0 , 1 , 1 0 , 1 , 0 分组,key :1 , 0 , 0 1 , 0 , 0 1 , 0 , 1 1 , 1 , 0 1 , 1 , 1
这相当于是把_2 当作了要排序的自然值 ,可以看到每个组的_2 都是有序的。
全排序,按_1 分组
分组,key :0 , 0 , 0 0 , 0 , 0 0 , 0 , 1 0 , 1 , 0 0 , 1 , 1 分组,key :1 , 0 , 0 1 , 0 , 0 1 , 0 , 1 1 , 1 , 0 1 , 1 , 1
这相当于是把_2,_3 当作了要排序的自然值 。
按_1 排序,按_1,_2 分组
分组,key :0 , 1 , 1 0 , 1 , 1 0 , 1 , 0 分组,key :0 , 0 , 1 0 , 0 , 1 0 , 0 , 0 分组,key :1 , 1 , 0 1 , 1 , 0 分组,key :1 , 0 , 0 1 , 0 , 0 分组,key :1 , 1 , 1 1 , 1 , 1 分组,key :1 , 0 , 1 1 , 0 , 1
这是一个非法情况,相当于是在分组的时候拿到的数据不是有序的,分组的 KEY 必须为排序的 KEY 的子集!
同时,通过上面的几个测试,可以看到 GroupComparator 是不会改变 SortComparator 排序的结果的,它只负责分组,也就是说只看当前记录是否等于上一条记录 ,如果 SortComparator 给出了就它看来是错误(无序)的结果,则它也会返回错误的结果。
一个实际的实例 再考虑一个实际的实例,还是使用那张航班表,现在考虑获取每个月每个航班的最长的 10 次航行的时间。
根据需求,我们需要将数据按(月,航班)分组,按(月,航班,总飞行时间(降序))排序,并在每次 reduce 里取得前十条。注意这里使用的组合键,其 hashCode 仅使用 month 和 tailNum,这是为了能在分区的时候分到同一个 Reducer,这样才好分组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 class CombineKey extends WritableComparable [CombineKey ] { var month : Int = 0 var tailNum : String = "" var allDelay : Int = 0 override def write (out: DataOutput ): Unit = { out.writeInt(month) out.writeUTF(tailNum) out.writeInt(allDelay) } override def readFields (in: DataInput ): Unit = { month = in.readInt() tailNum = in.readUTF() allDelay = in.readInt() } override def compareTo (o: CombineKey ): Int = { if (month != o.month) month.compareTo(o.month) else if (tailNum != o.tailNum) tailNum.compareTo(o.tailNum) else -1 * allDelay.compareTo(o.allDelay) } override def toString = s"month=$month , tailNum=$tailNum , allDelay =$allDelay " def canEqual (other: Any ): Boolean = other.isInstanceOf[CombineKey ] override def equals (other: Any ): Boolean = other match { case that: CombineKey => (that canEqual this ) && month == that.month && tailNum == that.tailNum case _ => false } override def hashCode (): Int = { val state = Seq (month, tailNum) state.map(_.hashCode()).foldLeft(0 )((a, b) => 31 * a + b) } }class LongestAirtimePerMonthMapper extends Mapper [LongWritable , Text , CombineKey , Text ] { override def map (key: LongWritable , value: Text , context: Mapper [LongWritable , Text , CombineKey , Text ]#Context ): Unit = Try { if (value.toString.startsWith("Year" )) return val colGetter = AirlineCol .build(value.toString.split("," )) val combineKey = new CombineKey combineKey.month = colGetter(AirlineCol .Month ).toInt combineKey.tailNum = colGetter(AirlineCol .TailNum ) combineKey.allDelay = colGetter(AirlineCol .ArrDelay ).toIntOption.getOrElse(0 ) + colGetter(AirlineCol .DepDelay ).toIntOption.getOrElse(0 ) context.write(combineKey, value) } }class LongestAirtimePerMonthReducer extends Reducer [CombineKey , Text , Text , Text ] { override def reduce (key: CombineKey , values: lang.Iterable [Text ], context: Reducer [CombineKey , Text , Text , Text ]#Context ) = { import scala.jdk.CollectionConverters ._ context.write(new Text ("新分组" ), new Text ("" )) var counter = 0 val iter = values.iterator() while (counter < 10 && iter.hasNext) { counter += 1 context.write(new Text (key.toString), iter.next()) } } }class LongestAirtimePerMonthGroupComparator extends WritableComparator (classOf[CombineKey ], true ) { override def compare (a: WritableComparable [_], b: WritableComparable [_]): Int = Try { val l = a.asInstanceOf[CombineKey ] val r = b.asInstanceOf[CombineKey ] if (l.month != r.month) l.month.compareTo(r.month) else l.tailNum.compareTo(r.tailNum) }.getOrElse(-1 ) }
总结 二次排序看上去是相当的复杂,但其实原理就是那么简单——把要进一步排序的字段放到 KEY 里,同时确认哪些字段用于排序(一般是全部),哪些字段用于分组,BINGO 。
二次排序能够解决许多原本需要在 Reducer 实例中使用实例变量来维护一些状态才能解决的问题 ,比如前面说的求最近几次的访问时间,这就需要维护一个大顶堆。Hadoop 本就挺复杂了,再引入带副作用的 map 方法 reduce 方法……对脑袋是一个负担。
同时,二次排序必须结合对 Partitioner 的重写 。在很多情况下我们需要让一系列数据在一个 reducer 中进行处理,比如 Hadoop 权威指南的那个例子,我们要求每年的最高气温,这时候我们按年,气温进行排序,按年进行分组,这时如果按照默认的 HashPartitioner(它按 KEY 的 hashCode 进行分区),则同年的数据可能分到不同的 Reducer 中,我们就会得到错误结果。
感觉分区使用的字段好像很多时候和分组使用的字段相同。
JOIN 连接操作应该是 MR 下的最后一个开发模式了,再之后就是去学习原理,输入输出等,然后是 Hive,学习 Hive 时要再次回来联想 MR 的模式。
连接操作就是 SQL 中的 JOIN,将两个数据集的记录做叉积并筛选出其中匹配的部分。
在 MR 中,连接操作的实现需要使用一些特定模式来实现,事实上,Hadoop 权威指南也承认 MR 编写连接操作比较棘手,建议使用 Pig,Hive,Spark 等抽象程度更高的框架。且根据两个数据集的大小以及分区方式的不同,具体的处理方式也会不同。比如,如果一个数据集非常大,而一个数据集非常小,则可以把较小的数据集直接维护在每一个节点的内存里并进行使用(使用名为“Side Data”的方式)。这和 SQL 简简单单一个 JOIN 一个 ON 就解决问题形成了鲜明对比,但这是数据集的巨大导致的不得已而为之。
连接操作如果在 Mapper 处进行,则称为 map 端连接,如果在 Reducer 处执行,则称为 reduce 端连接。数据的组织方式决定了究竟使用map端连接或reduce端连接,这两种连接方式都能够处理两个数据集规模均大的情况。
Hadoop权威指南没有展开讲map连接,所以我也跳过map连接了。
reduce连接 reduce连接虽然性能较差,但适用范围更广,因此更常用,先去学习它。
reduce连接的思路使用一句话来描述,就是将要连接的键放入Mapper输出的KEY之中,而要连接的两个数据集都作为Mpper输出的VALUE ,这样,在同一个迭代集合里我们就能够获取到两个数据集的值。显然,我们需要妥善处理它们的顺序(为此需要使用辅助排序),如果是一对多的关系,我们可以让第一个元素作为那个“一”,从而能够在后续的折叠过程中维护该记录;如果是多对多的关系……那就得具体问题具体分析了。
进行这种操作时,reduce的逻辑可能是这样,当作伪代码看吧——
def reduce = (key, values, context) => { val iter = values.iterator() Try (iter.next) .map(parse) .filter(_.type == 'Station ') .foreach { elem => while (iter.hasNext) { } } }
从示例开始 比如,我们现在有两个表,一个是气象站表,一个是气象记录表,它们的示例如下,我们想将它们通过气象站ID进行连接——
Stations表:
StationID
StationName
1199
SIHCCAJAVRI
1265
TYNSET-HANSMOEN
Records表:
StationID
Timestamp
Temperature
1265
194903241200
111
1265
194903241800
78
1199
195005150700
0
1199
195005151200
22
1199
195005151800
-11
连接后的表:
StationID
StationName
Timestamp
Temperature
1199
SIHCCAJAVRI
195005150700
0
1199
SIHCCAJAVRI
195005151200
22
1199
SIHCCAJAVRI
195005151800
-11
1265
TYNSET-HANSMOEN
194903241200
111
1265
TYNSET-HANSMOEN
194903241800
78
这时候,我们就以StationID以ID,将两个表的文件直接全都读到Mapper中。
那么如何在Mapper中处理多种形式的文件呢?我们可以在同一个Mapper中处理这两种数据,但也可以使用所谓的MultipleInputs
类,使用多种Mapper,从多个数据源中读取数据,但各个Mapper的输出类型需要一致,这和我们的需求是吻合的。
但是在这么个简单的例子中,我们干脆只使用一个Mapper,在逻辑中区分两个表以避免引入额外的复杂度。
首先先假设输入文件,我们有两个输入文件分别作为气象站数据和天气数据,都使用CSV格式——
-- Station.csv 1199 ,SIHCCAJAVRI 1265 ,TYNSET-HANSMOEN -- Records.csv 1265 ,194903241200,111 1265 ,194903241800,78 1199 ,195005150700,0 1199 ,195005151200,22 1199 ,195005151800,-11
只需要检测其中,
的数量即可判定数据究竟是气象站还是记录。
然后是Mapper的输出数据,我们输出的VALUE直接使用原数据,而KEY需要额外的设计——必须保证气象站的数据放在最前面,这时候我们就想到了我们的老朋友——二次排序,我们需要创建一个虚拟的键用来排序,而为了让气象站成为“一”,我们需要给它一个零,因为默认排序是按升序排序的,给它一个0保证它作为第一位,当然这需要其它的记录都是1。
为此,我们定义一个Pair,其中第一个字段是StationID,第二个字段是虚拟键。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 case class KeyPair (var stationID : String , var virtualKey : Int ) extends WritableComparable [KeyPair ] { def this () = this ("" , -1 ) override def compareTo (o: KeyPair ): Int = { if (stationID != o.stationID) stationID.compareTo(o.stationID) else virtualKey.compareTo(o.virtualKey) } override def write (out: DataOutput ): Unit = { out.writeUTF(stationID) out.writeInt(virtualKey) } override def readFields (in: DataInput ): Unit = { stationID = in.readUTF() virtualKey = in.readInt() } }
整个Pair都将被用来排序,其中先按气象站ID进行排序,再按虚拟键进行排序,均为升序。
然后是GroupComparator,这里只按照气象站ID进行分组。如果定义Partitioner,也应只使用StationID的哈希值,这里没有定义Partitioner。
class JoinClauseGroupComparator extends WritableComparator (classOf[KeyPair ], true ) { override def compare (a: WritableComparable [_], b: WritableComparable [_]): Int = { val l = a.asInstanceOf[KeyPair ] val r = b.asInstanceOf[KeyPair ] l.stationID.compareTo(r.stationID) } }
Mapper的定义同样简单,根据当前处理的数据的类型构造KEY即可。
class JoinClauseMapper extends Mapper [LongWritable , Text , KeyPair , Text ] { override def map (key: LongWritable , value: Text , context: Mapper [LongWritable , Text , KeyPair , Text ]#Context ): Unit = { val cols = value.toString.split("," ) context.write(KeyPair (cols(0 ), if (cols.length == 2 ) 0 else 1 ), value) } }
最麻烦的是Reducer,Reducer首先要检查第一个元素是否是气象站,如果不是说明这个气象站ID对应的气象站不存在,直接退出执行;如果的确是气象站的话,对之后的每个元素同第一个元素都进行连接操作并写出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class JoinClauseReducer extends Reducer [KeyPair , Text , Text , NullWritable ] { override def reduce (key: KeyPair , values: lang.Iterable [Text ], context: Reducer [KeyPair , Text , Text , NullWritable ]#Context ): Unit = { val iter = values.iterator() for { firstElem <- Try (iter.next()) Array (stationId, stationName) = firstElem.toString.split("," ) } yield iter.forEachRemaining { elem => val Array (_, timeStamp, temperature) = elem.toString.split("," ) context.write( new Text ( Array (stationId, stationName, timeStamp, temperature).mkString("," )), NullWritable .get) } } }
可以看到,Scala处理这种业务问题相当优美,但需要明确for的使用方式以及如何还原成原生Scala代码。需要记住,for代码体是不在Try的上下文之中的,这里如果抛出异常不会被捕获,除非用yield。
使用上面的csv文件作为输入进行执行,得到下面的结果,BINGO~
1199 ,SIHCCAJAVRI,195005151800 ,-11 1199 ,SIHCCAJAVRI,195005151200 ,22 1199 ,SIHCCAJAVRI,195005150700 ,0 1265 ,TYNSET-HANSMOEN,194903241800 ,78 1265 ,TYNSET-HANSMOEN,194903241200 ,111
如果想要使用多个Reducer使能够并行执行,则必须自定义Partitioner。
结论 使用MR进行分组表面上看起来还是比较简单的——把要合并的两个数据集放在同一个values集合里并特别地去规划它们的顺序即可,ON操作则是交给了二次排序;可是这种假想一遇到真实的业务需求就要抓瞎了——如果连接三个表,四个表,五个表怎么办?如果不使用等于运算符进行连接怎么办?如果连接的键不是主键,从而让这里不是一对多,而是多对多的情况该怎么办?这只能具体情况具体分析了,如果要真正地让这玩意成为生产力,更抽象的技术是必须的。