分享

hadoop mapreduce如何实现多目录输出

Alkaloid0515 发表于 2015-8-11 19:56:17 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 20351


背景:
运维用scribe从apache服务器推送过来的日志有重复记录,所以这边的ETL处理要去重,还有个需求是要按业务类型多目录输出,方便挂分区,后面的使用。
这两个需求单独处理都没有问题,但要在一个mapreduce里完成,需要一点技巧。


1、map输入数据,经过一系列处理,输出时:

[mw_shl_code=java,true]if(ttype.equals("other")){  
        file = (result.toString().hashCode() & 0x7FFFFFFF)%400;  
       }else if(ttype.equals("client")){  
        file = (result.toString().hashCode() & 0x7FFFFFFF)%260;  
       }else{  
        file = (result.toString().hashCode()& 0x7FFFFFFF)%60;  
       }  
       tp = new TextPair(ttype+"_"+file, result.toString());  
         
       context.write(tp, valuet);  [/mw_shl_code]


valuet是空的,什么都没有。
我这里有三个类型,other,client,wap,分别代表日志来源平台,要按他们分目录输出。
result就是整条记录。file得到的是最终输出文件名,hash,位操作,取模是为了输出均衡。
map的输出结构<key,value> =(ttype+"_"+file,result.toString())
这样做的目的是:保证相同的记录得到相同的key,同时还要保存类型。partition要按textPair的left,也就是这个key,
保证了后面要写到同一个输出文件的所有记录都到同一个reduce里去,一个reduce可以写多个输出文件,但是一个输出文件不能来自多个reduce,原因很明了。
这样的话大概400+260+60=720个输出文件,每个文件数据量大概差不多,job的reduce数我这里设置的240,这个数连同取模400,260,60都是根据我的数据量来定的,来尽量避免reduce的数据倾斜。


2、reduce方法去重:


[mw_shl_code=java,true]public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException  
   {  
         
       rcfileCols = getRcfileCols(key.getSecond().toString().split("\001"));  
       context.write(key.getFirst(), rcfileCols);  
  
  
   }  [/mw_shl_code]


不用迭代,对相同的key组,只输出一次。注意这里job用到的比较器,一定不能是FirstComparator,而是整个textpair对的比较。(先比较left,再比较right)

  我的程序里输出文件格式是rcfile。

3、多目录输出:

[mw_shl_code=java,true] job.setOutputFormatClass(WapApacheMutiOutputFormat.class);  
     
public class WapApacheMutiOutputFormat extends RCFileMultipleOutputFormat<Text, BytesRefArrayWritable> {  
    Random r = new Random();  
    protected String generateFileNameForKeyValue(Text key, BytesRefArrayWritable value,  
            Configuration conf) {  
         
            String typedir = key.toString().split("_")[0];  
  
  
            return typedir+"/"+key.toString();  
  
  
    }  
}  [/mw_shl_code]


这里的RCFileMultipleOutputFormat是自己继承自FileOutputFormat 自己写的,主要实现了recordWriter。


最终输出去重的,分目录的数据文件。


理解的关键主要是partition key的设计,reduce的原理。



已有(2)人评论

跳转到指定楼层
NEOGX 发表于 2015-8-11 20:02:09
mapreduce如何实现多目录输出?不是多文件.
http://www.aboutyun.com/thread-1611-1-1.html
(出处: about云开发)


回复

使用道具 举报

NEOGX 发表于 2015-8-11 20:11:32
hadoop1.2.1中使用MultipleOutputs将结果输出到多个文件或文件夹
使用步骤主要有三步:
1、在reduce或map类中创建MultipleOutputs对象,将结果输出


[mw_shl_code=java,true]class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{

        //将结果输出到多个文件或多个文件夹
        private MultipleOutputs<Text,IntWritable> mos;
    //创建对象
    protected void setup(Context context) throws IOException,InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
     }
           
        //关闭对象
        protected void cleanup(Context context) throws IOException,InterruptedException {
        mos.close();
        }
}[/mw_shl_code]

2、在map或reduce方法中使用MultipleOutputs对象输出数据,代替congtext.write()

[mw_shl_code=java,true]protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                        throws IOException, InterruptedException {
                IntWritable V = new IntWritable();
                int sum = 0;
                for(IntWritable value : values){
                        sum = sum + value.get();
                }
                System.out.println("word:" + key.toString() + "     sum = " + sum);
                V.set(sum);

                //使用MultipleOutputs对象输出数据
                if(key.toString().equals("hello")){
                        mos.write("hello", key, V);
                }else if(key.toString().equals("world")){
                        mos.write("world", key, V);
                }else if(key.toString().equals("hadoop")){
                        //输出到hadoop/hadoopfile-r-00000文件
                        mos.write("hadoopfile", key, V, "hadoop/");
                }
               
        }[/mw_shl_code]


