分享

hadoop的mapreduce如何自定义分割文件【改变默认按行分割】

本帖最后由 xuanxufeng 于 2016-5-8 17:11 编辑
问题导读
1.mapreduce处理文件是否按行分割?
2.如何事先自定义分割文件?
3.哪个函数实现了分割文件?





我一直有这样的偏执,认为,如果要想做要一个事情学好一样东西,一定要知道这个东西的原理,不知道本质的东西,是很难使用好一个工具,hadoop是个工具,分布式工具,和其他的工具一样也不一样,这里不打算介绍太多的原理,只是阐明我的态度,原理部分打算另外启动一个系列来进行阐述。
在阐述本文想要解决的问题之前,我们先来回忆一下有这样的先验知识,默认的文件分割是按照“行”的概念来进行文件切分的,hadoop coding中map函数中处理的K是行的偏移值,V是行的数据(对应的class是LineRecordReader.class)。那么,问题来了,如果我不想按照行来进行数据切分,想要按照指定的分隔符来进行分割,那怎么做呢?比如,如下这种case?
<doc>
a=a1
b=b1
...
</doc>
<doc>
a=a2
b=b2
...
</doc>
<doc>
a=a3
b=b3
...
</doc>
<doc>
...
</doc>
从文件的格式可以看的很清楚,我想要map处理一次数据是一个doc,我想要的分隔符貌似有两个<doc>开始,</doc>结束。如果这类问题能够能够解决,那么推广开去,以单个分隔符分割的诉求也是可以解决的了。
下面会提供一些技术方案来实现,重点阐述方案二
方案1:将每个doc变成行数据,那么自然可以沿用默认的行处理的方式即可,这个方案的难点在将每一个doc数据变成一行数据,这可能需要其他的工具来进行辅助,大家可以去思考怎么实现
方案2:自定义Inputformat,RecordReader,LineReader来进行实现,要想知道自己怎么来动工,那么先要了解默认的是咋弄的,好吧,let's Go!
首先要从JobConf说起,要跑得动一个hadoop程序,需要说明一些相关的配置来告诉hadoop计算框架现在来的是一号神马人物,那么常见的一些配置有,输入的文件格式啊,map,reduce的处理class啊,程序的输入/输出地址等,其中有个配置是和我们这里谈的比较相关的,就是输入文件的配置 ,coding时一般是这样写的:
jobConf.setInputFormat(xxxInputFormat.class); 这个默认的xxxInputFormat就是TextInputFormat(源码如图1)
可以看出这个类其实是个代理类,其中画粗线的方法是真正决定实现的关键点,getRecordReader函数,返回是一个RecordReader对象,每调用一次这个函数,即返回一个k,v作为map函数的k,v输入!
1.png

图1 TextInputFormat源码
那么,RecordReader是我们要探寻的第二个目的地,这是一个接口类,outline如图2所示,可以看出这是一个迭代类,重点的方法是next函数,那么我们再回到这个默认的RecordReader实现类,LineRecordReader去一探究竟吧(如图三所示),果然没有辜负我的期望,看看画线的那一块吧,就是最终的答案, 每调用一次next,即调用了一次in.readline函数,value作为入参,调用结束后,赋值满满的返回!
2.png

图2,RecordReader的outline

3.png

图3 LineRecordReader部分源码
这个in到底是什么呢?private LineReader in;  哈哈,出来了,LineReader ,这个背后的决策者终于被找出来了,那赶紧来看看咋实现的吧,如图四所示,看到标志性的建筑了吧,“\n\r”,good,这个函数就是我们要找的那个!
4.png

图4 readline函数部分源码
好了,到这里我们来总结一下刚才的发现之旅,从jobconf知道了分割的格式处理工具InputFormat,找从InputFormat到了hadoop默认的TextInputFormat,再从TextInputFormat找到了K,V对象RecordReader,再从RecordReader找打行K,V对象LineRecordReader,最后找到了LineReader:负责对文件分割的真正的决策者!那么,有了这样的认识,自然知道,我们首先该做的是建立一个DocReader用于分割,这次我将代码贴了出来:
    private byte[] sp = { '<', '/', 'd', 'o', 'c', '>' };
    private byte[] A = { '\\','A'};
    Text strBk ;
    public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
            throws IOException {
        str.clear();
        boolean hadSpliter = false;
        boolean hitEndOfFile = false;
        int startPosn = bufferPosn;
        long bytesConsumed = 0;
        // get a split content
        outerLoop: while (true) {
            startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                if (!backfill()) {
                    hitEndOfFile = true;
                    break;
                }
            }
            startPosn = bufferPosn;
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (bufferPosn+sp.length <= bufferLength && checkSpliter(bufferPosn, sp)) {
                    hadSpliter = true;
                    bufferPosn += sp.length;
                    break outerLoop;
                }
            }
            bytesConsumed += bufferPosn - startPosn;
            if (bytesConsumed >= maxBytesToConsume) {
                return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
            }
        }
        if (!hitEndOfFile) {
            bytesConsumed += bufferPosn - startPosn;
            int length = bufferPosn - startPosn + (hadSpliter ? 1 : 0);
            length = (int) Math.min(length, maxLineLength - str.getLength());
            if (length > 0) {
                if(strBk!=null)
                {
                    str.set(strBk);
                    strBk=null;
                }
                try {
                   str.append(buffer, startPosn, length);
                } catch (Exception e) {
                    System.out.println("startPosn:"+startPosn+" length:"+length+ " bufferLength:"+bufferLength);
                    str.append(buffer, startPosn, length-1);
                    str.append(A,0,A.length);
                }
                int pos = startPosn+length;
                if(pos<bufferLength){
                    strBk = new Text();
                   strBk.append(buffer, pos , bufferLength-pos);
                }
            }
        }
        return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
    }
