用户工具


UDF

package fang.qiang;

import org.apache.hadoop.hive.ql.exec.UDF;

public class helloUDF extends UDF {
    public String evaluate(String str) {
        try {
            return "HelloWorld " + str;
        } catch (Exception e) {
            return null;
        }
    }
} 


将该java文件编译成helloudf.jar

hive> add jar helloudf.jar;
hive> create temporary function helloworld as 'fang.qiang.helloUDF';
hive> select helloworld(t.col1) from t limit 10;
hive> drop temporary function helloworld;

UDAF

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 计算平均值
 */
public final class UDAFExampleAvg extends UDAF {

    /**
     * 定义一个类保存中间过程结果
     */
    public static class UDAFAvgState {
        private long mCount;
        private double mSum;
    }

    /**
     * 这是真正的实现聚合的类,hive自动从UDAF类中寻找继承了UDAFEvaluator的内部类
     */
    public static class UDAFExampleAvgEvaluator implements UDAFEvaluator {

        UDAFAvgState state;

        public UDAFExampleAvgEvaluator() {
            super();
            state = new UDAFAvgState();
            init();
        }

        /**
         * Reset the state of the aggregation.
         */
        public void init() {
            state.mSum = 0;
            state.mCount = 0;
        }

        /**
         * 对聚合组中每个行进行迭代,参数个数,类型要和hql中提供的一致
         *
         * 该函数任何时候 return true.
         */
        public boolean iterate(Integer o) {
            if (o != null) {
                state.mSum += o;
                state.mCount++;
            }
            return true;
        }

        /**
         * 因为hsql最后也是转换成mapreduce模型,所以一个聚合组可能存在于多个map中,这个函数对单个map的结果进行汇总
         */
        public UDAFAvgState terminatePartial() {
            // This is SQL standard - average of zero items should be null.
            return state.mCount == 0 ? null : state;
        }

        /**
         * 这个阶段是对应的reduce阶段,对每个map的汇总结果进程再处理
         *
         * 这个函数只会有一个参数,且必须和terminatePartial()函数的返回值类型相同
         */
        public boolean merge(UDAFAvgState o) {
            if (o != null) {
                state.mSum += o.mSum;
                state.mCount += o.mCount;
            }
            return true;
        }

        /**
         * 返回最终结果
         */
        public Double terminate() {
            // This is SQL standard - average of zero items should be null.
            return state.mCount == 0 ? null : Double.valueOf(state.mSum
                    / state.mCount);
        }
    }

    private UDAFExampleAvg() {
        // prevent instantiation
    }

}


将该java文件编译成avj.jar

hive> add jar avg.jar;
hive> create temporary function avg_example as 'UDAFExampleAvg';
hive> select avg_example(t.col1) from t group by id;
hive> drop temporary function helloworld;