分享

Hive-UDAF开发指南

yr123 2015-9-10 18:34:22 发表于 入门帮助 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 20574
问题导读
1、Hive什么时候用UDAF?
2、UDAF分为哪几个部分?各自作用是什么?

3、UDAF的Mode阶段重要性。
4、使用UDF和UDAF,为什么尽可能少地使用new关键字?



在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了。
对于底层的内容还没有细看,先从应用的角度来说一下吧。
使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver。
UDAF主要分为2个部分,第一个部分是对传入参数进行校验,数据类型的校验。然后根据传入的数据类型不同调用具体的处理逻辑。
比如说,自己写了一个SUM,SUM对于Long类型和Double类型进行求和,没有问题。
但是,如果传入的参数是一个Array呢?这个时候,就需要在Evaluator方法里面,对参数进行校验了。

1.public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)   
2.      throws SemanticException {   
3.    if (parameters.length != 1) {   
4.      throw new UDFArgumentTypeException(parameters.length - 1,   
5.          "Exactly one argument is expected.");   
6.    }   
7.  
8.    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {   
9.      throw new UDFArgumentTypeException(0,   
10.          "Only primitive type arguments are accepted but "  
11.          + parameters[0].getTypeName() + " is passed.");   
12.    }   
13.    switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {   
14.    case BYTE:   
15.    case SHORT:   
16.    case INT:   
17.    case LONG:   
18.    case FLOAT:   
19.    case DOUBLE:   
20.    case STRING:   
21.    case TIMESTAMP:   
22.      return new GenericUDAFAverageEvaluator();   
23.    case BOOLEAN:   
24.    default:   
25.      throw new UDFArgumentTypeException(0,   
26.          "Only numeric or string type arguments are accepted but "  
27.          + parameters[0].getTypeName() + " is passed.");   
28.    }   
29.  }  
这个方法只支持Primitive类型,也就是INT,String,Double,Float这些。
UDAF使用一个ObjectInspector来抽象化每一行数据的读取。
上面使用的Primitive类型的数据,所以使用PrimitiveObjectInspector来读取传入的参数。
UDAF会根据不同的计算模型,产生不同的阶段。
如:SUM()聚合函数,接受一个原始类型的整型数值,然后创建一个整型的PARTIAL数据,
返回一个固定的整型结果。
如:median() 中位数
可以接受原始整型输入,然后会产生一个中间的整数PARTIAL数据(排序),
然后再返回一个固定的整型结果。
注意:

1.聚合操作会在reduce的环境下执行,然后由一个Java进程的内存大小限制这个操作。   
2.因此像排序大结构体的数据,可能会产生对内存不足的异常。   
3.一般情况下可以增加内存来解决这个问题。   
4.<property>   
5.<name>mapred.child.java.opts</name>   
6.<value>-Xmx200m</value>   
7.</property>
在处理逻辑之前,介绍一下UDAF的Mode。
UDAF的Mode,也就是执行阶段。无论怎样的UDAF,最终都会变成MapReduce Job。
Mode是一UDAF的使用类型,主要有4种形势:
因为MapReduce可能是,Map->Reduce也可能是,Map->Reduce->Reduce