private boolean checkSpliter(int bufferPosn, byte[] sp) {
    for (int i = 0; i < sp.length; i++) {
        if (buffer[bufferPosn + i] != sp)
            return false;
    }
    return true;
}
对于这个代码的理解,是需要建立在对于LineReader代码的理解基础上,我尝试着梳理一下实现这个代码的一些思想(需要读者在结合LineReader的源码上来理解),首先,我们要知道这类XXXReader是对输入的文件流来做处理,因此有一个对象来代表输入的文件流,这个就是private InputStream in;那么,每触发调用一次readLine函数,其实是从inputstream中找出想要的那一部分来进行截断再输出,那么想要的那一部分可以是line的分割符\n\r,也可以是doc的分割符!因为stream流可能会很长,因此我们处理的对象其实是buffer流,buffer流是用于保存一小段inputstream的缓存,每次buffer流遍历完后,即会清空,加载进另外一小段inputstream,知道stream的末端为止。(buffer的大小可以通过配置io.file.buffer.size来进行控制的,默认为2m)
因此这段代码的核心思想是,首先从当前的buffer流中找到</doc>分隔符(checkSpliter(bufferPosn, sp)函数),并记录下他的位置 (bufferPosn += sp.length;),再往下即是拼凑一个完整的doc,因为如图5所示,buffer的开始处<buffer-start>不是<doc>时,只用一个buffer是无法还原一个doc的,因此需要一个变量strBk记录上一次被buffer截断时的信息(strBk.append(buffer, pos , bufferLength-pos);) 最后,由strBk与当前的buffer[0,pos]拼凑起来的即是一个完整的doc了。
5.png
图5 buffer流工作示意图
到这里,我们似乎解决了如何获取一个doc的问题,但是背后隐藏着另外一个问题,就是如果分割的时候buffer的开始位置刚好是处于</doc>中的某一个字节的话,那么会怎样呢?buffer开始位置,即意味着上一个buffer的结束位置,因此也即是说,上一个buffer中是没有找到</doc>的,那么上一个buffer就会被miss掉,这样会引起数据的丢失,如果在一个数据不可缺失的场景这样的情况发生可不妙了,因此这个方法需要有改进,前面其实有暗示到,buffer流截断了</doc>,那么如果每次判断一下上一次的buffer末尾与这一次的buffer开始,那这个问题不就可以解决了吗?另外还有一个解决的trick是放大buffer的大小,使得buffer一定是大于doc的,那么会大大的降低这种情况出现的可能行,注意,这只是降低,不是完全避免。


先介绍一下思路,上次的缺陷在于分割时如果刚好分开了分隔符,那么按照原来的算法,必然会造成数据缺少,本次升级的思路是将上一次的buffer进行全部保存,如果当数据开始新的buffer时,将原来的buffer赋予,请看代码,核心思想在红色字体的部分

private byte[] sp_e = { '<', '/', 'd', 'o', 'c', '>','\01'};
    Text strBk ;
    public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
            throws IOException {
        str.clear();
        boolean hitEndOfFile = false;
        int startPosn = bufferPosn;
        long bytesConsumed = 0;
        // get a split content
        outerLoop: while (true) {
            startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                if (!backfill()) {
                    hitEndOfFile = true;
                    break;
                }
            }
            if (strBk != null && bufferPosn==0) { //当开始读取一轮新的buffer数据时,将历史的buffer数据append给当前的buffer作为最新的buffer
                strBk.append(buffer, 0, buffer.length);
                buffer = strBk.getBytes();
                bufferLength = buffer.length;
                strBk = null;
            }
            startPosn = bufferPosn;
            strBk = new Text();
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (bufferPosn + sp_e.length <= bufferLength && checkSpliter(bufferPosn, sp_e)) {
                        bufferPosn += sp_e.length;
                        break outerLoop;
                    }
               strBk.append(buffer, bufferPosn, 1); //循环时,将当前的所有buffer进行保存,以用于数据完整
            }
        }

      //只有找到分隔符</doc>才会到这里
        if (!hitEndOfFile) {
            bytesConsumed += bufferPosn - startPosn;
            int length = bufferPosn - startPosn;
            length = (int) Math.min(length, maxLineLength - str.getLength());
            if (length > 0) {
                try {
                    str.append(buffer, startPosn, length);
                    String s = str.toString();
                    LOG.info("find one:"+s.substring(0, 100)+s.substring(s.length()-10, s.length()));
                } catch (Exception e) {
                    System.err.println("startPosn:"+startPosn+" length:"+length+ " bufferLength:"+bufferLength);
                }
                int pos = bufferPosn;
                if(pos<bufferLength){ //将当前buffer剩余的buffer保存起来
                    strBk = new Text();
                    strBk.append(buffer, pos , bufferLength-pos);
                }
            }
        }
        return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
    }

本文版权属作者@eletva所有,转载请注明出处




已有(2)人评论

跳转到指定楼层
ml32 发表于 2016-5-9 09:16:20
hadoop的mapreduce如何自定义分割文件【改变默认按行分割】
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条