分享

mapreduce如何实现多目录输出?不是多文件.

shabulajiHEHE 发表于 2013-10-16 13:39:27 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 23750
mapreduce的结果可以输出到不同的目录吗?如何实现呢 谢谢了先!
              
               
                    MapReduce
                    输出
                    多目录
               

已有(6)人评论

跳转到指定楼层
jixianqiuxue 发表于 2013-10-16 13:40:15

            貌似job只能设定一个output目录
回复

使用道具 举报

Alkaloid0515 发表于 2015-8-11 19:46:59
jixianqiuxue 发表于 2013-10-16 13:40
貌似job只能设定一个output目录

hadoop多路径总结
回复

使用道具 举报

Alkaloid0515 发表于 2015-8-11 19:49:30



总结下多文件:

mapreduce如何修改文件名(MultipleOutputs使用)
http://www.aboutyun.com/thread-9757-1-1.html
(出处: about云开发)

Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究
http://www.aboutyun.com/thread-6550-1-1.html
(出处: about云开发)

MultipleOutputs 多文件输出问题
http://www.aboutyun.com/thread-11528-1-1.html
(出处: about云开发)


回复

使用道具 举报

Alkaloid0515 发表于 2015-8-11 19:54:46



背景:
运维用scribe从apache服务器推送过来的日志有重复记录,所以这边的ETL处理要去重,还有个需求是要按业务类型多目录输出,方便挂分区,后面的使用。
这两个需求单独处理都没有问题,但要在一个mapreduce里完成,需要一点技巧。


1、map输入数据,经过一系列处理,输出时:

[mw_shl_code=java,true]if(ttype.equals("other")){  
        file = (result.toString().hashCode() & 0x7FFFFFFF)%400;  
       }else if(ttype.equals("client")){  
        file = (result.toString().hashCode() & 0x7FFFFFFF)%260;  
       }else{  
        file = (result.toString().hashCode()& 0x7FFFFFFF)%60;  
       }  
       tp = new TextPair(ttype+"_"+file, result.toString());  
         
       context.write(tp, valuet);  [/mw_shl_code]


valuet是空的,什么都没有。
我这里有三个类型,other,client,wap,分别代表日志来源平台,要按他们分目录输出。
result就是整条记录。file得到的是最终输出文件名,hash,位操作,取模是为了输出均衡。
map的输出结构<key,value> =(ttype+"_"+file,result.toString())
这样做的目的是:保证相同的记录得到相同的key,同时还要保存类型。partition要按textPair的left,也就是这个key,
保证了后面要写到同一个输出文件的所有记录都到同一个reduce里去,一个reduce可以写多个输出文件,但是一个输出文件不能来自多个reduce,原因很明了。
这样的话大概400+260+60=720个输出文件,每个文件数据量大概差不多,job的reduce数我这里设置的240,这个数连同取模400,260,60都是根据我的数据量来定的,来尽量避免reduce的数据倾斜。


2、reduce方法去重:


[mw_shl_code=java,true]public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException  
   {  
         
       rcfileCols = getRcfileCols(key.getSecond().toString().split("\001"));  
       context.write(key.getFirst(), rcfileCols);  
  
  
   }  [/mw_shl_code]


不用迭代,对相同的key组,只输出一次。注意这里job用到的比较器,一定不能是FirstComparator,而是整个textpair对的比较。(先比较left,再比较right)

  我的程序里输出文件格式是rcfile。

3、多目录输出:

[mw_shl_code=java,true] job.setOutputFormatClass(WapApacheMutiOutputFormat.class);  
     
public class WapApacheMutiOutputFormat extends RCFileMultipleOutputFormat<Text, BytesRefArrayWritable> {  
    Random r = new Random();  
    protected String generateFileNameForKeyValue(Text key, BytesRefArrayWritable value,  
            Configuration conf) {  
         
            String typedir = key.toString().split("_")[0];  
  
  
            return typedir+"/"+key.toString();  
  
  
    }  
}  [/mw_shl_code]


这里的RCFileMultipleOutputFormat是自己继承自FileOutputFormat 自己写的,主要实现了recordWriter。


最终输出去重的,分目录的数据文件。


理解的关键主要是partition key的设计,reduce的原理。













回复

使用道具 举报

Alkaloid0515 发表于 2015-8-11 19:57:29
通过reduce输出来实现将数据输出到多个目录是不可以的。可以将reduce的输出设置为不输出,可以在你的Reducer类的configure()方法中创建一个输出数据到hdfs的流,然后在reduce()方法中将要输出的数据输出到hdfs流中,最后在close()方法中关闭输出流


回复

使用道具 举报

NEOGX 发表于 2015-8-11 19:58:41
Alkaloid0515 发表于 2015-8-11 19:57
通过reduce输出来实现将数据输出到多个目录是不可以的。可以将reduce的输出设置为不输出,可以在你的Reduce ...

可以自定義一個 FileOutputFormat<Object, Object> 來處理對於每個reducer key-value list的輸出。 然後再job conf里定義 JobConf.setOutputFormat() 就可以了。 也可以用@Alkaloid0515的方法。 本質上是一樣
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条