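MR (MapReduce) is Hive's default execution engine; under this engine, HQL is converted into MR jobs. Hive's execution plan shows the stages a SQL statement goes through, and Hive's execution log shows the MR jobs it was converted into. This article uses a concrete HQL query to walk through Hive's execution plan and execution log. After reading it, you will know:

  • How to see how many MR jobs an HQL statement is converted into
  • How the stages in the execution plan relate to the resulting MR jobs
  • How to tune an HQL query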

Query SQL

SELECT dp.region_name,
count(*) AS cnt
FROM user_behavior_log ubl
JOIN dim_province dp ON ubl.province = dp.province_name
WHERE action = 'buy'
GROUP BY dp.region_name

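About the SQL

The SQL above counts the number of purchase actions in each region. user_behavior_log holds the user behavior data and is roughly 3 GB in size; dim_province is the region dimension table, which mainly contains provinces and the region each province belongs to.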

Query Result

dp.region_name cnt
东北 290875
华东 870994
华中 291048
华北 484199
华南 484032
西北 483784
西南 483891

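Application Information

The screenshot above shows the execution information of the applications that this query was converted into: 4 MR jobs were executed in total. The first job has 1 map task and no reduce task. The second job has 11 map tasks and no reduce task. The third job has 2 map tasks and 2 reduce tasks. The fourth job has 1 map task and 1 reduce task.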
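The same per-job task counts can be cross-checked against the "MapReduce Jobs Launched" summary that Hive prints at the end of the execution log. The following is a minimal sketch, not part of Hive: the helper name `parse_job_summary` and the regular expression are assumptions made for illustration, and the input lines are copied from the log in this article.

```python
import re

# Job summary lines copied from the end of the execution log in this article.
SUMMARY = """\
Stage-Stage-10: Map: 1 Cumulative CPU: 3.2 sec HDFS Read: 6389 HDFS Write: 1199 SUCCESS
Stage-Stage-7: Map: 11 Cumulative CPU: 105.15 sec HDFS Read: 3356995378 HDFS Write: 82139608 SUCCESS
Stage-Stage-3: Map: 2 Reduce: 2 Cumulative CPU: 33.89 sec HDFS Read: 82150071 HDFS Write: 584 SUCCESS
Stage-Stage-4: Map: 1 Reduce: 1 Cumulative CPU: 4.94 sec HDFS Read: 5695 HDFS Write: 140 SUCCESS
"""

# "Reduce: N" is missing for map-only jobs, so that group is optional.
LINE_RE = re.compile(
    r"Stage-(Stage-\d+): Map: (\d+)(?: Reduce: (\d+))? Cumulative CPU: ([\d.]+) sec"
)


def parse_job_summary(text):
    """Return one dict per launched MR job (hypothetical helper)."""
    jobs = []
    for match in LINE_RE.finditer(text):
        stage, maps, reduces, cpu = match.groups()
        jobs.append({
            "stage": stage,
            "maps": int(maps),
            "reduces": int(reduces or 0),  # map-only job -> 0 reducers
            "cpu_sec": float(cpu),
        })
    return jobs


jobs = parse_job_summary(SUMMARY)
total_cpu = sum(job["cpu_sec"] for job in jobs)

for job in jobs:
    print(job["stage"], "maps:", job["maps"], "reduces:", job["reduces"])
print("jobs:", len(jobs), "total CPU seconds:", round(total_cpu, 2))
```

The summed CPU time agrees with the "Total MapReduce CPU Time Spent: 2 minutes 27 seconds 180 msec" line in the log.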

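Execution Plan Analysis

The Complete Execution Plan

For convenience, the complete execution plan is given below. You can skip this part for now and go straight to the detailed explanation that follows: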

STAGE DEPENDENCIES:
Stage-13 is a root stage
Stage-10 depends on stages: Stage-13
Stage-9 depends on stages: Stage-10 , consists of Stage-11, Stage-12, Stage-2
Stage-11 has a backup stage: Stage-2
Stage-7 depends on stages: Stage-11
Stage-3 depends on stages: Stage-2, Stage-7, Stage-8
Stage-4 depends on stages: Stage-3
Stage-12 has a backup stage: Stage-2
Stage-8 depends on stages: Stage-12
Stage-2
Stage-0 depends on stages: Stage-4

STAGE PLANS:
Stage: Stage-13
Map Reduce Local Work
Alias -> Map Local Tables:
dp:br
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
dp:br
TableScan
alias: br
filterExpr: id is not null (type: boolean)
Statistics: Num rows: 7 Data size: 56 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: id is not null (type: boolean)
Statistics: Num rows: 4 Data size: 32 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 id (type: string)
1 region_id (type: string)

Stage: Stage-10
Map Reduce
Map Operator Tree:
TableScan
alias: bp
filterExpr: (region_id is not null and name is not null) (type: boolean)
Statistics: Num rows: 35 Data size: 394 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (region_id is not null and name is not null) (type: boolean)
Statistics: Num rows: 9 Data size: 101 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 id (type: string)
1 region_id (type: string)
outputColumnNames: _col1, _col6
Statistics: Num rows: 9 Data size: 111 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col6 (type: string), _col1 (type: string)
outputColumnNames: _col1, _col3
Statistics: Num rows: 9 Data size: 111 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work

Stage: Stage-9
Conditional Operator

Stage: Stage-11
Map Reduce Local Work
Alias -> Map Local Tables:
$INTNAME
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
$INTNAME
TableScan
HashTable Sink Operator
keys:
0 province (type: string)
1 _col1 (type: string)

Stage: Stage-7
Map Reduce
Map Operator Tree:
TableScan
alias: ubl
filterExpr: (province is not null and (action = 'buy')) (type: boolean)
Statistics: Num rows: 54925330 Data size: 3300747847 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (province is not null and (action = 'buy')) (type: boolean)
Statistics: Num rows: 13731332 Data size: 825186931 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 province (type: string)
1 _col1 (type: string)
outputColumnNames: _col18
Statistics: Num rows: 15104465 Data size: 907705643 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work

Stage: Stage-3
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col18 (type: string)
sort order: +
Map-reduce partition columns: rand() (type: double)
Statistics: Num rows: 15104465 Data size: 907705643 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Group By Operator
aggregations: count()
keys: KEY._col0 (type: string)
mode: partial1
outputColumnNames: _col0, _col1
Statistics: Num rows: 15104465 Data size: 907705643 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-4
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 15104465 Data size: 907705643 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: string)
mode: final
outputColumnNames: _col0, _col1
Statistics: Num rows: 7552232 Data size: 453852791 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 7552232 Data size: 453852791 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-12
Map Reduce Local Work
Alias -> Map Local Tables:
ubl
Fetch Operator
limit: -1
Alias -> Map Local Operator Tree:
ubl
TableScan
alias: ubl
filterExpr: (province is not null and (action = 'buy')) (type: boolean)
Statistics: Num rows: 54925330 Data size: 3300747847 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (province is not null and (action = 'buy')) (type: boolean)
Statistics: Num rows: 13731332 Data size: 825186931 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
keys:
0 province (type: string)
1 _col1 (type: string)

Stage: Stage-8
Map Reduce
Map Operator Tree:
TableScan
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 province (type: string)
1 _col1 (type: string)
outputColumnNames: _col18
Statistics: Num rows: 15104465 Data size: 907705643 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Local Work:
Map Reduce Local Work

Stage: Stage-2
Map Reduce
Map Operator Tree:
TableScan
Reduce Output Operator
key expressions: _col1 (type: string)
sort order: +
Map-reduce partition columns: _col1 (type: string)
Statistics: Num rows: 9 Data size: 111 Basic stats: COMPLETE Column stats: NONE
value expressions: _col3 (type: string)
TableScan
alias: ubl
filterExpr: (province is not null and (action = 'buy')) (type: boolean)
Statistics: Num rows: 54925330 Data size: 3300747847 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: (province is not null and (action = 'buy')) (type: boolean)
Statistics: Num rows: 13731332 Data size: 825186931 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: province (type: string)
sort order: +
Map-reduce partition columns: province (type: string)
Statistics: Num rows: 13731332 Data size: 825186931 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 province (type: string)
1 _col1 (type: string)
outputColumnNames: _col18
Statistics: Num rows: 15104465 Data size: 907705643 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

Interpreting the Execution Plan

stage dependencies

Stage-13 is a root stage
Stage-10 depends on stages: Stage-13
Stage-9 depends on stages: Stage-10 , consists of Stage-11, Stage-12, Stage-2
Stage-11 has a backup stage: Stage-2
Stage-7 depends on stages: Stage-11
Stage-3 depends on stages: Stage-2, Stage-7, Stage-8
Stage-4 depends on stages: Stage-3
Stage-12 has a backup stage: Stage-2
Stage-8 depends on stages: Stage-12
Stage-2
Stage-0 depends on stages: Stage-4
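One way to read this list is as a small graph: "depends on stages" gives the parent stages, "consists of" lists the candidate branches of a conditional stage, and "has a backup stage" names the fallback for a branch. The sketch below parses the list and walks the path that was actually taken in this run. It is an illustration only: `parse_dependencies` and `executed_path` are hypothetical helpers, the walk assumes the executed path is a simple chain, and the chosen branch (Stage-11) is taken from the "selected by condition resolver" line in the execution log.

```python
import re

STAGE_RE = re.compile(r"Stage-\d+")

# STAGE DEPENDENCIES section copied from the plan in this article.
DEPENDENCIES = """\
Stage-13 is a root stage
Stage-10 depends on stages: Stage-13
Stage-9 depends on stages: Stage-10 , consists of Stage-11, Stage-12, Stage-2
Stage-11 has a backup stage: Stage-2
Stage-7 depends on stages: Stage-11
Stage-3 depends on stages: Stage-2, Stage-7, Stage-8
Stage-4 depends on stages: Stage-3
Stage-12 has a backup stage: Stage-2
Stage-8 depends on stages: Stage-12
Stage-2
Stage-0 depends on stages: Stage-4
"""


def parse_dependencies(text):
    """Split each line into root / parents / conditional branches / backup."""
    roots, parents, branches, backups = [], {}, {}, {}
    for line in text.splitlines():
        names = STAGE_RE.findall(line)
        if not names:
            continue
        stage = names[0]
        parents.setdefault(stage, [])
        head, _, tail = line.partition("consists of")
        if "is a root stage" in line:
            roots.append(stage)
        if "depends on stages:" in head:
            parents[stage] = STAGE_RE.findall(head.split("depends on stages:")[1])
        if tail:
            branches[stage] = STAGE_RE.findall(tail)
        if "has a backup stage:" in line:
            backups[stage] = STAGE_RE.findall(line.split("has a backup stage:")[1])[0]
    return roots, parents, branches, backups


def executed_path(root, parents, branches, selected):
    """Follow the chain from the root, taking the selected conditional branch."""
    children = {}
    for stage, deps in parents.items():
        for dep in deps:
            children.setdefault(dep, []).append(stage)
    path, current = [root], root
    while True:
        if current in branches:
            nxt = selected[current]          # branch picked at run time
        elif children.get(current):
            nxt = children[current][0]       # assumption: a single successor
        else:
            break
        path.append(nxt)
        current = nxt
    return path


roots, parents, branches, backups = parse_dependencies(DEPENDENCIES)
path = executed_path(roots[0], parents, branches, {"Stage-9": "Stage-11"})
print(" -> ".join(path))
```

Stage-12 (with its follow-up Stage-8) and Stage-2 appear in the plan but are not on this path, because the conditional Stage-9 picked Stage-11.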

stage plans

Execution Log Analysis

INFO  : Compiling command(queryId=hive_20200618145454_29fb89e3-b5e6-41ef-8e8a-e2635722656f): SELECT dp.region_name,
count(*) AS cnt
FROM user_behavior_log ubl
JOIN dim_province dp ON ubl.province = dp.province_name
WHERE action = 'buy'
GROUP BY dp.region_name
INFO : Semantic Analysis Completed
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:dp.region_name, type:string, comment:null), FieldSchema(name:cnt, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20200618145454_29fb89e3-b5e6-41ef-8e8a-e2635722656f); Time taken: 0.458 seconds
INFO : Executing command(queryId=hive_20200618145454_29fb89e3-b5e6-41ef-8e8a-e2635722656f): SELECT dp.region_name,
count(*) AS cnt
FROM user_behavior_log ubl
JOIN dim_province dp ON ubl.province = dp.province_name
WHERE action = 'buy'
GROUP BY dp.region_name
INFO : Query ID = hive_20200618145454_29fb89e3-b5e6-41ef-8e8a-e2635722656f
INFO : Total jobs = 6
INFO : Starting task [Stage-13:MAPREDLOCAL] in serial mode
20/06/18 14:54:56 WARN conf.HiveConf: HiveConf of name hive.server2.idle.session.timeout_check_operation does not exist
20/06/18 14:54:56 WARN conf.HiveConf: HiveConf of name hive.entity.capture.input.URI does not exist

2020-06-18 02:54:58 Dump the side-table for tag: 0 with group count: 7 into file: file:/tmp/hive/dd28ddf1-03db-472a-8e34-98ee658d82a1/hive_2020-06-18_14-54-52_573_920702636242824197-27/-local-10010/HashTable-Stage-10/MapJoin-mapfile370--.hashtable
2020-06-18 02:54:58 End of local task; Time Taken: 2.035 sec.
INFO : Execution completed successfully
INFO : MapredLocal task succeeded
INFO : Launching Job 1 out of 6
INFO : Starting task [Stage-10:MAPRED] in parallel
INFO : Number of reduce tasks is set to 0 since there's no reduce operator
INFO : Selecting local mode for task: Stage-10
INFO : number of splits:1
INFO : Submitting tokens for job: job_1582600603106_1942
INFO : The url to track the job: http://cdh03:8088/proxy/application_1582600603106_1942/
INFO : Starting Job = job_1582600603106_1942, Tracking URL = http://cdh03:8088/proxy/application_1582600603106_1942/
INFO : Kill Command = /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/hadoop/bin/hadoop job -kill job_1582600603106_1942
INFO : Hadoop job information for Stage-10: number of mappers: 1; number of reducers: 0
INFO : 2020-06-18 14:55:05,948 Stage-10 map = 0%, reduce = 0%
INFO : 2020-06-18 14:55:11,196 Stage-10 map = 100%, reduce = 0%, Cumulative CPU 3.2 sec
INFO : MapReduce Total cumulative CPU time: 3 seconds 200 msec
INFO : Ended Job = job_1582600603106_1942
INFO : Starting task [Stage-9:CONDITIONAL] in parallel
INFO : Stage-11 is selected by condition resolver.
INFO : Stage-12 is filtered out by condition resolver.
INFO : Stage-2 is filtered out by condition resolver.
INFO : Starting task [Stage-11:MAPREDLOCAL] in serial mode
20/06/18 14:55:17 WARN conf.HiveConf: HiveConf of name hive.server2.idle.session.timeout_check_operation does not exist
INFO : Execution completed successfully
INFO : MapredLocal task succeeded
INFO : Launching Job 3 out of 6
INFO : Starting task [Stage-7:MAPRED] in parallel
INFO : Number of reduce tasks is set to 0 since there's no reduce operator
INFO : Cannot run job locally: Input Size (= 3355673177) is larger than hive.exec.mode.local.auto.inputbytes.max (= 50000000)
INFO : number of splits:11
INFO : Submitting tokens for job: job_1582600603106_1943
INFO : The url to track the job: http://cdh03:8088/proxy/application_1582600603106_1943/
INFO : Starting Job = job_1582600603106_1943, Tracking URL = http://cdh03:8088/proxy/application_1582600603106_1943/
INFO : Kill Command = /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/hadoop/bin/hadoop job -kill job_1582600603106_1943
INFO : Hadoop job information for Stage-7: number of mappers: 11; number of reducers: 0
INFO : 2020-06-18 14:55:27,886 Stage-7 map = 0%, reduce = 0%
INFO : 2020-06-18 14:55:37,354 Stage-7 map = 9%, reduce = 0%, Cumulative CPU 8.64 sec
INFO : 2020-06-18 14:55:38,419 Stage-7 map = 36%, reduce = 0%, Cumulative CPU 38.01 sec
INFO : 2020-06-18 14:55:40,511 Stage-7 map = 55%, reduce = 0%, Cumulative CPU 60.16 sec
INFO : 2020-06-18 14:55:41,569 Stage-7 map = 64%, reduce = 0%, Cumulative CPU 71.54 sec
INFO : 2020-06-18 14:55:45,756 Stage-7 map = 73%, reduce = 0%, Cumulative CPU 79.75 sec
INFO : 2020-06-18 14:55:46,813 Stage-7 map = 91%, reduce = 0%, Cumulative CPU 96.4 sec
INFO : 2020-06-18 14:55:47,866 Stage-7 map = 100%, reduce = 0%, Cumulative CPU 105.15 sec
INFO : MapReduce Total cumulative CPU time: 1 minutes 45 seconds 150 msec
INFO : Ended Job = job_1582600603106_1943
INFO : Launching Job 4 out of 6
INFO : Starting task [Stage-3:MAPRED] in parallel
INFO : Number of reduce tasks not specified. Estimated from input data size: 2
INFO : In order to change the average load for a reducer (in bytes):
INFO : set hive.exec.reducers.bytes.per.reducer=<number>
INFO : In order to limit the maximum number of reducers:
INFO : set hive.exec.reducers.max=<number>
INFO : In order to set a constant number of reducers:
INFO : set mapreduce.job.reduces=<number>
INFO : Cannot run job locally: Input Size (= 82139608) is larger than hive.exec.mode.local.auto.inputbytes.max (= 50000000)
INFO : number of splits:2
INFO : Submitting tokens for job: job_1582600603106_1944
INFO : The url to track the job: http://cdh03:8088/proxy/application_1582600603106_1944/
INFO : Starting Job = job_1582600603106_1944, Tracking URL = http://cdh03:8088/proxy/application_1582600603106_1944/
INFO : Kill Command = /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/hadoop/bin/hadoop job -kill job_1582600603106_1944
INFO : Hadoop job information for Stage-3: number of mappers: 2; number of reducers: 2
INFO : 2020-06-18 14:55:58,247 Stage-3 map = 0%, reduce = 0%
INFO : 2020-06-18 14:56:07,695 Stage-3 map = 50%, reduce = 0%, Cumulative CPU 8.96 sec
INFO : 2020-06-18 14:56:12,969 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 23.17 sec
INFO : 2020-06-18 14:56:19,310 Stage-3 map = 100%, reduce = 50%, Cumulative CPU 28.33 sec
INFO : 2020-06-18 14:56:20,348 Stage-3 map = 100%, reduce = 100%, Cumulative CPU 33.89 sec
INFO : MapReduce Total cumulative CPU time: 33 seconds 890 msec
INFO : Ended Job = job_1582600603106_1944
INFO : Launching Job 5 out of 6
INFO : Starting task [Stage-4:MAPRED] in parallel
INFO : Number of reduce tasks not specified. Estimated from input data size: 1
INFO : In order to change the average load for a reducer (in bytes):
INFO : set hive.exec.reducers.bytes.per.reducer=<number>
INFO : In order to limit the maximum number of reducers:
INFO : set hive.exec.reducers.max=<number>
INFO : In order to set a constant number of reducers:
INFO : set mapreduce.job.reduces=<number>
INFO : Selecting local mode for task: Stage-4
INFO : number of splits:1
INFO : Submitting tokens for job: job_1582600603106_1945
INFO : The url to track the job: http://cdh03:8088/proxy/application_1582600603106_1945/
INFO : Starting Job = job_1582600603106_1945, Tracking URL = http://cdh03:8088/proxy/application_1582600603106_1945/
INFO : Kill Command = /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/hadoop/bin/hadoop job -kill job_1582600603106_1945
INFO : Hadoop job information for Stage-4: number of mappers: 1; number of reducers: 1
INFO : 2020-06-18 14:56:28,550 Stage-4 map = 0%, reduce = 0%
INFO : 2020-06-18 14:56:33,832 Stage-4 map = 100%, reduce = 0%, Cumulative CPU 1.51 sec
INFO : 2020-06-18 14:56:40,167 Stage-4 map = 100%, reduce = 100%, Cumulative CPU 4.94 sec
INFO : MapReduce Total cumulative CPU time: 4 seconds 940 msec
INFO : Ended Job = job_1582600603106_1945
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-10: Map: 1 Cumulative CPU: 3.2 sec HDFS Read: 6389 HDFS Write: 1199 SUCCESS
INFO : Stage-Stage-7: Map: 11 Cumulative CPU: 105.15 sec HDFS Read: 3356995378 HDFS Write: 82139608 SUCCESS
INFO : Stage-Stage-3: Map: 2 Reduce: 2 Cumulative CPU: 33.89 sec HDFS Read: 82150071 HDFS Write: 584 SUCCESS
INFO : Stage-Stage-4: Map: 1 Reduce: 1 Cumulative CPU: 4.94 sec HDFS Read: 5695 HDFS Write: 140 SUCCESS
INFO : Total MapReduce CPU Time Spent: 2 minutes 27 seconds 180 msec
INFO : Completed executing command(queryId=hive_20200618145454_29fb89e3-b5e6-41ef-8e8a-e2635722656f); Time taken: 108.915 seconds
INFO : OK
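The "Starting task [Stage-N:TYPE]" lines in this log show how stages map to MR jobs: in this run only the stages of type MAPRED were submitted as jobs (each has a job ID), while MAPREDLOCAL stages ran as local tasks and the CONDITIONAL stage only chose a branch. The plan contains six "Map Reduce" stages (Stage-10, 7, 3, 4, 8, 2), which matches "Total jobs = 6", yet only four were launched because Stage-12 and Stage-2 were filtered out. A minimal sketch of extracting this from the log follows; the regular expression and variable names are illustrative assumptions, and the lines are excerpted from the log above.

```python
import re

# "Starting task" lines excerpted from the execution log in this article.
LOG = """\
INFO : Starting task [Stage-13:MAPREDLOCAL] in serial mode
INFO : Starting task [Stage-10:MAPRED] in parallel
INFO : Starting task [Stage-9:CONDITIONAL] in parallel
INFO : Starting task [Stage-11:MAPREDLOCAL] in serial mode
INFO : Starting task [Stage-7:MAPRED] in parallel
INFO : Starting task [Stage-3:MAPRED] in parallel
INFO : Starting task [Stage-4:MAPRED] in parallel
"""

TASK_RE = re.compile(r"Starting task \[(Stage-\d+):(\w+)\]")

# List of (stage, task type) pairs in execution order.
tasks = TASK_RE.findall(LOG)

# Only MAPRED tasks were submitted as MR jobs in this run.
mr_stages = [stage for stage, kind in tasks if kind == "MAPRED"]
local_stages = [stage for stage, kind in tasks if kind == "MAPREDLOCAL"]

print("MR jobs:", mr_stages)
print("local tasks:", local_stages)
```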

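Three parameters:

-- Number of bytes each reducer processes; the default in this CDH environment is 64 MB
-- Example: if the input size is 10 GiB and this is set to 1 GiB, Hive uses 10 reducers
set hive.exec.reducers.bytes.per.reducer=67108864;
-- Maximum number of reducers per job; the default in this CDH environment is 1099
-- If mapreduce.job.reduces is negative, Hive caps the number of reducers at this value
set hive.exec.reducers.max=1099;
-- Number of reduce tasks per job; the default -1 lets Hive choose the number automatically
-- Typically set to a prime close to the number of available hosts. Hadoop defaults this to 1, whereas Hive defaults to -1. When it is -1, Hive automatically determines a suitable number of reducers for each job
set mapreduce.job.reduces=-1;

mapreduce.input.fileinputformat.split.minsize = 1 (default)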

Reducer count calculation: 3357127507 / 67108864 ≈ 50.03, rounded up to 51
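The calculation can be written as a small function: divide the input size by the bytes-per-reducer setting, round up, and clamp the result between 1 and the maximum. This is a sketch of that rule under the settings listed in this article, not Hive's actual code; the function name `estimate_reducers` is made up for illustration.

```python
def estimate_reducers(input_bytes,
                      bytes_per_reducer=67108864,  # hive.exec.reducers.bytes.per.reducer
                      max_reducers=1099):          # hive.exec.reducers.max
    """Estimate the reducer count when mapreduce.job.reduces is -1 (sketch)."""
    # Integer ceiling division: any remainder needs one more reducer.
    reducers = (input_bytes + bytes_per_reducer - 1) // bytes_per_reducer
    # At least one reducer, and never more than the configured maximum.
    return max(1, min(max_reducers, reducers))


# Input size used in the calculation above.
print(estimate_reducers(3357127507))
# Input size logged for Stage-3 ("Input Size (= 82139608)").
print(estimate_reducers(82139608))
```

The second call reproduces the "Estimated from input data size: 2" line that the log prints for Stage-3.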

Total number of rows: 54925330

Total data size: 3300747847 bytes