Hive 学习笔记 2——SORT,JOIN,分区等

现在跟随《Hive 编程指南》进行学习,此书的翻译…我在序章已经看到两处明显错误了,中英对照着看吧。

这里去学习一下 SELECT,Hive 最重要的部分。Hive 的 SELECT 相较于普通的关系型数据库,增加了内置的集合类型,因此各种操作,如函数,聚合,窗口函数等,都会有一些新东西,新模式可用,更别说还有新增的 UDTF 了;同时由于底层使用 MapReduce,因此分区等概念也需要体现在 SQL 中。这些地方需要特别学习。

下面使用的示例仍旧通过尚硅谷的表和数据描述。

表数据定义见此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
---- dept.sql ----

CREATE TABLE IF NOT EXISTS dept (
deptno INT,
dname STRING,
loc INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

---- dept.txt ----

10,ACCOUNTING,1700
20,RESEARCH,1800
30,SALES,1900
40,OPERATIONS,1700

---- emp.sql ----

CREATE TABLE IF NOT EXISTS emp (
empno INT,
ename STRING,
job STRING,
mgr INT, -- 上级
hiredate STRING, -- 入职时间
sal DOUBLE, -- 薪水
comm DOUBLE, -- 奖金
deptno INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

---- emp.txt ----

7369,SMITH,CLERK,7902,1980-12-17,800.00,,20
7499,ALLEN,SALESMAN,7698,1981-2-20,1600.00,300.00,30
7521,WARD,SALESMAN,7698,1981-2-22,1250.00,500.00,30
7566,JONES,MANAGER,7839,1981-4-2,2975.00,,20
7654,MARTIN,SALESMAN,7698,1981-9-28,1250.00,1400.00,30
7698,BLAKE,MANAGER,7839,1981-5-1,2850,,30
7782,CLARK,MANAGER,7839,1981-6-9,2450.00,,10
7788,SCOTT,ANALYST,7566,1987-4-19,3000.00,,20
7839,KING,PRESIDENT,,1981-11-17,5000.00,,10
7844,TURNER,SALESMAN,7698,1981-9-8,1500.00,0.00,30
7876,ADAMS,CLERK,7788,1987-5-23,1100.00,,20
7900,JAMES,CLERK,7698,1981-12-3,950.00,,30
7902,FORD,ANALYST,7566,1981-12-3,3000.00,,20
7934,MILLER,CLERK,7782,1982-1-23,1300.00,,10

表生成函数(UDTF)

聚合函数从多行数据生成单个数据,而表生成函数则反之——将单列数据扩展成多行多列数据,一个展平操作。

最典型的表生成函数是 explode,它能够数组和哈希表展平成多列,使用如下——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
SELECT explode(ARRAY(1, 2, 3, 2, 1));
+---+
|col|
+---+
|1 |
|2 |
|3 |
|2 |
|1 |
+---+

SELECT explode(MAP('yuuki', 16, 'haruka', 17)) AS (name, age);
+------+---+
|name |age|
+------+---+
|yuuki |16 |
|haruka|17 |
+------+---+

JOIN

关于 Hive 的连接操作,Hive 曾经只能支持等值连接(我用 MapReduce 也只会写等值连接),而现在已经能够支持非等值连接了,并且能够在 ON 中使用 OR 子句,所以现在和关系型数据库的 JOIN 操作已经看不出什么差别了。

进行多表连接操作时,每次连接操作将产生一个 MapReduce 任务,从左到右依次执行,比如有SELECT .. FROM a JOIN b ON a.id=b.id JOIN c ON b.id=c.id,首先是 a 和 b 先进行连接,然后是连接后的输出表与 c 相连接。但在像这样的例子里,Hive 会进行优化——三个表都使用同一个键进行连接,因此可以同时将三个表进行输入,因此只需要一个 MR 任务。

应始终保证是小表 JOIN 大表,即从左到右表的大小依次增加以保证性能(相信 Hive 同样做了优化)。

为了进行优化,可以在 JOIN 之前提前对各连接的表进行一定的筛选操作,该操作应当通过子查询来进行

LEFT SEMI JOIN

考虑这样的需求,我们想获取每个部门工资最高的员工的信息,如果是关系型数据库,我们有好几种解决方式,可能最经济的方式是首先获取每个部门的最高的工资,然后使用 IN,按部门和工资两个字段进行匹配,比如类似这样——

1
2
-- Hive 下无法编译!
SELECT * FROM emp WHERE (deptno, sal) IN (SELECT deptno, MAX(sal) FROM emp GROUP BY deptno);

但是,**’Hive 不支持非关联子查询’**,这是符合逻辑的,因为若使用非关联子查询,这就说明我们需要将子查询执行并进行缓存和随机查询,这是愚蠢的。

而对于关联子查询,Hive 足够聪明,能够将其转换为 JOIN 来执行,比如这样的 SQL 在 Hive 中就是合法的——

1
SELECT * FROM emp AS e1 WHERE sal = (SELECT MAX(sal) FROM emp AS e2 WHERE e1.deptno = e2.deptno);

Hive 也提供了一个额外的手段,称为左半连接 LEFT SEMI JOIN,以用于通过连接的语法表示该种操作并保证性能。左半连接是一种优化过的内连接,它的原理是在内连接的基础上,对左表的每一条记录,在右表上一旦找到满足 ON 的记录,就立刻返回,停止匹配操作。左半连接在 SELECT 和 WHERE 中只允许使用左边的字段

上面的操作使用左半连接进行表述则结果如下——

1
2
3
SELECT * FROM emp AS e1
LEFT SEMI JOIN (SELECT deptno, MAX(sal) AS maxSal FROM emp GROUP BY deptno) AS e2
ON e1.deptno = e2.deptno AND e1.sal = e2.maxSal;

并不太清晰。

SORT BY,DISTRIBUTE BY,CLUSTER BY

在之前学习到分布式排序时有提到,如果让结果数据全局有序的话,Reducer 必须只能有 1 个,这导致 reduce 的阶段的并行性完全丧失了。而 ORDER BY 这个保证数据全局有序的子句也会造成 Hive 的查询中只能使用一个 Reducer,因此 Hive 显然是不推荐使用 ORDER BY 的,以至于在严格模式里,ORDER BY 必须要和 LIMIT 子句配合。

为此,Hive 提供了SORT BY,提供每个 Reducer 内部排序的功能,这样,每个 Reducer 生成的数据是有序的,而不同 Reducer 生成的数据之间仍旧是无序的。

编写 MapReduce 应用的时候,我们可以自定义三个角色——Partitioner,SortComparator,GroupComparator,其中 GroupComparator 对应 GROUP BY,而 Partitioner 和 SortComparator 各有其对应者。

DISTRIBUTE BY 子句对应 Partitioner,其负责指定特定 Mapper 的数据要分发给哪个 Reducer。这在某些应用场景下比较有用,比如,我们希望每个 Reducer 处理特定年份的数据,如果不指定 DISTRIBUTE BY 的话,每个 Reducer 将都有每年的数据,很均匀,但也没有实践意义。

比如,我们想获取每个部门的员工的信息,其中每个部门按工资倒序排序,分别使用和不使用 DISTRIBUTE BY 子句,查看对应执行结果——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
-- 这里数据量太少,需要强制指定 reducer 个数
-- 为确保效果,让 reducer 的数量大于部门数量即可
set mapred.reduce.tasks = 10;

SELECT * FROM emp
SORT BY deptno, sal DESC;
+-----+------+---------+----+----------+----+----+------+
|empno|ename |job |mgr |hiredate |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7902 |FORD |ANALYST |7566|1981-12-3 |3000|NULL|20 |
|7782 |CLARK |MANAGER |7839|1981-6-9 |2450|NULL|10 |
|7369 |SMITH |CLERK |7902|1980-12-17|800 |NULL|20 |
|7654 |MARTIN|SALESMAN |7698|1981-9-28 |1250|1400|30 |
|7839 |KING |PRESIDENT|NULL|1981-11-17|5000|NULL|10 |
|7499 |ALLEN |SALESMAN |7698|1981-2-20 |1600|300 |30 |
|7788 |SCOTT |ANALYST |7566|1987-4-19 |3000|NULL|20 |
|7876 |ADAMS |CLERK |7788|1987-5-23 |1100|NULL|20 |
|7521 |WARD |SALESMAN |7698|1981-2-22 |1250|500 |30 |
|7934 |MILLER|CLERK |7782|1982-1-23 |1300|NULL|10 |
|7900 |JAMES |CLERK |7698|1981-12-3 |950 |NULL|30 |
|7566 |JONES |MANAGER |7839|1981-4-2 |2975|NULL|20 |
|7844 |TURNER|SALESMAN |7698|1981-9-8 |1500|0 |30 |
|7698 |BLAKE |MANAGER |7839|1981-5-1 |2850|NULL|30 |
+-----+------+---------+----+----------+----+----+------+

SELECT * FROM emp
DISTRIBUTE BY deptno
SORT BY deptno, sal DESC;
+-----+------+---------+----+----------+----+----+------+
|empno|ename |job |mgr |hiredate |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7839 |KING |PRESIDENT|NULL|1981-11-17|5000|NULL|10 |
|7782 |CLARK |MANAGER |7839|1981-6-9 |2450|NULL|10 |
|7934 |MILLER|CLERK |7782|1982-1-23 |1300|NULL|10 |
|7788 |SCOTT |ANALYST |7566|1987-4-19 |3000|NULL|20 |
|7902 |FORD |ANALYST |7566|1981-12-3 |3000|NULL|20 |
|7566 |JONES |MANAGER |7839|1981-4-2 |2975|NULL|20 |
|7876 |ADAMS |CLERK |7788|1987-5-23 |1100|NULL|20 |
|7369 |SMITH |CLERK |7902|1980-12-17|800 |NULL|20 |
|7698 |BLAKE |MANAGER |7839|1981-5-1 |2850|NULL|30 |
|7499 |ALLEN |SALESMAN |7698|1981-2-20 |1600|300 |30 |
|7844 |TURNER|SALESMAN |7698|1981-9-8 |1500|0 |30 |
|7654 |MARTIN|SALESMAN |7698|1981-9-28 |1250|1400|30 |
|7521 |WARD |SALESMAN |7698|1981-2-22 |1250|500 |30 |
|7900 |JAMES |CLERK |7698|1981-12-3 |950 |NULL|30 |
+-----+------+---------+----+----------+----+----+------+

非常显然,指定分区后,每个 reducer 都处理更加“相关”的数据,因此最后生成的数据也更好看了。当然,这要以一定的数据倾斜作为代价。

顺带一提,DISTRIBUTE BY 子句在使用 GROUP BY 子句的情况下,只能按用于分组的字段进行分区,这是符合逻辑的——如果分区的字段不是分组的字段的子集,则结果必然错误,之前在 MapReduce 中已经验证过了。使用 GROUP BY 时,Hive 默认会按分组的字段进行分区,因此结果必然正确(以至于我都不知道如何写出错误的使用方法)。

CLUSTER BY 是 SORT BY 和 DISTRIBUTE BY 的结合——如果用于排序和用于分区的字段完全相同且排序均为升序,则可以使用 CLUSTER BY 子句来作为替代

结合使用 DISTRIBUTE BY 和 SORT BY,能够利用复数的 reducer 让结果全局有序,但这显然会导致一定的数据倾斜,降低并行性。一般如果要获取前 n 条数据,仍旧是使用一个 reducer 并使用 LIMIT,这样,每个 mapper 就只需要维护 n 条数据(使用大/小顶堆),而传递给 reducer 的就只有 n * mapper 数量的数据了,这代价仍旧是可以容忍的。

抽样查询

抽样是传统数据处理方法中的一个重要部分,在大数据时代,全量查询成为了主流(是这样吗 hhh 我不确定),但 Hive,以及 MapReduce 仍旧提供了对数据集进行抽样的功能。

抽样分为分桶抽样和百分比抽样,分桶抽样就是将特定记录按某(些)字段进行哈希并按桶数取余,以放入特定的桶中,百分比抽样则是字面意思

抽样的语法如SELECT * FROM emp TABLESAMPLE(BUCKET 1 OUT OF 10 ON deptno),这就是按 deptno 进行哈希并分到 10 个桶中,获取其中第一个桶;也可以使用 ON 语法来使用特定列或函数进行分桶。如果表本身没有分桶,则 ON 不能省略,如果表本身分桶且条件为用于分桶的字段,则其性能将会非常好,因为不需要扫描所有的记录了。

也可以使用 rand 函数进行分桶,其每次执行时都会得到不一样的结果

百分比抽样的语法为SELECT * FROM emp TABLESAMPLE(50 PERCENT),这里是按 50%进行抽样。百分比抽样每次结果都是一致的,因为其是使用给定 seed,按随机数进行抽样的,因此多次执行结果相同。

视图

关于 Hive 的视图,一个最重要(常不常用我不知道)的需求就是将一张物理表拆成多张逻辑表,以及进行预先的数据转换操作,它的重要性来源于 Hive 提供集合类型,因而破坏了第一范式。

比如,我们有一个保存一些 HTTP 请求参数的文件——

1
2
3
name=haruka&age=16&clazz=765
age=17&name=chihaya&height=162
...

可以看到,每个记录都是 KV 对形式,但其出现顺序是不一定的,如何处理这样的数据?其实最好的手段是使用相关工具对其进行转换再进行后续建模,这能避免将来读取时解析的时间开销,但这里我们可以直接这样对它建模——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 只有一个字段,所以不用设置字段之间的分隔符
CREATE TABLE raw_t(cols MAP<STRING,STRING>)
ROW FORMAT DELIMITED
COLLECTION ITEMS TERMINATED BY '&' -- 集合之间的元素的分隔
MAP KEYS TERMINATED BY '='; -- KV 之间的分隔

-- 读取数据……
SELECT * FROM raw_t;
+--------------------------------------------+
|cols |
+--------------------------------------------+
|{"name":"haruka","age":"16","clazz":"765"} |
|{"age":"17","name":"chihaya","height":"162"}|
+--------------------------------------------+

这样的数据用起来肯定是相当麻烦的,我们可以为它建立视图——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 创建 view 不需要给定类型!
CREATE VIEW idol_t(
name,
age,
height,
clazz
) AS
SELECT
cols['name'],
CAST(cols['age'] AS INT),
CAST(cols['height'] AS INT),
cols['clazz'] FROM raw_t;

SELECT * FROM idol_t;
+-------+---+------+-----+
|name |age|height|clazz|
+-------+---+------+-----+
|haruka |16 |NULL |765 |
|chihaya|17 |162 |NULL |
+-------+---+------+-----+

这样得到的结果和物理表无异!Hive当前也提供了物化视图。

Hive 会尽量对视图查询进行优化,使外部查询语句和视图语句成为单条语句,但并非所有情况下都能进行这种优化。

关于类型转换

Hive 的类型转换规则同 Java 的——范围更窄的类型能隐式转换成范围更广的类型,如 FLOAT 能转换成 DOUBLE,INT 能转换成 BIGINT,其它类型都能隐式转换成 STRING 等。

CAST 关键字用于显式的类型转换,语法形如CAST(1 AS STRING),AS 后跟随目标类型;CAST 能够将数字字符串转化为数字类型,将’TRUE’,’FALSE’转化为布尔类型等,因此它更像是 parse 而非是 cast,CAST 若解析失败,则返回 NULL

Hive 似乎有这样的哲学,就是尽全力保证操作的正常进行,不抛出运行时异常,比如在读取非法数据的时候,这里进行强制类型转换的时候,失败了是返回 NULL。

一些场景不常使用 CAST,如浮点数转换为整型,一般使用 ROUND 函数。(Hive 权威指南居然说 cast 是函数,严肃反对,哪有函数有自己的特定语法的?当这是 lisp 或是某种 DSL 吗?)

分区

分库分表是关系型数据库常用的优化方案,比如,我们可以按天划分表,这样如果查询的是单日的记录,就可以直接从该表进行查询,查询多日的记录就使用 UNION ALL,在多表中查询,这能够避免全表扫描,同时在更新操作的并发性上也能有所提升(Hive 执行分区也使能够避免一些同步问题)。

Hive 的分区实际上进行的就是此种操作,且其是非常有意义的——Hive 的应用环境中,通常要进行全表扫描来满足查询,这时候数据若按查询条件进行分区,便能够减少查询的记录量,但若查询的条件并不在分区里,这反而会造成性能损失——毕竟每个文件对应一个 map task;并且这会造成数据倾斜。

分区字段和表的字段是分开定义的,且其数据只能通过文件夹名称确定,但其使用和普通字段相同。分区对应 PARTITION BY 子句,示例如下——

1
2
3
4
5
6
CREATE TABLE idols(
name STRING,
age INT,
height INT
)
PARTITIONED BY (clazz STRING);

使用 INSERT INTO 来插入数据的时候,分区字段放到表字段的后面。

1
2
3
4
INSERT INTO idols VALUES
("haruka", 17, 157, '765'),
("hibiki", 16, 161, '961'),
("chihaya", 17, 162, '765');

尝试查看一下 HDFS 中的数据,可以看到数据按 clazz 键进行分区了。

1
2
3
4
5
hadoop fs -ls -R -C /user/hive/warehouse/idols 
/user/hive/warehouse/idols/clazz=765
/user/hive/warehouse/idols/clazz=765/000000_0
/user/hive/warehouse/idols/clazz=961
/user/hive/warehouse/idols/clazz=961/000000_0

我们也可以使用ALTER TABLE tb_name ADD PARTITION(field = val, ...) LOCATION 'path'来添加分区同时指定分区的位置,这在使用外部表,和其它工具交互时会比较有用,比如我们这个分区建立到另一个文件夹——

1
ALTER TABLE idols ADD PARTITION(clazz='346') LOCATION '/idol/346';

该路径甚至可以指向其它存储系统,如亚马逊 S3。

对分区进行查询时,Hive 实际上并不关心文件夹存在与否,只是简单返回空结果,这也是 Hive 的设计哲学的结果。

EXPLAIN

调优在工厂实践中(实际上更是在面试中 :))是非常重要的,对其进行学习是必须的。

要学习调优,就要先学习 EXPLAIN,知道 Hive 心里究竟有什么小九九。

比如,要求它解释一下获取最高工资的语句——

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
EXPLAIN SELECT MAX(sal) FROM emp;

STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: emp
Statistics: Num rows: 1 Data size: 6540 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: sal (type: double)
outputColumnNames: sal
Statistics: Num rows: 1 Data size: 6540 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: max(sal)
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: double)
Execution mode: vectorized
Reduce Operator Tree:
Group By Operator
aggregations: max(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

可以看到,Stage-1 是一个 MapReduce 任务。一个 Hive 任务会有一个或多个 Stage,Stage 之间会有依赖关系。Stage 可能是 MapReduce 任务,也可能是抽样,归并,LIMIT 以及其它。

比如使用SELECT * FROM emp [TABLESAMPLE(30 PERCENT)],可以看到它们跑的都是所谓的Fetch Operator,并没有使用 MR 任务,所以性能较好。


暂且告一段落,之后关于 Hive,还需要学习自定义 UDF,常用的函数,SerDe,调优等,以及给出更多示例,现在去学习 Spark,然后是数据仓库。

数据仓库将是重头戏,因为它直接关系到项目经验,相关的理论,方法论也都是岗位会直接用到的。因此,值得为它付出最大量的时间,当然,Hadoop,Hive,Spark等也是不能罔顾的,实际上可能还需要掌握一些flink,Spark Streaming这样的实时数仓所需要的技术。