分享

关于MapReduce仿真

niuliwan 发表于 2013-10-16 13:40:05 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 12 9160
我手上有个用C++写的MapReduce的仿真程序,就是仿真MapReduce工作过程的,哪位大神知道那个程序该怎么运行。(我的hadoop集群环境是一台安装ubuntu系统的物理机并在上面装有两台ubuntu虚拟机,其中物理机作为namenode和jobtracker,两台虚拟机作为datanode和tasktracker)。
谢谢各位大神了,求交流!!
              
               
               

已有(12)人评论

跳转到指定楼层
u010134582 发表于 2013-10-16 13:40:39

            你写的程序你自己都不知道怎么样运行,别人怎么知道?
你又没开源
就像你问别人你口袋里有几个硬币,你让别人猜啊?
        
回复

使用道具 举报

niuliwan 发表于 2013-10-16 13:41:29

            #include "mapreduce.hpp"
#include
#include
#include
#include [i]
#include
#include
namespace mapreduce
{
  int
  StringToInt(std::string str)  //将字符串转化成整形
  {
    std::istringstream sin(str);
    int ret;
    sin>>ret;
    return ret;
  }
  std::string
  IntToString(int i)        //将整形转化成字符串
  {
    std::ostringstream sout;
    sout ret;
    std::string tmp;
    for (size_t i = 0; i  &files)   //扫描文件
  {
    DIR *dh = opendir(dirname.c_str());
    if (!dh)
      return;
    struct dirent *pde = NULL;
    while ((pde = readdir(dh)) != NULL)
      {
        std::string fname(pde->d_name);
        if (fname == "." || fname == "..")
          continue;
        files.push_back(fname);
      }
    closedir(dh);
  }
  void
  MapReduceInput::set_filebase(std::string dirname)  //将文件名输入,设置好文件基地址,相当于一个接口
  {
    if (dirname.empty())
      dirname = "/";
    if (dirname[dirname.size() - 1] != '/')
      dirname += "/";
    DBUG_PRINT("setting filebase to: "filebase = dirname;
  }
  std::string
  MapReduceInput::get_filebase()                //获取文件基地址值
  {
    if (this->filebase.empty())
      {
        return "/";
      }
    return this->filebase;
  }
  void
  MapReduceOutput::set_filebase(std::string dirname)
  {
    if (dirname.empty())
      dirname = "/";
    if (dirname[dirname.size() - 1] != '/')
      dirname += "/";
    this->filebase = dirname;
  }
  std::string
  MapReduceOutput::get_filebase()
  {
    if (this->filebase.empty())
      {
        return "/";
      }
    return this->filebase;
  }
  MapReduceInput *
  MapReduceSpecification::add_input()
  {
    MapReduceInput *pin = new MapReduceInput;
    this->ilist.push_back(pin);
    return pin;
  }
  MapReduceOutput *
  MapReduceSpecification::output()
  {
    return &(this->mr_out);
  }
  bool
  Mapper::Emit(std::string const &key, std::string const &value)
  {
    std::string data = key + "\t" + value + "\n";
    size_t ewt = fwrite(data.c_str(), data.size(), 1, this->pmi->_pif);
    if (ewt != 1)
      return false;
    return true;
  }
  void
  ReduceInput::read_intermediate()
  {
    std::string line;
    while (mapreduce::get_line(line, this->_pif))
      {
        std::vector data =
          explode('\t', line);
        line = "";
        this->_intermediate.push_back(make_pair(data[0], data[1]));
      }
  }
  void
  ReduceInput::sort_intermediate()
  {
    std::sort(this->_intermediate.begin(),
              this->_intermediate.end(),
              intermediate_sorter());
  }
  ReduceInput::ReduceInput(FILE *_pfile, FILE *_pintermediate)
    : _pf(_pfile), _pif(_pintermediate), all_done(false)
  {
    this->read_intermediate();
    this->sort_intermediate();
    this->l = this->f = this->_intermediate.begin();
    if (!this->_intermediate.empty())
      {
        this->_key = this->_intermediate[0].first;
      }
    else
      {
        this->all_done = true;
      }
    while (this->l != this->_intermediate.end() && this->l->first == this->_key)
      {
        ++this->l;
      }
  }
  bool
  ReduceInput::set_next_range()
  {
    if (this->l == this->_intermediate.end())
      {
        this->all_done = true;
        return false;
      }
    this->f = this->l;
    this->_key = this->f->first;
    while (this->l != this->_intermediate.end() && this->l->first == this->_key)
      {
        ++this->l;
      }
    return true;
  }
  bool
  Reducer::Emit(std::string const &str)
  {
    std::string data = this->pri->_key + "\t" + str + "\n";
    size_t ewt = fwrite(data.c_str(), data.size(), 1, this->pri->_pf);
    if (ewt != 1)
      return false;
    return true;
  }
  bool
  MapInput::get_line()
  {
    bool gl_ret = mapreduce::get_line(this->_line, this->_pf);
    if (gl_ret)
      ++this->_lno;
    return gl_ret;
  }
  void *
  worker_proc(void *vptr)
  {
    worker_data *pdata = (worker_data*)(vptr);
    /* Open the intermediate file in write mode. */
    std::string ifile_name = "/tmp/mapreduce_intermediate." +
      IntToString(pdata->thr_no) + ".dat";
    FILE *pif = fopen(ifile_name.c_str(), "w");
    if (!pif)
      {
        std::cerrpfiles->begin();
         i != pdata->pfiles->end(); ++i)
      {
        /* Open it and pass the opened file handle to the get_line()
         * function till there are lines to be read and also call the
         * Mapping function.
         */
        std::cerrc_str(), "r");
        if (!pf)
          {
            std::cerrpspec->mapper->pmi = &mi;
        while (mi.get_line())
          {
            DBUG_PRINT("Processing line: "pspec->mapper->Map(mi);
          }
        fclose(pf);
      }
    fclose(pif);
    return 0;
  }
  bool
  MapReduce(MapReduceSpecification &spec, MapReduceResult &res)
  {
    std::vector > files;
    files.resize(spec.num_thr);
    int thr_no = 0;
    /* For each directory containing the input file(s). */
    for (size_t i = 0; i  file_list;
        mapreduce::scandir(spec.ilist[i]->get_filebase(), file_list);
        /* For each file in this directory. */
        for (size_t j = 0; j get_filebase() + file_list[j];
            files[thr_no].push_back(fname);
            ++thr_no;
            thr_no %= spec.num_thr;
          }
      }
    std::vector wdata;
    wdata.resize(spec.num_thr);
    /* For each thread. */
    for (size_t i = 0; i Reduce(&ri);
        ri.set_next_range();
      }
    fclose(pf);
    fclose(pif);
    return true;
  }
}
        
回复

使用道具 举报

niuliwan 发表于 2013-10-16 13:41:59

            #include [i]
#include
#include
#include
#include
// #define DBUG_PRINT(MSG) std::cerr ilist;
    MapReduceOutput mr_out;
    Mapper *mapper;
    Reducer *reducer;
    int num_thr;
    std::vector threads;
  public:
    MapReduceSpecification()
      : mapper(NULL), reducer(NULL)
    {
      this->set_threads(1);
    }
    MapReduceInput *
    add_input();
    MapReduceOutput *
    output();
    void
    set_mapper(Mapper *pm)
    { this->mapper = pm; }
    void
    set_reducer(Reducer *pr)
    { this->reducer = pr; }
    void
    set_threads(int _nthr)
    {
      this->num_thr = _nthr;
      this->threads.resize(this->num_thr);
    }
  };
  class MapInput
  {
  public:
    std::string _line;
    std::string _file;
    int _lno;
    FILE *_pf;
    FILE *_pif;
    bool
    get_line();
    MapInput(std::string _filename, FILE *_pfile, FILE *_pintermediate)
      : _file(_filename), _lno(0), _pf(_pfile), _pif(_pintermediate)
    { }
  public:
    std::string const&
    value() const
    { return this->_line; }
    std::string const&
    file() const
    { return this->_file; }
  };
  class Mapper
  {
  public:
    MapInput *pmi;
    virtual void
    Map(const MapInput &input) = 0;
    bool
    Emit(std::string const &key, std::string const &value);
    virtual
    ~Mapper()
    { }
  };
  typedef std::pair string_pair_t;
  struct intermediate_sorter
  {
    bool
    operator()(string_pair_t const &lhs, string_pair_t const &rhs)
    {
      return lhs.first  _intermediate;
    std::vector::iterator f, l;
    bool all_done;
  public:
    ReduceInput(FILE *_pfile, FILE *_pintermediate);
    void
    read_intermediate();
    void
    sort_intermediate();
    bool
    set_next_range();
    std::string
    key() const
    { return this->_key; }
    std::string
    value()
    {
      return f->second;
    }
    void
    NextValue()
    {
      ++this->f;
    }
    bool
    done()
    {
      return this->f == this->l;
    }
  };
  class Reducer
  {
  public:
    ReduceInput *pri;
    virtual void
    Reduce(ReduceInput *input) = 0;
    bool
    Emit(std::string const &str);
    virtual
    ~Reducer()
    { }
  };
  class MapReduceResult
  {
  };
  struct worker_data
  {
    MapReduceSpecification *pspec;
    std::list *pfiles;
    MapReduceResult *pres;
    int thr_no;
    worker_data()
      : pspec(0), pfiles(0), pres(0), thr_no(0)
    { }
  };
  void *
  worker_proc(void *vptr);
  bool
  MapReduce(MapReduceSpecification &spec, MapReduceResult &res);
  int
  StringToInt(std::string str);
  std::string
  IntToString(int i);
}
        
回复

使用道具 举报

niuliwan 发表于 2013-10-16 13:42:56

            不好意思,我刚开始接触 hadoop,不太懂。希望得到您的帮助。
        
回复

使用道具 举报

niuliwan 发表于 2013-10-16 13:44:39

            是啊,是MapReduce的仿真程序,仿真MapReduce过程的。我也不太懂,刚刚接触。
        
回复

使用道具 举报

dickens88 发表于 2013-10-16 13:45:37

            目测楼主被坑了
        
回复

使用道具 举报

chenxs_03 发表于 2013-10-16 13:46:17

            这个程序应该是用的hadoop的streaming,直接用io,你的C++ 相当于下面的-mapper中的/bin/cat或、
-reducer 的/bin/wc
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc
参考http://hadoop.apache.org/docs/stable/streaming.html
        
回复

使用道具 举报

w454694219 发表于 2013-10-16 13:48:10

            引用 9 楼 tntzbzc 的回复:Quote: 引用 8 楼 chenxs_03 的回复:
这个程序应该是用的hadoop的streaming,直接用io,你的C++ 相当于下面的-mapper中的/bin/cat或、
-reducer 的/bin/wc
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc
参考http://hadoop.apache.org/docs/stable/streaming.html

LZ这个MR不是HADOOP标准版的,楼上你确定可以把LZ的C++ MR通过streaming跑HADOOP MR?
我比较赞同7楼的观点
有哪位网友能在HADOOP上跑通,麻烦分享一下,谢谢
引用 7 楼 dickens88 的回复:目测楼主被坑了

这个程序应该跟 hadoop的系统没有关系, 它应该是直接在Linux下跑的,只是仿真MapReduce处理数据的过程。
        
回复

使用道具 举报

tntzbzc 发表于 2013-10-16 13:49:04

            引用 10 楼 w454694219 的回复:Quote: 引用 9 楼 tntzbzc 的回复:
Quote: 引用 8 楼 chenxs_03 的回复:
这个程序应该是用的hadoop的streaming,直接用io,你的C++ 相当于下面的-mapper中的/bin/cat或、
-reducer 的/bin/wc
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /bin/wc
参考http://hadoop.apache.org/docs/stable/streaming.html

LZ这个MR不是HADOOP标准版的,楼上你确定可以把LZ的C++ MR通过streaming跑HADOOP MR?
我比较赞同7楼的观点
有哪位网友能在HADOOP上跑通,麻烦分享一下,谢谢
引用 7 楼 dickens88 的回复:目测楼主被坑了

这个程序应该跟 hadoop的系统没有关系, 它应该是直接在Linux下跑的,只是仿真MapReduce处理数据的过程。

是哦,而且只能在单机上跑,不能做分布式协同处理
        
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条