3、在创建job时,定义附加的输出文件,这里的文件名称与第二步设置的文件名相同

[mw_shl_code=java,true]//定义附加的输出文件
                        MultipleOutputs.addNamedOutput(job,"hello",TextOutputFormat.class,Text.class,IntWritable.class);
                        MultipleOutputs.addNamedOutput(job,"world",TextOutputFormat.class,Text.class,IntWritable.class);
                        MultipleOutputs.addNamedOutput(job,"hadoopfile",TextOutputFormat.class,Text.class,IntWritable.class);[/mw_shl_code]


完整代码:

[mw_shl_code=java,true]package com.ru.hadoop.wordcount;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.lib.MultipleOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

public class WordCount2 extends Configured{

        public static void main(String[] args) {
                String in = "/home/nange/work/test/word/";
                String out = "hdfs://localhost:9000/hdfs/test/wordcount/out/";
               
                Job job;
                try {
                        //删除hdfs目录
                        WordCount2 wc2 = new WordCount2();
                        wc2.removeDir(out);
                       
                        job = new Job(new Configuration(), "wordcount Job");
                        job.setOutputKeyClass(Text.class);
                        job.setOutputValueClass(IntWritable.class);
                        job.setMapperClass(mapperString.class);
//                        job.setCombinerClass(reduceStatistics.class);
                        job.setReducerClass(reduceStatistics.class);
                       
                        //定义附加的输出文件
                        MultipleOutputs.addNamedOutput(job,"hello",TextOutputFormat.class,Text.class,IntWritable.class);
                        MultipleOutputs.addNamedOutput(job,"world",TextOutputFormat.class,Text.class,IntWritable.class);
                        MultipleOutputs.addNamedOutput(job,"hadoopfile",TextOutputFormat.class,Text.class,IntWritable.class);
                       
                        FileInputFormat.addInputPath(job, new Path(in));
                        FileOutputFormat.setOutputPath(job, new Path(out));
                        job.waitForCompletion(true);
                } catch (IOException e) {
                        e.printStackTrace();
                } catch (URISyntaxException e) {
                        e.printStackTrace();
                } catch (ClassNotFoundException e) {
                        e.printStackTrace();
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
        }
       
        public void removeDir(String filePath) throws IOException, URISyntaxException{
                String url = "hdfs://localhost:9000";
                FileSystem fs  = FileSystem.get(new URI(url), new Configuration());
                fs.delete(new Path(filePath));
        }
}


/**
* 重写maptask使用的map方法
* @author nange
*
*/
class mapperString extends Mapper<LongWritable, Text, Text, IntWritable>{
        //设置正则表达式的编译表达形式
        public static Pattern PATTERN = Pattern.compile(" ");
        Text K = new Text();
        IntWritable V = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
               
                String[] words = PATTERN.split(value.toString());
                System.out.println("********" + value.toString());
                for(String word : words){
                        K.set(word);
                        context.write(K, V);
                }
        }
}

/**
* 对单词做统计
* @author nange
*
*/
class reduceStatistics extends Reducer<Text, IntWritable, Text, IntWritable>{

        //将结果输出到多个文件或多个文件夹
        private MultipleOutputs<Text,IntWritable> mos;
        //创建MultipleOutputs对象
    protected void setup(Context context) throws IOException,InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
     }
   
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                        throws IOException, InterruptedException {
                IntWritable V = new IntWritable();
                int sum = 0;
                for(IntWritable value : values){
                        sum = sum + value.get();
                }
                System.out.println("word:" + key.toString() + "     sum = " + sum);
                V.set(sum);

                //使用MultipleOutputs对象输出数据
                if(key.toString().equals("hello")){
                        mos.write("hello", key, V);
                }else if(key.toString().equals("world")){
                        mos.write("world", key, V);
                }else if(key.toString().equals("hadoop")){
                        //输出到hadoop/hadoopfile-r-00000文件
                        mos.write("hadoopfile", key, V, "hadoop/");
                }
               
        }
       
        //关闭MultipleOutputs对象
        protected void cleanup(Context context) throws IOException,InterruptedException {
        mos.close();
        }
}[/mw_shl_code]
















回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条