分享

如何让传统程序转换成mapreduce

pig2 发表于 2014-7-3 14:40:30 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 28450
问题导读:
1.传统程序有什么特点?
2.mapreduce如何实现的分布式?
3.是否所有的传统程序都可以转换为mapreduce?





传统程序我们求平均值、排序或许对于我们程序员来讲,这并不是难事。传统程序该如何转换mapreduce,这里以求平均数为例。

一、传统程序:
无论是C语言、Java、.net还是其它语言,比如求平均值。
我们都会是输入一组数据:
  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
复制代码



然后该如何求平均值:
  1. (1+2+3+4+5)/5=3
复制代码
这样的在任何语言中,这都是小事一桩。


二、如何转换mapreduce程序

为什么mapreduce被称之为分布式编程,是因为它把输入数据进行了分割,然后每一个客户端处理一部分数据,最后在合并起来。求平均值,mapreduce首先分割输入数据
1
2
3
4
5
分割之后,发给map处理,map处理完毕送到reduce,这样就完成了mapredcue。而这个中间的分割的过程,则是传统程序所没有的。下面便是通过来mapreduce实现来运行平均值



首先我们进行map函数:
map函数就是对数据一个分割,但是在进行之前已经对数据进行了分割。


我们从下面结果来分析mapredue:

输出内容.png


上面结果map传递value中,可以得出,map函数被调用了5次,然后分别输出了strScore.
Reduce调用了一次。

附上下面程序:如果附加到个人项目中,首先需要
(1)创建包aboutyun.com
(2)然后有avg.txt文件
(3)修改成自己的hdfs路径

从上面我们看出,任何传统的程序都可以转换为mapreduce.
  1. package aboutyun.com;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Counter;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  15. import java.util.Iterator;
  16. import java.util.StringTokenizer;
  17. public class pingjunzhi {
  18.         static final String INPUT_PATH = "hdfs://master:8020/avg.txt";
  19.         static final String OUT_PATH = "hdfs://master:8020/outPut/test";
  20.         public static void main(String[] args) throws Exception {
  21.                 // 主类
  22.                 Configuration conf = new Configuration();
  23.                 final Job job = Job.getInstance(conf, mapreduce.class.getSimpleName());
  24.                 // final Job job = new Job(conf, mapreduce.class.getSimpleName());
  25.                 job.setNumReduceTasks(1);
  26.                 job.setJarByClass(mapreduce.class);
  27.                 // 寻找输入
  28.                 FileInputFormat.setInputPaths(job, INPUT_PATH);
  29.                 // 1.2对输入数据进行格式化处理的类
  30.                 job.setInputFormatClass(TextInputFormat.class);
  31.                 job.setMapperClass(MyMapper.class);
  32.                 // 1.2指定map输出类型<key,value>类型
  33.                 job.setMapOutputKeyClass(Text.class);
  34.                 job.setMapOutputValueClass(LongWritable.class);
  35.                 job.setReducerClass(MyReduce.class);
  36.                 job.setOutputKeyClass(Text.class);
  37.                 job.setOutputValueClass(LongWritable.class);
  38.                 // 指定输出路径
  39.                 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  40.                 // 指定输出的格式或则类
  41.                 job.setOutputFormatClass(TextOutputFormat.class);
  42.                 // 把作业提交
  43.                 job.waitForCompletion(true);
  44.         }
  45.         // map类
  46.         static class MyMapper extends
  47.                         Mapper<LongWritable, Text, Text, LongWritable> {
  48.                 protected void map(LongWritable key, Text value, Context context)
  49.                                 throws IOException, InterruptedException {
  50.                
  51.                         String line = value.toString();
  52.                         Counter countPrint = context.getCounter("Map输出传递Value", line);
  53.                         countPrint.increment(1l);
  54.                         // 将输入的数据首先按行进行分割
  55.                         StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
  56.                         // 分别对每一行进行处理
  57.                         while (tokenizerArticle.hasMoreElements()) {
  58.                                 // 每行按空格划分
  59.                                 StringTokenizer tokenizerLine = new StringTokenizer(
  60.                                                 tokenizerArticle.nextToken());
  61.                           
  62.                                 String strScore = tokenizerLine.nextToken();// 个数部分
  63.                                 Counter countPrint1 = context.getCounter("Map中循环strScore", strScore);
  64.                                 countPrint1.increment(1l);
  65.                                 // Text name = new Text(strName);
  66.                                 int scoreInt = Integer.parseInt(strScore);
  67.                                 // 输出
  68.                                 context.write(new Text("avg"), new LongWritable(scoreInt));
  69.                         }
  70.                 }
  71.         }
  72.         // reduce类
  73.         static class MyReduce extends
  74.                         Reducer<Text, LongWritable, Text, LongWritable> {
  75.                 @Override
  76.                 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
  77.                                 Context ctx) throws java.io.IOException, InterruptedException {
  78.                
  79.                         long sum = 0;
  80.                         long count = 0;
  81.                         Iterator<LongWritable> iterator = v2s.iterator();
  82.                         while (iterator.hasNext()) {
  83.                                 sum += iterator.next().get();// 计算总值
  84.                                 count++;// 统计个数
  85.                         }
  86.                         long average = (long) sum / count;// 计算平均值
  87.                         ctx.write(k2, new LongWritable(average));
  88.                         Counter countPrint1 = ctx.getCounter("Redue调用次数","空");
  89.                         countPrint1.increment(1l);
  90.                 }
  91.         }
  92. }
