分享

调试mapreduce程序碰到很奇葩的问题。。。求帮忙看看代码。。

dsy198816 发表于 2016-12-16 20:13:41 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 6093
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
import  java.io.*;

/**
* Return a single record (filename, "") where the filename is taken from
* the file split.
*/



public class BinRecordReader extends RecordReader<LongWritable, data> {
        private FSDataInputStream inputStream = null;
        private long start=0,end=0,pos=0;
        private Configuration conf = null;
        private FileSplit fileSplit = null;
        private LongWritable key = new LongWritable();
        private data value = new data();
        private boolean processed = false;
        
        public BinRecordReader() throws IOException {
        }

        
        public void close() {
                try {
                        if(inputStream != null)
                                inputStream.close();
                } catch (IOException e) {
                        e.printStackTrace();
                }
        }


        public float getProgress() {
                return ((processed == true)? 1.0f : 0.0f);
        }

        
        public LongWritable getCurrentKey() throws IOException,
        InterruptedException {
                return key;
        }

        
        public data getCurrentValue() throws IOException,InterruptedException {
                return value;
        }

        
        public void initialize(InputSplit inputSplit, TaskAttemptContext context)
                        throws IOException, InterruptedException {
                fileSplit = (FileSplit)inputSplit;
                conf = context.getConfiguration();
                this.start = fileSplit.getStart();
                this.end = this.start + fileSplit.getLength();

                try{
                        Path path = fileSplit.getPath();
                        FileSystem fs = path.getFileSystem(conf);
                        this.inputStream = fs.open(path);
                        inputStream.seek(this.start);
                        this.pos = this.start;
                }        catch(IOException e)        {
                        e.printStackTrace();
                }
        }

        public boolean nextKeyValue() throws IOException, InterruptedException {
                if(this.pos < this.end) {
                     int leng = 0;
                        int t, i;
                        short z;

                    key.set(this.pos);
                      value.total_len = inputStream.readInt();
                      inputStream.skip(value.total_len-4);         

                       FileWriter fw = null;
                        try{

                              fw = new FileWriter("/home/hadoop/aaa", true);
                                  fw.write("this.pos"+pos+"\n");
                              fw.write("data:"+value.total_len+"\n");
                                  fw.close();
         
                             } catch (Exception e) {
                              e.printStackTrace();
                             }
                     
                        
                     this.pos = inputStream.getPos();
                        return true;
                } else {
                        processed = true;
                        return false;
                }
        }
}


上面的函数解析我自定义的日志格式,日志第一个元素就是整条日志4字节的长度,整个程序功能没问题。。但是上面打印到/home/hadoop/aaa里的日志极为诡异,让我困惑不解。下面是aaa文件的输出:

this.pos134217728
data:779186478
this.pos402653184
data:892482614
this.pos0
data:151
this.pos151
data:192
this.pos343
data:383
this.pos726
data:145
this.pos871
data:312
this.pos1183
data:275
this.pos1458
data:126
this.pos1584
data:172
this.pos1756
data:136
this.pos1892
data:227
this.pos2119
data:284
this.pos2403
data:172
this.pos2575
data:153
this.pos2728
data:469
this.pos3197
data:795
this.pos3992


已有(7)人评论

跳转到指定楼层
dsy198816 发表于 2016-12-16 20:15:42
为什么打印出的输出第五行才开始是正确的值? 后面pos是获取的文件偏移, data是提取的日志前4字节日志的长度。     前四行的输出肯定不是文件里都出来的,日志文件格式我写程序验证过。
回复

使用道具 举报

dsy198816 发表于 2016-12-16 20:16:17
我想请问下,是不是nextvalue这个函数  是多个map并行执行的? 是不是要加锁什么的?
回复

使用道具 举报

langke93 发表于 2016-12-16 21:10:10
本帖最后由 langke93 于 2016-12-16 21:12 编辑
dsy198816 发表于 2016-12-16 20:16
我想请问下,是不是nextvalue这个函数  是多个map并行执行的? 是不是要加锁什么的?

应该是有四个mapreduce,初始化打印的值。并行是没有问题的,关键是并行的都写入同一个文件。
这样就出问题了。不同的mapreduce写入不同的文件应该就可以了。
文件名可以随机Math.random(),这样就产生了不同的文件,内容是同一个mapreduce产生的
回复

使用道具 举报

dsy198816 发表于 2016-12-16 21:31:09
求教楼上的大侠。。。是在哪里看到4个mapreduce的
回复

使用道具 举报

langke93 发表于 2016-12-17 08:35:50
dsy198816 发表于 2016-12-16 21:31
求教楼上的大侠。。。是在哪里看到4个mapreduce的

this.pos134217728
data:779186478
this.pos402653184
data:892482614
如果上面两个是一对,那就是两个。
因为这个是split,每个split对应的一个map任务。确切的应该是两个map。
不过这个是猜测,楼主可以实地观测下。

推荐参考
分片split和资源容器container之间的关系
http://www.aboutyun.com/forum.php?mod=viewthread&tid=17181


回复

使用道具 举报

dsy198816 发表于 2016-12-17 11:55:16
但是我的定义的Inputformat里的  isSplitable 返回的是false啊。。。应该是没有分片了吧。。。
回复

使用道具 举报

qcbb001 发表于 2016-12-17 16:59:10
dsy198816 发表于 2016-12-17 11:55
但是我的定义的Inputformat里的  isSplitable 返回的是false啊。。。应该是没有分片了吧。。。

Recordreader
以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类;
如果解释的话,需要数据,并且需要明白执行顺序。
建议还是调试下,看看到底是怎么走的
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条