用户工具


排序优化

用DISTRIBUTE BY和Sort by关键字代替Order by

分为扫描

对列使用分区

尽量启用本地模式

  • 尽量使用本地模式执行sql
  • set hive.exec.mode.local.auto=true
    • 当一个job满足如下条件才能真正使用本地模式:
    • job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
    • job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
    • job的reduce数必须为0或者1

设置并行模式

  • hive将一个查询转化成一个或者多个阶段,默认情况hive一次执行一个阶段,启动并行后可以并行执行多个阶段
  • set hive.exec.parallel=true

启用严格模式

  • 有些sql会占用大量的资源(如大量全局排序),甚至可能会导致系统不可用,我们可以设置禁止这些查询
  • set hive.mapred.mode=strict

jvm重用

正常情况下,MapReduce启动的JVM在完成一个task之后就退出了,但是如果任务花费时间很短,又要多次启动JVM的情况下(比如对很大数据量进行计数操作),JVM的启动时间就会变成一个比较大的消耗。在这种情况下,可以使用jvm重用的参数:

set mapred.job.reuse.jvm.num.tasks = 5;

他的作用是让一个jvm运行多次任务之后再退出。这样一来也能节约不少JVM启动时间。

桶表

创建一个桶表
create table t4 (name string,age int,id int) CLUSTERED BY (id) SORTED BY (id) INTO 10 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

可以自动控制reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数,推荐使用'set hive.enforce.bucketing = true'  
set hive.enforce.bucketing=true;

Sorted Merge Bucket Map Join Operator依赖参数

  • set hive.auto.convert.sortmerge.join=true;
  • set hive.optimize.bucketmapjoin = true;
  • set hive.optimize.bucketmapjoin.sortedmerge = true;
  • set hive.auto.convert.join.noconditionaltask=true;

查看执行计划

  • explain select * from t4 as tt1 inner join t4 as tt2 on tt2.id=tt1.id ;

数据倾斜

数据倾斜分两种,group by引起的倾斜和join引起的倾斜。

group by造成的倾斜有两个参数可以解决

  1. 一个是hive.map.aggr,默认值已经为true,意思是会做map端的combiner。所以如果你的group by查询只是做count(*)的话,其实是看不出倾斜效果的,但是如果你做的是count(distinct),那么还是会看出一点倾斜效果。
  2. 另一个参数是hive.groupby.skewindata。这个参数的意思是做reduce操作的时候,拿到的key并不是所有相同值给同一个reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。所以这个参数其实跟hive.map.aggr做的是类似的事情,只是拿到reduce端来做,而且要额外启动一轮job,所以其实不怎么推荐用,效果不明显。

join引起的倾斜

  • hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。当然你要告诉hive这个join是个skew join,即:set hive.optimize.skewjoin = true;
  • 还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值,就要对这个key启动一个shew join。

skew join的流程可以用下图描述:

小文件合并

当Hive输入由很多个小文件组成,由于每个小文件都会启动一个map任务,非常消耗资源,设置参数 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 在map处理之前将多个小文件合并。合并文件数由mapred.max.split.size限制的大小决定。

Map阶段的优化

如果一个job大部分时间花在任务处理上,说明名可能是map个数太少了,并行度不高,这时可以考虑增加map个数

  • 可以通过这个参数mapred.max.split.size(设置每个map的大小,size越到map个数越少)

直接调整mapred.map.tasks这个参数是没有效果的。

  • 设置map阶段进行combiner set hive.map.aggr=true

Reduce阶段的优化

主要是设置reduce个数

方法一:

  • 设置 set mapred.reduce.tasks=10

方法二:

  • set mapred.reduce.tasks=-1 (默认)
  • set hive.exec.reducers.bytes.per.reducer=256000000(每个reduce任务处理的数据量,当mapred.reduce.tasks=-1时,这个参数生效)

索引

hive中的索引是一个比较鸡肋的功能,hive中一般做扫描操作,而索引主要用于精确查找。

索引可以加快group by的速度

推测执行

目的:是通过加快获取单个 task 的结果以及进行侦测将执行慢的 TaskTracker 加入到黑名单的方式来提高整体的任务执行效率

对于需要消耗大量资源的任务不建议推测执行,因为推测执行会让整个系统资源竞争更严重

$HADOOP_HOME/conf/mapred-site.xml 文件

  <property>
  <name>mapred.map.tasks.speculative.execution </name>
  <value>true</value>
  </property>
<property>
  <name>mapred.reduce.tasks.speculative.execution </name>
  <value>true</value>
  </property>
 

数据压缩

  • 就是对map到reduce之间传输的数据进行压缩,最好选择一个节省CPU耗时的压缩方式。
set hive.exec.compress.intermediate=true;
# 设置用snappy压缩,hive也有一个默认压缩方法DefaultCodec
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
  • 选择一个压缩效果比较好的,节省了磁盘空间,但是cpu比较耗时。
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapred.output.compression.type=BLOCK:

group by优化

hive.groupby.skewindata=true;

  • 如果是group by过程出现倾斜应该设置为true。

set hive.groupby.mapaggr.checkinterval=100000;

  • 这个是group的键对应的记录条数超过这个值则会进行优化。

union all优化

优化思想是尽量让job并行