立即注册 登录
About云-梭伦科技 返回首页

xw2016的个人空间 https://www.aboutyun.com/?40798 [收藏] [复制] [分享] [RSS]

日志

mapreduce复习

热度 3已有 1952 次阅读2016-7-5 23:23 |个人分类:hadoop| mapreduce

今天复习了一下mapreduce工作原理,感觉还不是很清晰,百度了好几篇相关文章,小结一下。

MapReduce 角色
client:编写mapreduce程序,提交作业。
JobTracker: 初始化作业,分配作业,与TaskTracker通信,协调整个作业。
TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个;
Hdfs: 保存作业的数据、配置信息和运行结果。

执行流程
1. 提交作业
客户端把编写好的mapreduce程序,提交到JobTracker;

2. 初始化任务
JobTracker构建job,给job分配任务ID,并加入到队列中,然后进行调度,默认的调度方法为FIFO。
作业调度器会获取输入分片信息(input split),每个分片创建一个map任务。

3. 分配任务
tasktracker会运行一个简单的循环机制定期发送心跳给jobtracker,心跳间隔是5秒,这个时间可配置,心跳就是jobtracker和tasktracker沟通的桥梁,通过心跳,jobtracker可以监控tasktracker是否存活,也可以获取tasktracker处理的状态,同时tasktracker也可以通过心跳里的返回值获取jobtracker给它的任务。

4. 执行任务
TaskTracker向JobTracker申请任务(这个任务可以使Map也可能是Reduce任务),申请到任务后,TaskTracker会拷贝mapreduce代码到本地,然后运行任务。在任务运行过程中,tasktracker会本地监控自己的状态和进度,并通过心跳机制汇报给jobtracker,这样jobtracker就可以宏观监控整个job的状态和进度,当jobtracker获得了最后一个完成指定任务的tasktracker操作成功的通知时候,jobtracker会把整个job状态置为成功。

以下几张图表达得很好:





来个实例练习一下:

package com.k3.mapreduce;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**

 * 统计各单词数

 * @author Administrator

 *

 */

public class WordCountCB {

public static void main(String[] args) throws Exception {

if (args == null || args.length != 2) {

throw new Exception("输入参数不正确!");

}

Configuration conf = new Configuration();

Job job = new Job(conf, "WordCountCB");

// 运行主类

job.setJarByClass(WordCountCB.class);

// 指定Mapper

job.setMapperClass(WordCBMapper.class);

// 指定Reducer

job.setReducerClass(WordCBReduce.class);

//指定Combiner 注册

job.setCombinerClass(WordCountCombiner.class);

//或者:直接注册  当reduce和combiner一样的时候,可以直接注册为reduce

//job.setCombinerClass(WordReduce.class);

// 指定Mapper输出类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

// 指定Reducer输出类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

// 指定输入路径

// 指定输出路径

String input = args[0];

String output = args[1];// 必须不存在

FileInputFormat.addInputPath(job, new Path(input));

FileOutputFormat.setOutputPath(job, new Path(output));

// 提交

job.waitForCompletion(true);

}


static class WordCBMapper extends Mapper<LongWritable, Text, Text, LongWritable> {


@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String input = value.toString();

String[] data = input.split(" ");

for (String s : data) {

context.write(new Text(s), new LongWritable(1));

}

}

}


static class WordCBReduce extends Reducer<Text, LongWritable, Text, LongWritable> {


@Override

protected void reduce(Text key, Iterable<LongWritable> val,

Context context) throws IOException, InterruptedException {

// TODO Auto-generated method stub

int sum = 0;

for (LongWritable w : val) {

sum += w.get();

}

context.write(key, new LongWritable(sum));

}

}

/**

* 入参与reduce的入参一致

* @author Administrator

*

*/

static class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {


@Override

protected void reduce(Text key, Iterable<LongWritable> val,

Context context)

throws IOException, InterruptedException {

Long sum = new Long(0);

for (LongWritable v : val) {

sum += v.get();

}

context.write(key, new LongWritable(sum));

}

}

}

打包运行:
hadoop jar wordcountcb.jar hdfs://ns1/data/word.txt hdfs://ns1/out97

程序中使用了Combiner,Combiner是一个本地化的reduce操作,它执行在map运算的后,对计算结果中重复key值做一个合并, 以减少到reduce的数据传输,但要注意的是,不是所有计算都可以使用Combiner,比如计算平均数时,使用Combiner就可能会得到错误结果。


路过

雷人
3

握手

鲜花

鸡蛋

刚表态过的朋友 (3 人)

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条