===== 排序优化 =====
用DISTRIBUTE BY和Sort by关键字代替Order by
===== 分为扫描 =====
对列使用分区
===== 尽量启用本地模式 =====
* 尽量使用本地模式执行sql
* set hive.exec.mode.local.auto=true
* 当一个job满足如下条件才能真正使用本地模式:
* job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
* job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
* job的reduce数必须为0或者1
===== 设置并行模式 =====
* hive将一个查询转化成一个或者多个阶段,默认情况hive一次执行一个阶段,启动并行后可以并行执行多个阶段
* set hive.exec.parallel=true
===== 启用严格模式 =====
* 有些sql会占用大量的资源(如大量全局排序),甚至可能会导致系统不可用,我们可以设置禁止这些查询
* set hive.mapred.mode=strict
===== jvm重用 =====
正常情况下,MapReduce启动的JVM在完成一个task之后就退出了,但是如果任务花费时间很短,又要多次启动JVM的情况下(比如对很大数据量进行计数操作),JVM的启动时间就会变成一个比较大的消耗。在这种情况下,可以使用jvm重用的参数:
set mapred.job.reuse.jvm.num.tasks = 5;
他的作用是让一个jvm运行多次任务之后再退出。这样一来也能节约不少JVM启动时间。
===== 桶表 =====
创建一个桶表
create table t4 (name string,age int,id int) CLUSTERED BY (id) SORTED BY (id) INTO 10 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
可以自动控制reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数,推荐使用'set hive.enforce.bucketing = true'
set hive.enforce.bucketing=true;
==== Sorted Merge Bucket Map Join Operator依赖参数 ====
* set hive.auto.convert.sortmerge.join=true;
* set hive.optimize.bucketmapjoin = true;
* set hive.optimize.bucketmapjoin.sortedmerge = true;
* set hive.auto.convert.join.noconditionaltask=true;
查看执行计划
* explain select * from t4 as tt1 inner join t4 as tt2 on tt2.id=tt1.id ;
===== 数据倾斜 =====
数据倾斜分两种,group by引起的倾斜和join引起的倾斜。
group by造成的倾斜有两个参数可以解决
- 一个是hive.map.aggr,默认值已经为true,意思是会做map端的combiner。所以如果你的group by查询只是做count(*)的话,其实是看不出倾斜效果的,但是如果你做的是count(distinct),那么还是会看出一点倾斜效果。
- 另一个参数是hive.groupby.skewindata。这个参数的意思是做reduce操作的时候,拿到的key并不是所有相同值给同一个reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。所以这个参数其实跟hive.map.aggr做的是类似的事情,只是拿到reduce端来做,而且要额外启动一轮job,所以其实不怎么推荐用,效果不明显。
join引起的倾斜
* hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。当然你要告诉hive这个join是个skew join,即:''set hive.optimize.skewjoin = true;''
* 还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值,就要对这个key启动一个shew join。
skew join的流程可以用下图描述:
{{:pasted:20151016-213138.png}}
===== 小文件合并 =====
当Hive输入由很多个小文件组成,由于每个小文件都会启动一个map任务,非常消耗资源,设置参数
''set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat''
在map处理之前将多个小文件合并。合并文件数由mapred.max.split.size限制的大小决定。
===== Map阶段的优化 =====
如果一个job大部分时间花在任务处理上,说明名可能是map个数太少了,并行度不高,这时可以考虑增加map个数
* 可以通过这个参数mapred.max.split.size(设置每个map的大小,size越到map个数越少)
直接调整mapred.map.tasks这个参数是没有效果的。
* 设置map阶段进行combiner ''set hive.map.aggr=true''
===== Reduce阶段的优化 =====
主要是设置reduce个数
方法一:
* 设置 ''set mapred.reduce.tasks=10''
方法二:
* ''set mapred.reduce.tasks=-1'' (默认)
* ''set hive.exec.reducers.bytes.per.reducer=256000000''(每个reduce任务处理的数据量,当mapred.reduce.tasks=-1时,这个参数生效)
===== 索引 =====
hive中的索引是一个比较鸡肋的功能,hive中一般做扫描操作,而索引主要用于精确查找。
索引可以加快group by的速度
===== 推测执行 =====
目的:是通过加快获取单个 task 的结果以及进行侦测将执行慢的 TaskTracker 加入到黑名单的方式来提高整体的任务执行效率
对于需要消耗大量资源的任务不建议推测执行,因为推测执行会让整个系统资源竞争更严重
$HADOOP_HOME/conf/mapred-site.xml 文件
mapred.map.tasks.speculative.execution
true
mapred.reduce.tasks.speculative.execution
true
===== 数据压缩 =====
**中间压缩**
* 就是对map到reduce之间传输的数据进行压缩,最好选择一个节省CPU耗时的压缩方式。
set hive.exec.compress.intermediate=true;
# 设置用snappy压缩,hive也有一个默认压缩方法DefaultCodec
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
**最终的输出也可以压缩**
* 选择一个压缩效果比较好的,节省了磁盘空间,但是cpu比较耗时。
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapred.output.compression.type=BLOCK:
===== group by优化 =====
''hive.groupby.skewindata=true;''
* 如果是group by过程出现倾斜应该设置为true。
''set hive.groupby.mapaggr.checkinterval=100000;''
* 这个是group的键对应的记录条数超过这个值则会进行优化。
===== union all优化 =====
优化思想是尽量让job并行