分享

HIVE UDTF 简单示例

xiaobaiyang 发表于 2016-1-28 17:54:09 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 12033

(1) 继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF。

(2)实现initialize, process, close三个方法。

UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。初始化完成后,会调用process方法,对传入的参数进行处理,可以通过forword()方法把结果返回。最后close()方法调用,对需要清理的方法进行清理。


下面是我写的一个用来切分”key:value;key:value;”这种字符串,返回结果为key, value两个字段。供参考


package com.it.udf;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/*
* 获取并显示每个人的平均成绩、最大分、最小分、总分
* A    90
* A    87
* B    89
* C    57
* */
public class getNewUdtf extends GenericUDTF {
    Integer nTotalScore = Integer.valueOf(0);       //总分  
    Float avgScore=Float.valueOf(0);
    String forwardObj[] = new String[1];  
    String strStudent="";       //学生姓名  
    Integer count=0;
    Integer max=0;
    Integer min=100;
   
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 2) {
            throw new UDFArgumentLengthException("ExplodeMap takes only two argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("studName");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("sumScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("avgScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("maxScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("minScore");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
    }
   
    @Override
    public void process(Object[] args) throws HiveException {
         if(!strStudent.isEmpty() && !strStudent.equals(args[0].toString()))  
         //  if(!strStudent.equals(args[0].toString()))
            {  
               //当学生名字变化时,输出该学生的总分  
               ArrayList<String> arrayList = new ArrayList<String>();
               avgScore=(float) (nTotalScore/count);
               arrayList.add(String.valueOf(strStudent));
               arrayList.add(String.valueOf(nTotalScore));
               arrayList.add(String.valueOf(avgScore));
               arrayList.add(String.valueOf(max));
               arrayList.add(String.valueOf(min));
               forward(arrayList.toArray(new String[]{}));  
               nTotalScore=0;  
               avgScore=(float) 0;
               max=0;
               count=0;
               min=100;
            }     
            strStudent=args[0].toString();  
            count++;
            nTotalScore+=Integer.parseInt(args[1].toString());
            //求最值
            if(max<Integer.valueOf(args[1].toString())){
                max=Integer.valueOf(args[1].toString());
            }
            if(min>Integer.valueOf(args[1].toString())){
                min=Integer.valueOf(args[1].toString());
            }
    }
   
    @Override
    public void close() throws HiveException {
         //输出最后一个学生的总分  
        avgScore=(float) (nTotalScore/count);
        ArrayList<String> arrayList = new ArrayList<String>();
        arrayList.add(String.valueOf(strStudent));
        arrayList.add(String.valueOf(nTotalScore));
        arrayList.add(String.valueOf(avgScore));
        arrayList.add(String.valueOf(max));
        arrayList.add(String.valueOf(min));
        forward(arrayList.toArray(new String[]{}));
    }
}


操作:
1.create external table studentscore(name string,score int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\,' stored as textfile;
2.load data local inpath '/root/jar/score.txt' overwrite into table studentscore;
3.add jar hdfs://hadoop:9000/udtftest.jar;
4.create temporary function statics as 'com.it.udf.getNewUdtf';
执行结果:

在执行期间,有可能遇到以下问题:
Error during job, obtaining debugging information...
Examining task ID: task_201601271725_0012_m_000002 (and more) from job job_201601271725_0012
Exception in thread "Thread-26" java.lang.RuntimeException: Error while reading from task log url
        at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getStackTraces(TaskLogProcessor.java:240)
        at org.apache.hadoop.hive.ql.exec.JobDebugger.showJobFailDebugInfo(JobDebugger.java:227)
        at org.apache.hadoop.hive.ql.exec.JobDebugger.run(JobDebugger.java:92)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Server returned HTTP response code: 400 for URL: http://hadoop1:50060/tasklog?taskid=attempt_201601271725_0012_m_000000_1&start=-8193
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1436)
        at java.net.URL.openStream(URL.java:1010)
        at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getStackTraces(TaskLogProcessor.java:192)
        ... 3 more
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
MapReduce Jobs Launched:
Job 0: Map: 1   HDFS Read: 0 HDFS Write: 0 FAIL


  解决过程如下:

  看错误日志,url是:

  http://slaver1:50060/tasklog?taskid=attempt_201112211741_0005_m_000000_1&start=-8193  

  再看看HADOOP的源码:TaskLogServlet  


String attemptIdStr = request.getParameter("attemptid");  
  if (attemptIdStr == null) {

        response.sendError(HttpServletResponse.SC_BAD_REQUEST,  
                           "Argument attemptid is required");
        return;
      }
  所以taskid应该是attemptid ,应该是hadoop的版本不一致导致的。

  然后在浏览器里访问:http://slaver1:50060/tasklog?attemptid=attempt_201112211741_0005_m_000000_1&start=-8193  

  获得真正的错误信息,class not found 导致的,然后就好办了,把对应的jar包扔到hadoop的lib .
  stop-mapred.sh  
  start-mapred.sh

注意:1.jar放在本地或者hdfs 上都是可以的,若遇到空指针的问题时,可以考虑将jar包放在hadoop集群环境的主节点上,并将jar包拷贝到对应hadoop目录下的lib目录下,否则可能遇到NoClassDefFoundError问题;
2.当需要传入一个或多个参数时,在第二个阶段process时进行接收并进行逻辑处理,args[0].toString()进行参数接收;



已有(3)人评论

跳转到指定楼层
xiaobaiyang 发表于 2016-1-28 17:56:34
C:\Users\Administrator\Desktop/udtf1.jpg
回复

使用道具 举报

xiaobaiyang 发表于 2016-1-28 17:58:43
执行结果:

A                253                84.0        90        78
B                146                73.0        79        67
C                90                90.0        90        90
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条