分享

hadoop 多数据源连接之DataJoin

InSight 发表于 2015-5-4 16:38:02 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 18404
问题导读:
1.Hadoop可以采用哪几种数据连接方式?
2.Redurce处理过程是怎样的?
3.实例操作?








一个MapReduce任务很可能访问和处理两个甚至多个数据集,在关系型数据库中,这将是两个或者多个表的连接,但是Hadoop系统没有关系型数据库中那样强大的连接处理功能,因此处理复杂一些。一般来讲,hadoop可以采用这几种数据连接方式:

        1 采用DataJoin类库实现Reduce端连接的方法

        2 用全局文件复制实现Map端连接方法

        3 带Map端过滤的Reduce端连接方法

   Hadoop的Mapreduce框架提供了一种较为通用  的多数据源连接方法,该方法用DataJoin类库为程序员提供了完成数据连接所需的编程框架和接口,其处理方法如下:

         为了完成不用数据源的连接操作,我们必须给每个数据源制定一个标签(tag),用来区分数据,就像关系型数据库中表名一样,这里我们需要实现 Text generateInputTag(String inputFile)方法;

         另外,为了进行连接操作,我们必须知道连接的主键是什么,类似于关系型数据库中的key,因此我们需要指定groupKey,这里我们需要实现 Text generateGroupKey(TaggedMapOutput aRecord)

         然后在Map端我们需要把原始数据包装成为一个带标签的数据记录,方便shuffle和Reduce端执行笛卡尔积,所以我们需要实现 TaggedMapOutput generateTaggedMapOutput(Object value);



总结一下

Map处理过程:

   Datajoin类库首先提哦功能管理一个抽象基类DataJoinMapperBase,该基类实现了map()方法,帮助程序员对每个数据源下的记录生成一个代标签的数据记录对象。Map端处理过程中,需要指定标签tag和Groupkey,然后包装成为带标签的数据记录对象,在shuffle过程中,这些GroupKey相同的记录被分到同一个Reduce节点上。

Reduce处理过程:

      Reduce节点收到这些带标签的数据记录后,Reduce过程将这些带不同的数据源标签的记录执行笛卡尔积,自动生成所有不同的叉积组合,由程序员实现一个combine()方法,根据应用程序需求将这些具有相同的Groupkey的数据记录进行适当的合并处理,以此完成类似于关系型数据库中不同实体数据记录之间的连接。

     在Reduce阶段我们需要继承DataJoinReduceBase,该基类实现了reduce()方法,我们只是需要实现combine()方法即可,另外我们还是需要继承TaggedMapOutput类,它描述了一个标签化的数据记录,实现了getTag(),setTag()方法,作为Mapper的key_value输出value类型,由于需要I/O,我们需要继承并且实现Writable接口,并且实现getData()方法用以读取记录数据

   下面是数据源:

        user.txt文件:

1,张三,135xxxxxxxx
2,李四,136xxxxxxxx
3,王五,137xxxxxxxx
4,赵六,138xxxxxxxx

order.txt文件:

3,A,13,2013-02-12
1,B,23,2013-02-14
2,C,16,2013-02-17
3,D,25,2013-03-12
这其中需要注意很多小细节,因为没有要求程序员实现Map和reduce方法,所以我们会很容易忽略很多东西,需要注意的东西我在下面一一注释了:

我们必须使用Jobconf 来声明一个job,同时使用JobClient来run job,另外我们在继承TaggedMapOutput的时候默认的无参构造方法中需要初始化data

  1. package joinTest;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
  7. import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
  8. import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.io.Writable;
  12. import org.apache.hadoop.mapred.FileInputFormat;
  13. import org.apache.hadoop.mapred.FileOutputFormat;
  14. import org.apache.hadoop.mapred.JobClient;
  15. import org.apache.hadoop.mapred.JobConf;
  16. import org.apache.hadoop.mapred.TextInputFormat;
  17. import org.apache.hadoop.mapred.TextOutputFormat;
  18. public class DataJoin{
  19.         public static class MyTaggedMapOutput extends TaggedMapOutput{
  20.                 private Writable data;
  21.             
  22.                 public MyTaggedMapOutput(){
  23.                         //一定要new一下,不然反序列化时会报空指针异常
  24.                         this.data=new Text();
  25.                 }
  26.                
  27.                 public MyTaggedMapOutput(Writable data){
  28.                         this.tag=new Text("");
  29.                         this.data=data;
  30.                 }
  31.                 public void readFields(DataInput in) throws IOException {
  32.                         this.tag.readFields(in);
  33.                         this.data.readFields(in);
  34.                 }
  35.                 public void write(DataOutput output) throws IOException {
  36.                         this.tag.write(output);
  37.                         //this.tag.write(output);     //大问题,粗心写成了this.tag.write(output); 结果一直报错
  38.                         this.data.write(output);
  39.                 }
  40.                 @Override
  41.                 public Writable getData() {
  42.                         return data;
  43.                 }
  44.         }
  45.        
  46.        
  47.         public static class DataJoinMapper extends DataJoinMapperBase{
  48.                
  49.                 protected Text generateInputTag(String inputFile){
  50.                                 //String datasource=inputFile.substring(inputFile.lastIndexOf("/")+1).split("\\.")[0];
  51.                             String datasource = inputFile.split("-")[0];
  52.                                 System.out.println("datasource:"+datasource);
  53.                                 return new Text(datasource);
  54.                 }
  55.                
  56.                 protected TaggedMapOutput generateTaggedMapOutput(Object value){
  57.                         TaggedMapOutput tm=new MyTaggedMapOutput((Text)value);
  58.                         tm.setTag(this.inputTag);
  59.                         return tm;
  60.                 }
  61.                 @Override
  62.                 protected Text generateGroupKey(TaggedMapOutput aRecord) {
  63.                         String line=aRecord.getData().toString();
  64.                         //String groupkey=line.split("\\s")[0];
  65.                         String groupkey=line.split(",")[0];
  66.                         return new Text(groupkey);
  67.                 }
  68.         }
  69.        
  70.         public static class DataJoinReducer extends DataJoinReducerBase{
  71.                 @Override
  72.                 protected TaggedMapOutput combine(Object[] tags, Object[] values) {
  73.                         if(tags.length<2){
  74.                                 return null;
  75.                         }
  76.                         String output = "";
  77.                    /* for(int i=0;i<tags.length;i++){
  78.                             TaggedMapOutput tat=(MyTaggedMapOutput)values[i];System.out.println("tags:"+tags[i]+"    values:"+tat.getData().toString());
  79.                             if(i==0){
  80.                                  output=tat.getData().toString();//System.out.println("i==0  output:"+output);
  81.                             }else{
  82.                                     output+="\t";
  83.                                     String [] s=tat.getData().toString().split("\\s",2);
  84.                                     System.out.println("s.length:"+s.length);
  85.                                     output+=s[0];
  86.                             }*/
  87.                         for(int j=0;j<tags.length;j++){
  88.                                 //TaggedMapOutput taOutput=(TaggedMapOutput)tags[j];
  89.                                 TaggedMapOutput taggedMapOutput=(TaggedMapOutput)values[j];
  90.                                 System.out.println("tag:"+taggedMapOutput.getTag()+"  value:"+taggedMapOutput.getData().toString());
  91.                         }
  92.                              for(int i=0;i<values.length;i++){
  93.                                      TaggedMapOutput tat=(MyTaggedMapOutput)values[i];
  94.                                      String  recordLine=((Text)tat.getData()).toString();
  95.                                      String [] tokens=recordLine.split(",",2);System.out.println("data:"+recordLine);
  96.                                      if(i>0)
  97.                                              output+=",";
  98.                                     output+=tokens[1];
  99.                     }
  100.                     TaggedMapOutput tag=new MyTaggedMapOutput(new Text(output));
  101.                     tag.setTag((Text)tags[0]);
  102.                     return tag;
  103.                 }
  104.         }
  105.        
  106.         /*
  107.          * 这里一定要注意,FileInputFormat和FileOutputFormat一定要是org.apache.hadoop.mapred下面的包
  108.          */
  109.         public static int run(String args[]) throws IOException{
  110.                        Configuration conf=new Configuration();
  111.                                 //Configuration conf=getConf();
  112.                                 JobConf job=new JobConf(conf, DataJoin.class);
  113.                                 //Job job=new Job(conf,"DataJoin");
  114.                                 job.setJobName("DataJoin");
  115.                         //        job.setJarByClass(DataJoin.class);
  116.                                
  117.                                 job.setMapperClass(DataJoinMapper.class);
  118.                                 job.setReducerClass(DataJoinReducer.class);
  119.                                 job.setInputFormat(TextInputFormat.class);
  120.                                 job.setOutputFormat(TextOutputFormat.class);
  121.                                 job.setMapOutputKeyClass(Text.class);
  122.                                 job.setMapOutputValueClass(MyTaggedMapOutput.class);
  123.                                 job.setOutputKeyClass(Text.class);
  124.                                 job.setOutputValueClass(Text.class);
  125.                                
  126.                                 //job.set("mapred.textoutputformat.separator", "\t");
  127.                                 job.set("mapred.textoutputformat.separator", ",");
  128.                                 //FileInputFormat.addInputPath(job, new Path("/home/hadoop/test/mapReduce/DataJoinTest2"));
  129.                                 //FileInputFormat.addInputPath(job, new Path("hdfs://192.168.0.1:9000/user/hadoop/DataJoinTest2"));
  130.                                 //FileInputFormat.addInputPaths(job,  "/home/hadoop/test/DataJoinTest/province.txt");
  131.                                 //MultipleInputs.addInputPath(job, new Path("/home/hadoop/test/DataJoinTest/province.txt"), TextInputFormat.class);
  132.                                 FileInputFormat.addInputPaths(job, args[0]);
  133.                                 //FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.1:9000/user/hadoop/DataJoinTest2_Out"));
  134.                                 //FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/test/mapReduce/DataJoinTest2_result"));
  135.                                 FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  136.                                 JobClient.runJob(job);
  137.                                 return 0;
  138.         }
  139.         public static void main(String [] args) throws Exception{
  140.                 String[] arg = { "/home/hadoop/test/mapReduce/DataJoinTest2",
  141.                 "/home/hadoop/test/mapReduce/DataJoinTest2_result" };
  142.                 int res=run(arg);
  143.                 System.exit(res);
  144.                
  145.         }
  146.        
  147. }
复制代码


转。

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条