===== 排序优化 ===== 用DISTRIBUTE BY和Sort by关键字代替Order by ===== 分为扫描 ===== 对列使用分区 ===== 尽量启用本地模式 ===== * 尽量使用本地模式执行sql * set hive.exec.mode.local.auto=true * 当一个job满足如下条件才能真正使用本地模式: * job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB) * job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4) * job的reduce数必须为0或者1 ===== 设置并行模式 ===== * hive将一个查询转化成一个或者多个阶段,默认情况hive一次执行一个阶段,启动并行后可以并行执行多个阶段 * set hive.exec.parallel=true ===== 启用严格模式 ===== * 有些sql会占用大量的资源(如大量全局排序),甚至可能会导致系统不可用,我们可以设置禁止这些查询 * set hive.mapred.mode=strict ===== jvm重用 ===== 正常情况下,MapReduce启动的JVM在完成一个task之后就退出了,但是如果任务花费时间很短,又要多次启动JVM的情况下(比如对很大数据量进行计数操作),JVM的启动时间就会变成一个比较大的消耗。在这种情况下,可以使用jvm重用的参数: set mapred.job.reuse.jvm.num.tasks = 5; 他的作用是让一个jvm运行多次任务之后再退出。这样一来也能节约不少JVM启动时间。 ===== 桶表 ===== 创建一个桶表 create table t4 (name string,age int,id int) CLUSTERED BY (id) SORTED BY (id) INTO 10 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; 可以自动控制reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数,推荐使用'set hive.enforce.bucketing = true' set hive.enforce.bucketing=true; ==== Sorted Merge Bucket Map Join Operator依赖参数 ==== * set hive.auto.convert.sortmerge.join=true; * set hive.optimize.bucketmapjoin = true; * set hive.optimize.bucketmapjoin.sortedmerge = true; * set hive.auto.convert.join.noconditionaltask=true; 查看执行计划 * explain select * from t4 as tt1 inner join t4 as tt2 on tt2.id=tt1.id ; ===== 数据倾斜 ===== 数据倾斜分两种,group by引起的倾斜和join引起的倾斜。 group by造成的倾斜有两个参数可以解决 - 一个是hive.map.aggr,默认值已经为true,意思是会做map端的combiner。所以如果你的group by查询只是做count(*)的话,其实是看不出倾斜效果的,但是如果你做的是count(distinct),那么还是会看出一点倾斜效果。 - 另一个参数是hive.groupby.skewindata。这个参数的意思是做reduce操作的时候,拿到的key并不是所有相同值给同一个reduce,而是随机分发,然后reduce做聚合,做完之后再做一轮MR,拿前面聚合过的数据再算结果。所以这个参数其实跟hive.map.aggr做的是类似的事情,只是拿到reduce端来做,而且要额外启动一轮job,所以其实不怎么推荐用,效果不明显。 join引起的倾斜 * hive给出的解决方案叫skew join,其原理把这种user_id = 0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。当然你要告诉hive这个join是个skew join,即:''set hive.optimize.skewjoin = true;'' * 还有要告诉hive如何判断特殊值,根据hive.skewjoin.key设置的数量hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值,就要对这个key启动一个shew join。 skew join的流程可以用下图描述: {{:pasted:20151016-213138.png}} ===== 小文件合并 ===== 当Hive输入由很多个小文件组成,由于每个小文件都会启动一个map任务,非常消耗资源,设置参数 ''set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat'' 在map处理之前将多个小文件合并。合并文件数由mapred.max.split.size限制的大小决定。 ===== Map阶段的优化 ===== 如果一个job大部分时间花在任务处理上,说明名可能是map个数太少了,并行度不高,这时可以考虑增加map个数 * 可以通过这个参数mapred.max.split.size(设置每个map的大小,size越到map个数越少) 直接调整mapred.map.tasks这个参数是没有效果的。 * 设置map阶段进行combiner ''set hive.map.aggr=true'' ===== Reduce阶段的优化 ===== 主要是设置reduce个数 方法一: * 设置 ''set mapred.reduce.tasks=10'' 方法二: * ''set mapred.reduce.tasks=-1'' (默认) * ''set hive.exec.reducers.bytes.per.reducer=256000000''(每个reduce任务处理的数据量,当mapred.reduce.tasks=-1时,这个参数生效) ===== 索引 ===== hive中的索引是一个比较鸡肋的功能,hive中一般做扫描操作,而索引主要用于精确查找。 索引可以加快group by的速度 ===== 推测执行 ===== 目的:是通过加快获取单个 task 的结果以及进行侦测将执行慢的 TaskTracker 加入到黑名单的方式来提高整体的任务执行效率 对于需要消耗大量资源的任务不建议推测执行,因为推测执行会让整个系统资源竞争更严重 $HADOOP_HOME/conf/mapred-site.xml 文件 mapred.map.tasks.speculative.execution true mapred.reduce.tasks.speculative.execution true ===== 数据压缩 ===== **中间压缩** * 就是对map到reduce之间传输的数据进行压缩,最好选择一个节省CPU耗时的压缩方式。 set hive.exec.compress.intermediate=true; # 设置用snappy压缩,hive也有一个默认压缩方法DefaultCodec set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; set hive.intermediate.compression.type=BLOCK; **最终的输出也可以压缩** * 选择一个压缩效果比较好的,节省了磁盘空间,但是cpu比较耗时。 set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec; set mapred.output.compression.type=BLOCK: ===== group by优化 ===== ''hive.groupby.skewindata=true;'' * 如果是group by过程出现倾斜应该设置为true。 ''set hive.groupby.mapaggr.checkinterval=100000;'' * 这个是group的键对应的记录条数超过这个值则会进行优化。 ===== union all优化 ===== 优化思想是尽量让job并行