分享

Hadoop : 一个目录下的数据只由一个map处理

yuwenge 2015-5-14 01:49:08 发表于 小知识点 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 31439
有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。

刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。

或者是把目录写在文件里面,作为输入:

[mw_shl_code=bash,true]/path/to/directory1
/path/to/directory2
/path/to/directory3[/mw_shl_code]

代码里面按行读取:
[mw_shl_code=bash,true]@Override
       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
           FileSystem fs = FileSystem.get(context.getConfiguration());
           for (FileStatus status : fs.listStatus(new Path(value.toString()))) {
               // process file
           }
       }[/mw_shl_code]


都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

[mw_shl_code=bash,true]import java.io.IOException;
import java.util.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
* 一个map处理一个目录的数据
*/
public abstract class OneMapOneDirectoryInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class);

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // get all the files in input path
        List<FileStatus> stats = listStatus(job);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        if (stats.size() == 0) {
            return splits;
        }

        LOG.info("fileNums=" + stats.size());
        Map<String, List<FileStatus>> map = new HashMap<String, List<FileStatus>>();
        for (FileStatus stat : stats) {
            String directory = stat.getPath().getParent().toString();
            if (map.containsKey(directory)) {
                map.get(directory).add(stat);
            } else {
                List<FileStatus> fileList = new ArrayList<FileStatus>();
                fileList.add(stat);
                map.put(directory, fileList);
            }
        }

        // 设置inputSplit
        long currentLen = 0;
        List<Path> pathLst = new ArrayList<Path>();
        List<Long> offsetLst = new ArrayList<Long>();
        List<Long> lengthLst = new ArrayList<Long>();
        Iterator<String> itr = map.keySet().iterator();
        while (itr.hasNext()) {
            String dir = itr.next();
            List<FileStatus> fileList = map.get(dir);
            for (int i = 0; i < fileList.size(); i++) {
                FileStatus stat = fileList.get(i);
                pathLst.add(stat.getPath());
                offsetLst.add(0L);
                lengthLst.add(stat.getLen());
                currentLen += stat.getLen();
            }

            Path[] pathArray = new Path[pathLst.size()];
            CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                    getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
            LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                    + ") length(" + currentLen + ")");
            for (int i = 0; i < pathArray.length; i++) {
                LOG.info("  -> path[" + i + "]=" + pathArray.toString());
            }
            splits.add(thissplit);

            pathLst.clear();
            offsetLst.clear();
            lengthLst.clear();
            currentLen = 0;
        }

        return splits;
    }

    private long[] getLongArray(List<Long> lst) {
        long[] rst = new long[lst.size()];
        for (int i = 0; i < lst.size(); i++) {
            rst = lst.get(i);
        }
        return rst;
    }
}[/mw_shl_code]

这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。





已有(3)人评论

跳转到指定楼层
深沉 发表于 2015-5-14 08:01:16
很好感谢分享
回复

使用道具 举报

arBen 发表于 2015-5-14 08:22:13
感谢楼主分享..........
回复

使用道具 举报

bosaidong 发表于 2015-5-14 10:22:13
帅气!  很好很强大
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条