分享

Hadoop自定义数据类型代码实现

把后面的URLString 封装成 URL类型。代码如下

[mw_shl_code=java,true]import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Writable;
/**
* @author liuyazhuang
*/
public class URLWritable implements Writable {

        protected URL url;
       
        public URLWritable() {
               
        }
       
        public URLWritable(URL url) {
                this.url = url;
        }

        @Override
        public void write(DataOutput out) throws IOException {
                out.writeUTF(url.toString());
        }

        @Override
        public void readFields(DataInput in) throws IOException {
                this.url = new URL(in.readUTF());
        }
       
        public void set(String string) {
                try {
                        this.url = new URL(string);
                } catch (MalformedURLException e) {
                        throw new RuntimeException("Should not have happened " + e.toString());
                }
        }
}
[/mw_shl_code]



[mw_shl_code=java,true]import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
* @author liuyazhuang
*/
public class TimeUrlLineRecordReader extends RecordReader<Text, URLWritable> {
        public static final String Time_URL_SEPERATOR =
            "mapreduce.input.keyvaluelinerecordreader.key.value.separator";
       
        private final LineRecordReader lineRecordReader;
       
        private byte separator = (byte) '\t';
       
        private Text innerValue;
          
        private Text key;
          
        private URLWritable value;
       
        public static int findSeparator(byte[] utf, int start, int length, byte sep) {
                for (int i = start; i < (start + length); i++) {
                        if (utf == sep) {
                                return i;
                        }
                }
                return -1;
        }
       
        public static void setKeyValue(Text key, URLWritable value, byte[] line,
                        int lineLen, int pos) {
                if (pos == -1) {
                        key.set(line, 0, lineLen);
                        value.set(StringUtils.EMPTY);
                } else {
                        key.set(line, 0, pos);
                        String url = null;
                        System.arraycopy(line, pos + 1,url , 0, lineLen - pos - 1);
                        value.set(url);
                }
        }

        public TimeUrlLineRecordReader(Configuration conf) throws IOException {
                lineRecordReader = new LineRecordReader();
                String sepStr = conf.get(Time_URL_SEPERATOR, "\t");
            this.separator = (byte) sepStr.charAt(0);
        }
       
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                        throws IOException, InterruptedException {
                 lineRecordReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
                byte[] line = null;
                int lineLen = -1;
                if (lineRecordReader.nextKeyValue()) {
                        innerValue = lineRecordReader.getCurrentValue();
                        line = innerValue.getBytes();
                        lineLen = innerValue.getLength();
                } else {
                        return false;
                }
                if (line == null) {
                        return false;
                }
                if (key == null) {
                        key = new Text();
                }
                if (value == null) {
                        value = new URLWritable();
                }
                int pos = findSeparator(line, 0, lineLen, this.separator);
                setKeyValue(key, value, line, lineLen, pos);
            return true;
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
                return key;
        }

        @Override
        public URLWritable getCurrentValue() throws IOException,
                        InterruptedException {
                return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
                return lineRecordReader.getProgress();
        }

        @Override
        public void close() throws IOException {
                lineRecordReader.close();
        }
}
[/mw_shl_code]


[mw_shl_code=java,true]import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* @author liuyazhuang
*/
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable>{

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
                final CompressionCodec codec = new CompressionCodecFactory(
                                context.getConfiguration()).getCodec(file);
                return codec == null;
        }

        @Override
        public RecordReader<Text, URLWritable> createRecordReader(InputSplit split,
                        TaskAttemptContext context) throws IOException, InterruptedException {
                context.setStatus(split.toString());
                return new TimeUrlLineRecordReader(context.getConfiguration());
        }
}
[/mw_shl_code]

[mw_shl_code=java,true]import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author liuyazhuang
*/
public class CustomTimeUrl extends Configured implements Tool {

        public static class CustomTimeUrlMapper extends Mapper<Text, URLWritable, Text, URLWritable> {

                @Override
                protected void map(Text key, URLWritable value, Context context)
                                throws IOException, InterruptedException {
                        context.write(key, value);
                }
               
        }
       
        public static class CustomTimeUrlReducer extends Reducer<Text, URLWritable, Text, URLWritable> {

                @Override
                protected void reduce(Text key, Iterable<URLWritable> values,Context context)throws IOException, InterruptedException {
                        for (URLWritable value : values) {
                                context.write(key, value);
                        }
                }
               
        }
       
            @Override
             public int run(String[] args) throws Exception {
                Job job = new Job(getConf());
                job.setJarByClass(getClass());
                job.setJobName("CustomTimeUrl");
               
                job.setInputFormatClass(TimeUrlTextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
               
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(URLWritable.class);
               
                job.setMapperClass(CustomTimeUrlMapper.class);
                job.setReducerClass(CustomTimeUrlReducer.class);
               
                FileInputFormat.setInputPaths(job, new Path("/timeurl/input/"));
                FileOutputFormat.setOutputPath(job, new Path("/timeurl/output"));
               
                boolean success = job.waitForCompletion(true);
                return success ? 0 : 1;
        }
       
        public static void main(String[] args) throws Exception {
                int result = ToolRunner.run(new TimeUrl(), args);
                System.exit(result);
        }
}
[/mw_shl_code]




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条