分享

数据分析之共同好友统计

问题导读:




1.如何统计好友?
2.如何用代码实现?
3.用到的算法是什么?









今天主要分享一个统计共同好友的案例,非常简单也非常实用的一个小案例,对于数据分析初级开发人员是很好的入门资料。
下面先来看一下原始数据:

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

上面这段数据的意思是:A的好友有B,C,D,F,E,O,B的好友有A、C、E、K,其他的按理类推。其中:用户和好友之间在这里以冒号分隔开,好友之间用逗号隔开。

当我们拿到这段数据的时候,该如何下手呢?

首先可以先两两合并为一组,然后找有相同的元素。例如可以分为AB、AC、AD、BC、BD.....等

找共同好友就是可以把AB求交集,然而这种方法在这里是不太可行的,那么我们可以逆向思维,反过来找,

例如找到第一行,B是A的朋友,C是A的朋友,然后就可以有K、V。

在A:B,C,D,F,E,O中B是A的朋友,在F:A,B,C,D,E,O,M中,B是F的朋友,则我们就可以认为A和F的共同好友是B,以这个B为K,依次类推。

则A是BDFGHIKO的共同好友   

B是AEFJ,的共同好友

C是ABEFGHK的共同好友

D是ACEFGHKL的共同好友

即当一个mapper执行结束后悔得到上面这些结果,然后再来一个mapreduce。

得到两两之间应该共同好友:

A:BD-A BF-A BG-A BH-A BI-A BK-A BO-A
B: AE-B AF-B AJ-B
C: AB-C AE-C AF-C ... BF-C BG-C  BH-C  ...

依次类推,这样就得到了共同好友了!

接下来用代码来实现:
[mw_shl_code=java,true]    String line=value.toString();   //一行行的读取数据  
                String[] split=line.split(":");  //把读到的数据切分开,前面说过的用冒号分开好友  
                String person=split[0];     //自己就是分开的数组的第0位  
                  
                String[] friends=split[1].split(",");   //好友是数组的第1位,而且以逗号隔开的  [/mw_shl_code]
下面循环这些值,并写入到Text中:

[mw_shl_code=java,true]for(String f:friends){  
    context.write(new Text(f), new Text(person));  
}  [/mw_shl_code]
然后提交给reduce来执行:再来一个循环和写入。

[mw_shl_code=java,true]StringBuffer sb=new  StringBuffer();  
              
            for(Text person:persons){  
                sb.append(person+",");  
            }  
            context.write(friend, new Text(sb.toString()));  
        }  [/mw_shl_code]
最后在main方法中调用map和reduce。

[mw_shl_code=java,true]//指定本job使用的mapper类  
                wcjob.setMapperClass(CommonFriendsOneMapper.class);  
                //指定本job使用的reducer类  
                wcjob.setReducerClass(CommonFriendsOneReducer.class);  [/mw_shl_code]
完整代码如下

[mw_shl_code=java,true]import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
public class CommonFriendsOne {  
      
      
    public static class CommonFriendsOneMapper extends Mapper<LongWritable, Text, Text, Text> {  
  
        @Override  
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
              
            String line=value.toString();  
            String[] split=line.split(":");  
            String person=split[0];  
              
            String[] friends=split[1].split(",");  
              
            for(String f:friends){  
                context.write(new Text(f), new Text(person));  
              
            }  
        }  
    }  
    public static class CommonFriendsOneReducer extends Reducer<Text, Text, Text, Text> {  
  
        // 输入<B->A><B->E><B->F>....  
        // 输出 B A,E,F,J  
        @Override  
        protected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {  
            StringBuffer sb=new  StringBuffer();  
              
            for(Text person:persons){  
                sb.append(person+",");  
            }  
            context.write(friend, new Text(sb.toString()));  
        }  
    }  
      
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
        //读取classpath下的所有xxx-site.xml配置文件,并进行解析  
                Configuration conf = new Configuration();  
                  
                Job wcjob = Job.getInstance(conf);  
                  
                //通过主类的类加载器机制获取到本job的所有代码所在的jar包  
                wcjob.setJarByClass(CommonFriendsOne.class);  
                  
                //指定本job使用的mapper类  
                wcjob.setMapperClass(CommonFriendsOneMapper.class);  
                //指定本job使用的reducer类  
                wcjob.setReducerClass(CommonFriendsOneReducer.class);  
                  
                  
                //指定reducer输出的kv数据类型  
                wcjob.setOutputKeyClass(Text.class);  
                wcjob.setOutputValueClass(Text.class);  
                  
                //指定本job要处理的文件所在的路径  
                FileInputFormat.setInputPaths(wcjob, new Path(args[0]));  
                //指定本job输出的结果文件放在哪个路径  
                FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));  
                  
                //将本job向hadoop集群提交执行  
                boolean res = wcjob.waitForCompletion(true);  
                  
                System.exit(res?0:1);  
  
    }  
  
} [/mw_shl_code]
可以按照我之前博文的方法:将这个文件打成一个jar包,然后提交到hadoop集群中运行。

在Hadoop中新建数据文件和目录。

bin/hadoop fs -mkdir -p /friends/data

然后上传源数据

bin/hadoop fs -put ../lx/data.txt  /friends/data

执行jar

bin/hadoop jar ../lx/friends.jar  cn.tf.friends.CommonFriendsOne  /friends/data  /friends/output

查看执行情况:

bin/hadoop fs -cat /friends/output/part-r-00000

运行效果如下:

1.png

到这里整个流程就分析完毕了。


文章出处:http://blog.csdn.net/sdksdk0/article/details/51646916
作者:朱培


已有(1)人评论

跳转到指定楼层
xuliang123789 发表于 2016-6-14 09:52:03
谢谢楼主,正需要,学习一下,赞~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条