现在跟随《Hive 编程指南》进行学习,此书的翻译…我在序章已经看到两处明显错误了,中英对照着看吧。
这里去学习一下 SELECT,Hive 最重要的部分。Hive 的 SELECT 相较于普通的关系型数据库,增加了内置的集合类型,因此各种操作,如函数,聚合,窗口函数等,都会有一些新东西,新模式可用,更别说还有新增的 UDTF 了 ;同时由于底层使用 MapReduce,因此分区 等概念也需要体现在 SQL 中。这些地方需要特别学习。
下面使用的示例仍旧通过尚硅谷的表和数据描述。
表数据定义见此。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 ---- dept.sql ---- CREATE TABLE IF NOT EXISTS dept ( deptno INT, dname STRING, loc INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ---- dept.txt ----10 ,ACCOUNTING,1700 20 ,RESEARCH,1800 30 ,SALES,1900 40 ,OPERATIONS,1700 ---- emp.sql ---- CREATE TABLE IF NOT EXISTS emp ( empno INT, ename STRING, job STRING, mgr INT, -- 上级 hiredate STRING, -- 入职时间 sal DOUBLE, -- 薪水 comm DOUBLE, -- 奖金 deptno INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ---- emp.txt ----7369 ,SMITH,CLERK,7902,1980 -12-17,800 .00 ,,20 7499 ,ALLEN,SALESMAN,7698,1981 -2-20,1600 .00,300.00 ,30 7521 ,WARD,SALESMAN,7698,1981 -2-22,1250 .00,500.00 ,30 7566 ,JONES,MANAGER,7839,1981 -4-2,2975 .00 ,,20 7654 ,MARTIN,SALESMAN,7698,1981 -9-28,1250 .00,1400.00 ,30 7698 ,BLAKE,MANAGER,7839,1981 -5-1,2850 ,,30 7782 ,CLARK,MANAGER,7839,1981 -6-9,2450 .00 ,,10 7788 ,SCOTT,ANALYST,7566,1987 -4-19,3000 .00 ,,20 7839 ,KING,PRESIDENT,,1981-11-17 ,5000 .00 ,,10 7844 ,TURNER,SALESMAN,7698,1981 -9-8,1500 .00,0.00,30 7876 ,ADAMS,CLERK,7788,1987 -5-23,1100 .00 ,,20 7900 ,JAMES,CLERK,7698,1981 -12-3,950 .00 ,,30 7902 ,FORD,ANALYST,7566,1981 -12-3,3000 .00 ,,20 7934 ,MILLER,CLERK,7782,1982 -1-23,1300 .00 ,,10
表生成函数(UDTF) 聚合函数从多行数据生成单个数据,而表生成函数则反之——将单列数据扩展成多行多列数据,一个展平操作。
最典型的表生成函数是 explode,它能够数组和哈希表展平成多列,使用如下——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 SELECT explode(ARRAY (1 , 2 , 3 , 2 , 1 ));+ | col| + | 1 | | 2 | | 3 | | 2 | | 1 | + SELECT explode(MAP('yuuki' , 16 , 'haruka' , 17 )) AS (name, age);+ | name | age| + | yuuki | 16 | | haruka| 17 | +
JOIN 关于 Hive 的连接操作,Hive 曾经只能支持等值连接(我用 MapReduce 也只会写等值连接),而现在已经能够支持非等值连接了,并且能够在 ON 中使用 OR 子句,所以现在和关系型数据库的 JOIN 操作已经看不出什么差别了。
进行多表连接操作时,每次连接操作将产生一个 MapReduce 任务,从左到右依次执行,比如有SELECT .. FROM a JOIN b ON a.id=b.id JOIN c ON b.id=c.id,首先是 a 和 b 先进行连接,然后是连接后的输出表与 c 相连接。但在像这样的例子里,Hive 会进行优化——三个表都使用同一个键进行连接,因此可以同时将三个表进行输入,因此只需要一个 MR 任务。
应始终保证是小表 JOIN 大表 ,即从左到右表的大小依次增加以保证性能(相信 Hive 同样做了优化)。
为了进行优化,可以在 JOIN 之前提前对各连接的表进行一定的筛选操作,该操作应当通过子查询来进行 。
LEFT SEMI JOIN 考虑这样的需求,我们想获取每个部门工资最高的员工的信息,如果是关系型数据库,我们有好几种解决方式,可能最经济的方式是首先获取每个部门的最高的工资,然后使用 IN,按部门和工资两个字段进行匹配,比如类似这样——
SELECT * FROM emp WHERE (deptno, sal) IN (SELECT deptno, MAX (sal) FROM emp GROUP BY deptno);
但是,**’Hive 不支持非关联子查询’**,这是符合逻辑的,因为若使用非关联子查询,这就说明我们需要将子查询执行并进行缓存和随机查询,这是愚蠢的。
而对于关联子查询,Hive 足够聪明,能够将其转换为 JOIN 来执行,比如这样的 SQL 在 Hive 中就是合法的——
SELECT * FROM emp AS e1 WHERE sal = (SELECT MAX (sal) FROM emp AS e2 WHERE e1.deptno = e2.deptno);
Hive 也提供了一个额外的手段,称为左半连接 LEFT SEMI JOIN,以用于通过连接的语法表示该种操作并保证性能。左半连接是一种优化过的内连接 ,它的原理是在内连接的基础上,对左表的每一条记录,在右表上一旦找到满足 ON 的记录,就立刻返回,停止匹配操作。左半连接在 SELECT 和 WHERE 中只允许使用左边的字段 。
上面的操作使用左半连接进行表述则结果如下——
SELECT * FROM emp AS e1 LEFT SEMI JOIN (SELECT deptno, MAX (sal) AS maxSal FROM emp GROUP BY deptno) AS e2 ON e1.deptno = e2.deptno AND e1.sal = e2.maxSal;
并不太清晰。
SORT BY,DISTRIBUTE BY,CLUSTER BY 在之前学习到分布式排序时有提到,如果让结果数据全局有序的话,Reducer 必须只能有 1 个,这导致 reduce 的阶段的并行性完全丧失了。而 ORDER BY 这个保证数据全局有序的子句也会造成 Hive 的查询中只能使用一个 Reducer,因此 Hive 显然是不推荐使用 ORDER BY 的,以至于在严格模式里,ORDER BY 必须要和 LIMIT 子句配合。
为此,Hive 提供了SORT BY,提供每个 Reducer 内部排序的功能,这样,每个 Reducer 生成的数据是有序的,而不同 Reducer 生成的数据之间仍旧是无序的。
编写 MapReduce 应用的时候,我们可以自定义三个角色——Partitioner,SortComparator,GroupComparator,其中 GroupComparator 对应 GROUP BY,而 Partitioner 和 SortComparator 各有其对应者。
DISTRIBUTE BY 子句对应 Partitioner ,其负责指定特定 Mapper 的数据要分发给哪个 Reducer。这在某些应用场景下比较有用,比如,我们希望每个 Reducer 处理特定年份的数据,如果不指定 DISTRIBUTE BY 的话,每个 Reducer 将都有每年的数据,很均匀,但也没有实践意义。
比如,我们想获取每个部门的员工的信息,其中每个部门按工资倒序排序,分别使用和不使用 DISTRIBUTE BY 子句,查看对应执行结果——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 set mapred.reduce.tasks = 10 ;SELECT * FROM emp SORT BY deptno, sal DESC ;+ | empno| ename | job | mgr | hiredate | sal | comm| deptno| + | 7902 | FORD | ANALYST | 7566 | 1981 -12 -3 | 3000 | NULL | 20 | | 7782 | CLARK | MANAGER | 7839 | 1981 -6 -9 | 2450 | NULL | 10 | | 7369 | SMITH | CLERK | 7902 | 1980 -12 -17 | 800 | NULL | 20 | | 7654 | MARTIN| SALESMAN | 7698 | 1981 -9 -28 | 1250 | 1400 | 30 | | 7839 | KING | PRESIDENT| NULL | 1981 -11 -17 | 5000 | NULL | 10 | | 7499 | ALLEN | SALESMAN | 7698 | 1981 -2 -20 | 1600 | 300 | 30 | | 7788 | SCOTT | ANALYST | 7566 | 1987 -4 -19 | 3000 | NULL | 20 | | 7876 | ADAMS | CLERK | 7788 | 1987 -5 -23 | 1100 | NULL | 20 | | 7521 | WARD | SALESMAN | 7698 | 1981 -2 -22 | 1250 | 500 | 30 | | 7934 | MILLER| CLERK | 7782 | 1982 -1 -23 | 1300 | NULL | 10 | | 7900 | JAMES | CLERK | 7698 | 1981 -12 -3 | 950 | NULL | 30 | | 7566 | JONES | MANAGER | 7839 | 1981 -4 -2 | 2975 | NULL | 20 | | 7844 | TURNER| SALESMAN | 7698 | 1981 -9 -8 | 1500 | 0 | 30 | | 7698 | BLAKE | MANAGER | 7839 | 1981 -5 -1 | 2850 | NULL | 30 | + SELECT * FROM emp DISTRIBUTE BY deptno SORT BY deptno, sal DESC ;+ | empno| ename | job | mgr | hiredate | sal | comm| deptno| + | 7839 | KING | PRESIDENT| NULL | 1981 -11 -17 | 5000 | NULL | 10 | | 7782 | CLARK | MANAGER | 7839 | 1981 -6 -9 | 2450 | NULL | 10 | | 7934 | MILLER| CLERK | 7782 | 1982 -1 -23 | 1300 | NULL | 10 | | 7788 | SCOTT | ANALYST | 7566 | 1987 -4 -19 | 3000 | NULL | 20 | | 7902 | FORD | ANALYST | 7566 | 1981 -12 -3 | 3000 | NULL | 20 | | 7566 | JONES | MANAGER | 7839 | 1981 -4 -2 | 2975 | NULL | 20 | | 7876 | ADAMS | CLERK | 7788 | 1987 -5 -23 | 1100 | NULL | 20 | | 7369 | SMITH | CLERK | 7902 | 1980 -12 -17 | 800 | NULL | 20 | | 7698 | BLAKE | MANAGER | 7839 | 1981 -5 -1 | 2850 | NULL | 30 | | 7499 | ALLEN | SALESMAN | 7698 | 1981 -2 -20 | 1600 | 300 | 30 | | 7844 | TURNER| SALESMAN | 7698 | 1981 -9 -8 | 1500 | 0 | 30 | | 7654 | MARTIN| SALESMAN | 7698 | 1981 -9 -28 | 1250 | 1400 | 30 | | 7521 | WARD | SALESMAN | 7698 | 1981 -2 -22 | 1250 | 500 | 30 | | 7900 | JAMES | CLERK | 7698 | 1981 -12 -3 | 950 | NULL | 30 | +
非常显然,指定分区后,每个 reducer 都处理更加“相关”的数据,因此最后生成的数据也更好看了。当然,这要以一定的数据倾斜作为代价。
顺带一提,DISTRIBUTE BY 子句在使用 GROUP BY 子句的情况下,只能按用于分组的字段进行分区 ,这是符合逻辑的——如果分区的字段不是分组的字段的子集,则结果必然错误,之前在 MapReduce 中已经验证过了。使用 GROUP BY 时,Hive 默认会按分组的字段进行分区,因此结果必然正确(以至于我都不知道如何写出错误的使用方法)。
CLUSTER BY 是 SORT BY 和 DISTRIBUTE BY 的结合——如果用于排序和用于分区的字段完全相同且排序均为升序,则可以使用 CLUSTER BY 子句来作为替代 。
结合使用 DISTRIBUTE BY 和 SORT BY,能够利用复数的 reducer 让结果全局有序,但这显然会导致一定的数据倾斜,降低并行性。一般如果要获取前 n 条数据,仍旧是使用一个 reducer 并使用 LIMIT ,这样,每个 mapper 就只需要维护 n 条数据(使用大/小顶堆),而传递给 reducer 的就只有 n * mapper 数量的数据了,这代价仍旧是可以容忍的。
抽样查询 抽样是传统数据处理方法中的一个重要部分,在大数据时代,全量查询成为了主流(是这样吗 hhh 我不确定),但 Hive,以及 MapReduce 仍旧提供了对数据集进行抽样的功能。
抽样分为分桶抽样和百分比抽样,分桶抽样就是将特定记录按某(些)字段进行哈希并按桶数取余,以放入特定的桶中,百分比抽样则是字面意思 。
抽样的语法如SELECT * FROM emp TABLESAMPLE(BUCKET 1 OUT OF 10 ON deptno),这就是按 deptno 进行哈希并分到 10 个桶中,获取其中第一个桶;也可以使用 ON 语法来使用特定列或函数进行分桶。如果表本身没有分桶,则 ON 不能省略 ,如果表本身分桶且条件为用于分桶的字段,则其性能将会非常好,因为不需要扫描所有的记录了。
也可以使用 rand 函数进行分桶,其每次执行时都会得到不一样的结果 。
百分比抽样的语法为SELECT * FROM emp TABLESAMPLE(50 PERCENT),这里是按 50%进行抽样。百分比抽样每次结果都是一致的 ,因为其是使用给定 seed,按随机数进行抽样的,因此多次执行结果相同。
视图 关于 Hive 的视图,一个最重要(常不常用我不知道)的需求就是将一张物理表拆成多张逻辑表,以及进行预先的数据转换操作 ,它的重要性来源于 Hive 提供集合类型,因而破坏了第一范式。
比如,我们有一个保存一些 HTTP 请求参数的文件——
name = haruka&age =16 &clazz =765 age = 17 &name =chihaya&height =162 ...
可以看到,每个记录都是 KV 对形式,但其出现顺序是不一定的,如何处理这样的数据?其实最好的手段是使用相关工具对其进行转换再进行后续建模,这能避免将来读取时解析的时间开销,但这里我们可以直接这样对它建模——
CREATE TABLE raw_t(cols MAP< STRING,STRING> )ROW FORMAT DELIMITED COLLECTION ITEMS TERMINATED BY '&' MAP KEYS TERMINATED BY '=' ; SELECT * FROM raw_t;+ | cols | + | {"name":"haruka","age":"16","clazz":"765"} | | {"age":"17","name":"chihaya","height":"162"}| +
这样的数据用起来肯定是相当麻烦的,我们可以为它建立视图——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 CREATE VIEW idol_t( name, age, height, clazz ) AS SELECT cols['name' ], CAST (cols['age' ] AS INT ), CAST (cols['height' ] AS INT ), cols['clazz' ] FROM raw_t;SELECT * FROM idol_t;+ | name | age| height| clazz| + | haruka | 16 | NULL | 765 | | chihaya| 17 | 162 | NULL | +
这样得到的结果和物理表无异!Hive当前也提供了物化视图。
Hive 会尽量对视图查询进行优化,使外部查询语句和视图语句成为单条语句,但并非所有情况下都能进行这种优化。
关于类型转换 Hive 的类型转换规则同 Java 的——范围更窄的类型能隐式转换成范围更广的类型,如 FLOAT 能转换成 DOUBLE,INT 能转换成 BIGINT,其它类型都能隐式转换成 STRING 等。
CAST 关键字用于显式的类型转换,语法形如CAST(1 AS STRING),AS 后跟随目标类型;CAST 能够将数字字符串转化为数字类型,将’TRUE’,’FALSE’转化为布尔类型等,因此它更像是 parse 而非是 cast,CAST 若解析失败,则返回 NULL 。
Hive 似乎有这样的哲学,就是尽全力保证操作的正常进行,不抛出运行时异常,比如在读取非法数据的时候,这里进行强制类型转换的时候,失败了是返回 NULL。
一些场景不常使用 CAST,如浮点数转换为整型,一般使用 ROUND 函数。(Hive 权威指南居然说 cast 是函数,严肃反对,哪有函数有自己的特定语法的?当这是 lisp 或是某种 DSL 吗?)
分区 分库分表是关系型数据库常用的优化方案,比如,我们可以按天划分表,这样如果查询的是单日的记录,就可以直接从该表进行查询,查询多日的记录就使用 UNION ALL,在多表中查询,这能够避免全表扫描,同时在更新操作的并发性上也能有所提升(Hive 执行分区也使能够避免一些同步问题)。
Hive 的分区 实际上进行的就是此种操作,且其是非常有意义的——Hive 的应用环境中,通常要进行全表扫描来满足查询,这时候数据若按查询条件进行分区,便能够减少查询的记录量,但若查询的条件并不在分区里,这反而会造成性能损失——毕竟每个文件对应一个 map task;并且这会造成数据倾斜。
分区字段和表的字段是分开定义的,且其数据只能通过文件夹名称确定,但其使用和普通字段相同 。分区对应 PARTITION BY 子句,示例如下——
CREATE TABLE idols( name STRING, age INT , height INT ) PARTITIONED BY (clazz STRING);
使用 INSERT INTO 来插入数据的时候,分区字段放到表字段的后面。
INSERT INTO idols VALUES ("haruka", 17 , 157 , '765' ), ("hibiki", 16 , 161 , '961' ), ("chihaya", 17 , 162 , '765' );
尝试查看一下 HDFS 中的数据,可以看到数据按 clazz 键进行分区了。
hadoop fs -ls -R -C /user/ hive/warehouse/i dols /user/ hive/warehouse/i dols/clazz=765 /user/ hive/warehouse/i dols/clazz=765/ 000000 _0/user/ hive/warehouse/i dols/clazz=961 /user/ hive/warehouse/i dols/clazz=961/ 000000 _0
我们也可以使用ALTER TABLE tb_name ADD PARTITION(field = val, ...) LOCATION 'path'来添加分区同时指定分区的位置,这在使用外部表,和其它工具交互时会比较有用,比如我们这个分区建立到另一个文件夹——
ALTER TABLE idols ADD PARTITION (clazz= '346' ) LOCATION '/idol/346' ;
该路径甚至可以指向其它存储系统,如亚马逊 S3。
对分区进行查询时,Hive 实际上并不关心文件夹存在与否,只是简单返回空结果,这也是 Hive 的设计哲学的结果。
EXPLAIN 调优在工厂实践中(实际上更是在面试中 :))是非常重要的,对其进行学习是必须的。
要学习调优,就要先学习 EXPLAIN,知道 Hive 心里究竟有什么小九九。
比如,要求它解释一下获取最高工资的语句——
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 EXPLAIN SELECT MAX (sal) FROM emp; STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 depends on stages: Stage-1 STAGE PLANS: Stage: Stage-1 Map Reduce Map Operator Tree: TableScan alias: emp Statistics: Num rows : 1 Data size: 6540 Basic stats: COMPLETE Column stats: NONE Select Operator expressions: sal (type: double ) outputColumnNames: sal Statistics: Num rows : 1 Data size: 6540 Basic stats: COMPLETE Column stats: NONE Group By Operator aggregations: max (sal) mode: hash outputColumnNames: _col0 Statistics: Num rows : 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE Reduce Output Operator sort order : Statistics: Num rows : 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE value expressions: _col0 (type: double ) Execution mode: vectorized Reduce Operator Tree: Group By Operator aggregations: max (VALUE._col0) mode: mergepartial outputColumnNames: _col0 Statistics: Num rows : 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE File Output Operator compressed: false Statistics: Num rows : 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE table : input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe Stage: Stage-0 Fetch Operator limit: -1 Processor Tree: ListSink
可以看到,Stage-1 是一个 MapReduce 任务。一个 Hive 任务会有一个或多个 Stage,Stage 之间会有依赖关系。Stage 可能是 MapReduce 任务,也可能是抽样,归并,LIMIT 以及其它。
比如使用SELECT * FROM emp [TABLESAMPLE(30 PERCENT)],可以看到它们跑的都是所谓的Fetch Operator,并没有使用 MR 任务,所以性能较好。
暂且告一段落,之后关于 Hive,还需要学习自定义 UDF,常用的函数,SerDe,调优 等,以及给出更多示例,现在去学习 Spark,然后是数据仓库。
数据仓库将是重头戏,因为它直接关系到项目经验,相关的理论,方法论也都是岗位会直接用到的 。因此,值得为它付出最大量的时间,当然,Hadoop,Hive,Spark等也是不能罔顾的,实际上可能还需要掌握一些flink,Spark Streaming这样的实时数仓所需要的技术。