import java.io.FileWriter;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * RecordReader for length-prefixed binary log records. For each record the
 * key is the record's byte offset in the file and the value is the parsed
 * record (a custom "data" Writable).
 */
public class BinRecordReader extends RecordReader<LongWritable, data> {
    private FSDataInputStream inputStream = null;
    private long start = 0, end = 0, pos = 0;
    private Configuration conf = null;
    private FileSplit fileSplit = null;
    private LongWritable key = new LongWritable();
    private data value = new data();
    private boolean processed = false;

    public BinRecordReader() throws IOException {
    }
    @Override
    public void close() {
        try {
            if (inputStream != null)
                inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public data getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        conf = context.getConfiguration();
        // Byte range of this split within the file.
        this.start = fileSplit.getStart();
        this.end = this.start + fileSplit.getLength();
        try {
            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            this.inputStream = fs.open(path);
            inputStream.seek(this.start);
            this.pos = this.start;
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (this.pos < this.end) {
            key.set(this.pos);
            // The first 4 bytes of every record hold the total record length
            // (including the 4-byte length field itself), so the payload is
            // total_len - 4 bytes. Note that skip() is not guaranteed to skip
            // the full amount requested.
            value.total_len = inputStream.readInt();
            inputStream.skip(value.total_len - 4);
            // Debug output appended to a local file on the task node.
            FileWriter fw = null;
            try {
                fw = new FileWriter("/home/hadoop/aaa", true);
                fw.write("this.pos" + pos + "\n");
                fw.write("data:" + value.total_len + "\n");
                fw.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            this.pos = inputStream.getPos();
            return true;
        } else {
            processed = true;
            return false;
        }
    }
}
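
(The custom data value class is not shown above. For completeness, here is a minimal sketch of what it might look like, assuming a plain Hadoop Writable that only carries the total_len field the reader touches; the real class presumably holds more fields.)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical sketch: only total_len is visible in the reader above,
// so everything else about the real class is assumed.
public class data implements Writable {
    // Total record length in bytes, including the 4-byte length field itself.
    public int total_len;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(total_len);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        total_len = in.readInt();
    }
}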
The BinRecordReader above parses my custom log format: the first element of every record is a 4-byte length of the whole record. Functionally the job works fine, but the debug lines written to /home/hadoop/aaa are extremely strange and I can't make sense of them. Here is the output in the aaa file:
this.pos134217728
data:779186478
this.pos402653184
data:892482614
this.pos0
data:151
this.pos151
data:192
this.pos343
data:383
this.pos726
data:145
this.pos871
data:312
this.pos1183
data:275
this.pos1458
data:126
this.pos1584
data:172
this.pos1756
data:136
this.pos1892
data:227
this.pos2119
data:284
this.pos2403
data:172
this.pos2575
data:153
this.pos2728
data:469
this.pos3197
data:795
this.pos3992
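
For reference, a minimal standalone sketch of the length-prefixed framing described above (the class name LengthPrefixedDemo and the sample payloads are made up for illustration): each record starts with a 4-byte length that counts the length field itself, so after readInt() a reader consumes exactly total_len - 4 further bytes, and the offset of the next record is pos + total_len.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedDemo {
    public static void main(String[] args) throws IOException {
        // Write two records: each starts with a 4-byte length that counts
        // the length field itself plus the payload.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        for (String payload : new String[] { "hello", "world!" }) {
            byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
            out.writeInt(4 + bytes.length); // total record length
            out.write(bytes);
        }
        out.flush();

        // Read the records back the same way the RecordReader does: read the
        // length, then consume total_len - 4 bytes of payload.
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray()));
        long pos = 0;
        while (pos < buf.size()) {
            int totalLen = in.readInt();
            byte[] payload = new byte[totalLen - 4];
            in.readFully(payload);
            System.out.println("pos=" + pos + " total_len=" + totalLen
                    + " payload=" + new String(payload, StandardCharsets.UTF_8));
            pos += totalLen;
        }
    }
}

Note that the later part of the aaa output is consistent with this framing: from the "this.pos0 / data:151" line onward, each this.pos equals the previous this.pos plus the previous data value (0 + 151 = 151, 151 + 192 = 343, and so on).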