复制代码







已有(7)人评论

跳转到指定楼层
kartik 发表于 2014-7-4 10:36:11
请问传统的下载文件程序如何转为mapreduce,有相关的例子吗?谢谢版主。
回复

使用道具 举报

pig2 发表于 2014-7-4 11:05:31
本帖最后由 pig2 于 2014-7-4 11:23 编辑
kartik 发表于 2014-7-4 10:36
请问传统的下载文件程序如何转为mapreduce,有相关的例子吗?谢谢版主。

你这是批量下载吗?

如果是的话,可以把批量的数据进行map分割,然后放到不同的分区,最后reduce合并。

这里只是举例:例如你需要下载下面url



我这里没有按照最优的方法,你可以在调整:
  1. // map类
  2. static class MyMapper extends
  3. Mapper<LongWritable, Text, Text, LongWritable> {
  4. protected void map(LongWritable key, Text value, Context context)
  5. throws IOException, InterruptedException {
  6. String line = value.toString();
  7. Counter countPrint = context.getCounter("Map输出传递url", line);
  8. countPrint.increment(1l);
  9. // 将输入的数据首先按行进行分割
  10. StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
  11.     // 分别对每一行进行处理
  12.    while (tokenizerArticle.hasMoreElements()) {
  13.                                // 每行按空格划分
  14.                                 String url = tokenizerLine.nextToken();// 个数部分
  15.                                 Counter countPrint1 = context.getCounter("Map中循环url", url);
  16.                                 countPrint1.increment(1l);
  17.                      
  18.                                 // 输出
  19.                                 context.write(new Text("avg"), new LongWritable(scoreInt));
  20.                         }
  21.    }
  22. }
复制代码







这样上面完成url的分割,达到我们的分布式,传统程序是没有分割这一步的

下面是reduce的例子,不过是伪代码,你可以按照这个思路来
  1. // reduce类
  2.         static class MyReduce extends
  3.                         Reducer<Text, LongWritable, Text, LongWritable> {
  4.                 @Override
  5.                 protected void reduce(Text k2, java.lang.Iterable<LongWritable> urls,
  6.                                 Context ctx) throws java.io.IOException, InterruptedException {
  7.                         
  8.                         long times = 0L;
  9.                         for (LongWritable url : urls) {
  10.                         
  11.                                 //这里写上下载的业务逻辑
  12.                                if(“成功”)
  13.                                  {
  14.                                 ctx.write(url+“成功”, new LongWritable(1L));
  15.                                  }
  16.                               else
  17.                                 {
  18.                                      ctx.write(url+“失败”, new LongWritable(1L));
  19.                                  }
  20.                         }
  21.                
  22.                         
  23.                 }
  24.                
  25.         }
复制代码













回复

使用道具 举报

pig2 发表于 2014-7-4 11:18:41
驱动函数都差不多,可以不用调整,这里面的关键是map进行了分割,没有对业务逻辑做实质性的事情,可以把业务逻辑放到reduce中。
回复

使用道具 举报

pig2 发表于 2014-7-4 11:18:42
驱动函数都差不多,可以不用调整,这里面的关键是map进行了分割,没有对业务逻辑做实质性的事情,可以把业务逻辑放到reduce中。
回复

使用道具 举报

kartik 发表于 2014-7-4 11:38:22
pig2 发表于 2014-7-4 11:18
驱动函数都差不多,可以不用调整,这里面的关键是map进行了分割,没有对业务逻辑做实质性的事情,可以把业 ...

好的,我先了解看看。谢谢你哈。
回复

使用道具 举报

kartik 发表于 2014-7-4 11:43:14
pig2 发表于 2014-7-4 11:18
驱动函数都差不多,可以不用调整,这里面的关键是map进行了分割,没有对业务逻辑做实质性的事情,可以把业 ...

我的是下载单个URL里面的数据,涉及多线程,断点续传,可否让一个map对应一个数据块
回复

使用道具 举报

pig2 发表于 2014-7-4 11:59:53
kartik 发表于 2014-7-4 11:43
我的是下载单个URL里面的数据,涉及多线程,断点续传,可否让一个map对应一个数据块
你只要了解,map是用来划分、分割的,如同我们干活,以前我们都是用一台机器干活,这就传统程序。现在我们可以使用多台机器了,那么就可以使用多台了。
因此你的下载划分是关键

这里面你可以划分数据块,也可以划分线程。这里只是提供思路,可行性你还需要进一步研究
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条