1.public static enum Mode {   
2.    /**  
3.     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合  
4.     * 将会调用iterate()和terminatePartial()  
5.     */  
6.    PARTIAL1,   
7.        /**  
8.     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:  
9.     * 将会调用merge() 和 terminatePartial()   
10.     */  
11.    PARTIAL2,   
12.        /**  
13.     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合   
14.     * 将会调用merge()和terminate()  
15.     */  
16.    FINAL,   
17.        /**  
18.     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合  
19.      * 将会调用 iterate()和terminate()  
20.     */  
21.    COMPLETE   
22.};  
有的UDAF函数会可以像UDF函数那样使用,有的必须在聚合函数环境下使用,如group by,over(partition by )
而在使用UDAF进行计算的时候,会启用一个init方法。这个init的方法会在买个阶段前面都启动一次。第一次启动的时候,参数指的是读入每一行记录的参数。第二次启动的时候,传入的参数只有1个,指的是中间结果的参数。这里需要特别注意。
1.@Override  
2.        public ObjectInspector init(Mode m, ObjectInspector[] parameters)   
3.                throws HiveException {   
4.            super.init(m, parameters);   
5.               
6.            //init input   
7.            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有   
8.                LOG.info(" Mode:"+m.toString()+" result has init");   
9.                inputOI = (PrimitiveObjectInspector) parameters[0];   
10.                inputOI2 = (PrimitiveObjectInspector) parameters[1];   
11.//              result = new DoubleWritable(0);   
12.            }   
13.            //init output   
14.            if (m == Mode.PARTIAL2 || m == Mode.FINAL) {   
15.                outputOI = (PrimitiveObjectInspector) parameters[0];   
16.                result = new DoubleWritable(0);   
17.                return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;   
18.            }else{   
19.                result = new DoubleWritable(0);   
20.                return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;   
21.            }   
22.               
23.        }
所以我们使用枚举方法,根据init启动阶段的不同,接入不同的参数。
实现UDAF的时候,实际就是一个Reducer
对于计算过程中的中间结果,会有一个Buffer对象来进行缓冲。
Buffer对象相当于Reducer里面记录结果集的一个内存对象。
这里面可以大大的发挥想象,作出你想要的各种数据类型。
另外,在UDAF输出的时候,也可以输出Struct,Array类型的数据。
这一部分等到用到再进行研究吧。
最后是完整的UDAF代码。实现一个有条件的SUM,传入2个参数,当第二个参数>1 的时候进行SUM。
1.package com.test.udaf;   
2.  
3.import org.apache.commons.logging.Log;   
4.import org.apache.commons.logging.LogFactory;   
5.import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;   
6.import org.apache.hadoop.hive.ql.metadata.HiveException;   
7.import org.apache.hadoop.hive.ql.parse.SemanticException;   
8.import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;   
9.import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;   
10.import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;   
11.import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;   
12.import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;   
13.import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;   
14.import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;   
15.import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;   
16.import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;   
17.import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;   
18.import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;   
19.import org.apache.hadoop.hive.serde2.io.DoubleWritable;   
20.import org.apache.hadoop.util.StringUtils;   
21.  
22.public class GenericUdafMemberLevel2 extends AbstractGenericUDAFResolver {   
23.    private static final Log LOG = LogFactory   
24.            .getLog(GenericUdafMemberLevel2.class.getName());   
25.      
26.    @Override  
27.      public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)   
28.        throws SemanticException {   
29.           
30.        return new GenericUdafMeberLevelEvaluator();   
31.      }   
32.      
33.    public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {   
34.        private PrimitiveObjectInspector inputOI;   
35.        private PrimitiveObjectInspector inputOI2;   
36.        private PrimitiveObjectInspector outputOI;   
37.        private DoubleWritable result;   
38.  
39.        @Override  
40.        public ObjectInspector init(Mode m, ObjectInspector[] parameters)   
41.                throws HiveException {   
42.            super.init(m, parameters);   
43.               
44.            //init input   
45.            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有   
46.                LOG.info(" Mode:"+m.toString()+" result has init");   
47.                inputOI = (PrimitiveObjectInspector) parameters[0];   
48.                inputOI2 = (PrimitiveObjectInspector) parameters[1];   
49.//              result = new DoubleWritable(0);   
50.            }   
51.            //init output   
52.            if (m == Mode.PARTIAL2 || m == Mode.FINAL) {   
53.                outputOI = (PrimitiveObjectInspector) parameters[0];   
54.                result = new DoubleWritable(0);   
55.                return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;   
56.            }else{   
57.                result = new DoubleWritable(0);   
58.                return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;   
59.            }   
60.               
61.        }   
62.  
63.        /** class for storing count value. */  
64.        static class SumAgg implements AggregationBuffer {   
65.            boolean empty;   
66.            double value;   
67.        }   
68.  
69.        @Override  
70.        //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。   
71.        //使用buffer对象前,先进行内存的清空——reset   
72.        public AggregationBuffer getNewAggregationBuffer() throws HiveException {   
73.            SumAgg buffer = new SumAgg();   
74.            reset(buffer);   
75.            return buffer;   
76.        }   
77.  
78.        @Override  
79.        //重置为0   
80.        //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。   
81.        public void reset(AggregationBuffer agg) throws HiveException {   
82.            ((SumAgg) agg).value = 0.0;   
83.            ((SumAgg) agg).empty = true;   
84.        }   
85.  
86.        private boolean warned = false;   
87.        //迭代   
88.        //只要把保存当前和的对象agg,再加上输入的参数,就可以了。   
89.        @Override  
90.        public void iterate(AggregationBuffer agg, Object[] parameters)   
91.                throws HiveException {   
92.            // parameters == null means the input table/split is empty   
93.            if (parameters == null) {   
94.                return;   
95.            }   
96.            try {   
97.                double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);   
98.                if(flag > 1.0)   //参数条件   
99.                    merge(agg, parameters[0]);   //这里将迭代数据放入combiner进行合并   
100.              } catch (NumberFormatException e) {   
101.                if (!warned) {   
102.                  warned = true;   
103.                  LOG.warn(getClass().getSimpleName() + " "  
104.                      + StringUtils.stringifyException(e));   
105.                }   
106.              }   
107.  
108.        }   
109.  
110.        @Override  
111.        //这里的操作就是具体的聚合操作。   
112.        public void merge(AggregationBuffer agg, Object partial) {   
113.            if (partial != null) {   
114.                // 通过ObejctInspector取每一个字段的数据   
115.                if (inputOI != null) {   
116.                    double p = PrimitiveObjectInspectorUtils.getDouble(partial,   
117.                            inputOI);   
118.                    LOG.info("add up 1:" + p);   
119.                    ((SumAgg) agg).value += p;   
120.                } else {   
121.                    double p = PrimitiveObjectInspectorUtils.getDouble(partial,   
122.                            outputOI);   
123.                    LOG.info("add up 2:" + p);   
124.                    ((SumAgg) agg).value += p;   
125.                }   
126.            }   
127.        }   
128.  
129.  
130.        @Override  
131.        public Object terminatePartial(AggregationBuffer agg) {   
132.                return terminate(agg);   
133.        }   
134.           
135.        @Override  
136.        public Object terminate(AggregationBuffer agg){   
137.            SumAgg myagg = (SumAgg) agg;   
138.            result.set(myagg.value);   
139.            return result;   
140.        }   
141.    }   
142.}
在使用Hive的UDAF,需要使用ADD JAR语句,将UDAF方程上传到Hadoop Distributed Cache,让每一个DataNode都能共享到这个jar包。
然后才进行调用
1.hive> add jar /home/daxingyu930/test_sum.jar;   
2.hive> drop temporary function sum_test;   
3.hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel';   
4.  
5.hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel2';   
6.  
7.hive> select sum_test(height,2.0) from student_height;
附录:关于UDAF流程介绍
init  当实例化UDAF evaluator的时候执行。
getNewAggregationBuffer  返回一个对象用来保存临时的聚合结果集。
iterate  将一条新的数据处理放到聚合内存块中(aggregation buffer)
terminateParital  返回现有的聚合好的一个持久化的路径,相当于数据对象。这些数据可以通过Hive的数据类型可来访问,这个数据对象可以被Java理解,如Integer,String,或者是Array,Map这种。
相当于第二次MapReduce的map阶段。
merge   将partital数据(分区汇总的数据),于terminateParital数据融合在一起
terminate 返回一个最终的数据聚合结果,是一个结果,或者是一个结果集。
在init阶段,hive会自动检测最终生成的object inspector。
并获取使用聚合函数所处的mode。
iterate和 terminalPartial 都是在map阶段
而terminate和merge 都是在reduce阶段。
merge则用来聚合结果集
注意,无论使用UDF和UDAF,尽可能少地使用new关键字,可以使用静态类。
这样可以减少JVM的GC操作,提高效率。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条