分享

MapReduce单表连接——哪里出错了

数据:child        parentJone        Lucy
Tom        Lucy
Tom        Jack

Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
要求输出:
grandchild  grandparent表
这个例子很多人都看过吧,我就按照单表连接的原理自己写了一下,但是运行结果怎么都不对,代码如下:
Mapper实现:
[mw_shl_code=java,true]public class STJoinMapper extends Mapper<LongWritable, Text, Text, Text>{
        private LongWritable begin=new LongWritable(0);
        @Override
        protected void map(LongWritable key, Text value,Context context)
                        throws IOException, InterruptedException {
                if(key!=begin){
                        String line=value.toString();
                        StringTokenizer strtoken=new StringTokenizer(line, "\t");
                        String childName=new String();
                        String        parentName=new String();
                        if(strtoken.hasMoreTokens()){
                                childName=strtoken.nextToken();
                                parentName=strtoken.nextToken();
                                context.write(new Text(childName), new Text("1"+" "+childName+" "+parentName));
                                context.write(new Text(parentName),new Text("2"+" "+childName+" "+parentName));
                        }
                }
        }
       
}[/mw_shl_code]
Ruducer代码如下:
[mw_shl_code=java,true]public class STJoinReducer extends Reducer<Text, Text, Text, Text>{
       
        private boolean        flag=false;
        @Override
        protected void reduce(Text key, Iterable<Text> valueList,Context context) throws IOException,
                        InterruptedException {
               
                if(flag==false){
                        context.write(new Text("grandchild"), new Text("grandparent"));
                        flag=true;
                }
                ArrayList<String> grandchildList=new ArrayList<String>();
                ArrayList<String> grandparentList=new ArrayList<String>();
                for(Text name:valueList){
                        String namestr=name.toString();
                        String[] value=namestr.split(" ");
                       
                        if(value[0].equals("1")){
                                grandchildList.add(value[1]);
                                continue;
                        }
               
                        if(value[0].equals("2")){
                                grandparentList.add(value[2]);
                                continue;
                        }
                }
                if((!grandchildList.isEmpty())&&(!grandparentList.isEmpty())){
                        for(String child:grandchildList){
                                for(String parent:grandparentList){
                                        context.write(new Text(child), new Text(parent));
                                }
                        }
                }
        }
}[/mw_shl_code]
按照这个程序,跑出来的结果尽然是这样:
grandchild        grandparent
Jack        Jack
Jack        Jack
Jack        Jack
Jack        Jack
Lucy        Lucy
Lucy        Lucy
Lucy        Lucy
Lucy        Lucy
Terry        Terry
Terry        Terry

这是什么鬼?实在没找出哪有错。。。。。大神来解释



QQ截图20151102223617.png

已有(3)人评论

跳转到指定楼层
爱动的蜗牛 发表于 2015-11-2 22:41:36
输入数据是这样的:
child        parent
Jone        Lucy
Tom        Lucy
Tom        Jack
Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
要求输出:
回复

使用道具 举报

Alkaloid0515 发表于 2015-11-3 11:51:50
爱动的蜗牛 发表于 2015-11-2 22:41
输入数据是这样的:
child        parent
Jone        Lucy

楼主思维有点胡乱,没看出是join map,还是join reduce。
对于数据量小的可以先join map,也就是把数据作为两份输入,然后在map端join。
public static class MapJoinMapper extends Mapper<LongWritable, Text, NullWritable, Emp_Dep> {

                private Map<Integer, String> joinData = new HashMap<Integer, String>();//这个是join的关键

                @Override
                protected void setup(Mapper<LongWritable, Text, NullWritable, Emp_Dep>.Context context) throws IOException, InterruptedException {
                        // 预处理把要关联的文件加载到缓存中
                        Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                        // 我们这里只缓存了一个文件,所以取第一个即可,创建BufferReader去读取
                        BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));

                        String str = null;
                        try {
                                // 一行一行读取
                                while ((str = reader.readLine()) != null) {
                                        // 对缓存中的表进行分割
                                        String[] splits = str.split("\t");
                                        // 把字符数组中有用的数据存在一个Map中
                                        joinData.put(Integer.parseInt(splits[0]), splits[1]);
                                }
                        } catch (Exception e) {
                                e.printStackTrace();
                        } finally{
                                reader.close();
                        }

                }

                @Override
                protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Emp_Dep>.Context context) throws IOException,
                                InterruptedException {
                        // 获取从HDFS中加载的表
                        String[] values = value.toString().split("\t");
                        // 创建Emp_Dep对象
                        Emp_Dep emp_Dep = new Emp_Dep();
                        // 设置属性
                        emp_Dep.setName(values[0]);
                        emp_Dep.setSex(values[1]);
                        emp_Dep.setAge(Integer.parseInt(values[2]));
                        // 获取关联字段depNo,这个字段是关键
                        int depNo = Integer.parseInt(values[3]);
                        // 根据depNo从内存中的关联表中获取要关联的属性depName
                        String depName = joinData.get(depNo);
                        // 设置depNo
                        emp_Dep.setDepNo(depNo);
                        // 设置depName
                        emp_Dep.setDepName(depName);

                        // 写出去
                        context.write(NullWritable.get(), emp_Dep);
                }
        }



详细参考:
MapReduce表连接操作之Map端join
回复

使用道具 举报

爱动的蜗牛 发表于 2015-11-3 20:56:57
本帖最后由 爱动的蜗牛 于 2015-11-3 21:22 编辑
Alkaloid0515 发表于 2015-11-3 11:51
楼主思维有点胡乱,没看出是join map,还是join reduce。
对于数据量小的可以先join map,也就是把数据 ...

大神,你好,你的那个参考,我很仔细的看了,收益匪浅,谢谢。我这个是join reduce。。。。。因为是单表,所以首先将表拆分为分为左表和右表,在map输出时,选择合并的键作为key。
左表加“2”表示,取父母名字做为key:
  1. context.write(new Text(childName), new Text("1"+" "+childName+" "+parentName));
复制代码
右表加“1”表示,取孩子名字做为key:
  1. context.write(new Text(parentName),new Text("2"+" "+childName+" "+parentName));
复制代码
在map输出后的shuffle阶段,会将相同key的value的值连接,而两表合并发生在reduce中,合并的原则是:对于同一个key,如果同时存在于左右表中,即可合并。例如:对于左表“2  Jone        Lucy”和右表“1  Lucy  Mary“,因为key=Lucy,同时存在于两个表中,即Lucy是Jone的父母,还是Mary的儿女,自然Jone是Mary是孙子,所以合并这两个表啊

  1. for(Text name:valueList){
  2.                         String namestr=name.toString();
  3.                         String[] value=namestr.split(" ");
  4.                         
  5.                         if(value[0].equals("1")){
  6.                                 grandchildList.add(value[1]);
  7.                                 continue;
  8.                         }
  9.                
  10.                         if(value[0].equals("2")){
  11.                                 grandparentList.add(value[2]);
  12.                                 continue;
  13.                         }
  14.                 }
复制代码

取出符合合并原则的孙子们的姓名与祖父母们的姓名,然后执行计算的卡迪儿积,
  1. for(String child:grandchildList){
  2.            for(String parent:grandparentList){
  3.                     context.write(new Text(child), new Text(parent));
  4.                      }
  5.                   }
复制代码

将结果输出,这就是这个逻辑,是有一点混乱哈,菜鸟刚接触,所以很多不会,但是今天看了你的回帖再仔细看看自己的程序,还是没找到在哪错的,期待您的回复

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条