分享

利用MapReduce计算框架实现谷歌(PR值)PageRank算法并行实现

pig2 发表于 2014-5-5 19:36:21 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 20443
问题导读:
1.什么是pr值?
2.如何通过mapreduce实现pr值?
3.map过程input、output为什么?
4.reduce过程:input、output为什么?







1. PageRank算法分步式原理

pagerank-sample.png



PageRank的分步式算法原理,简单来讲,就是通过矩阵计算实现并行化。

1). 把邻接矩阵的列,按数据行存储

邻接矩阵

  1.           [,1]   [,2]   [,3]   [,4]
  2. [1,] 0.0375000 0.0375 0.0375 0.0375
  3. [2,] 0.3208333 0.0375 0.0375 0.8875
  4. [3,] 0.3208333 0.4625 0.0375 0.0375
  5. [4,] 0.3208333 0.4625 0.8875 0.0375
复制代码
按行存储HDFS
  1. 1       0.037499994,0.32083333,0.32083333,0.32083333
  2. 2       0.037499994,0.037499994,0.4625,0.4625
  3. 3       0.037499994,0.037499994,0.037499994,0.88750005
  4. 4       0.037499994,0.88750005,0.037499994,0.037499994
复制代码

2). 迭代:求矩阵特征值

pagerank-mr.png


map过程:
input: 邻接矩阵, pr值
output: key为pr的行号,value为邻接矩阵和pr值的乘法求和公式

reduce过程:
input: key为pr的行号,value为邻接矩阵和pr值的乘法求和公式
output: key为pr的行号, value为计算的结果,即pr值

第1次迭代

  1. 0.0375000 0.0375 0.0375 0.0375     1     0.150000
  2. 0.3208333 0.0375 0.0375 0.8875  *  1  =  1.283333
  3. 0.3208333 0.4625 0.0375 0.0375     1     0.858333
  4. 0.3208333 0.4625 0.8875 0.0375     1     1.708333
复制代码
第2次迭代
  1. 0.0375000 0.0375 0.0375 0.0375     0.150000      0.150000
  2. 0.3208333 0.0375 0.0375 0.8875  *  1.283333  =   1.6445833
  3. 0.3208333 0.4625 0.0375 0.0375     0.858333      0.7379167
  4. 0.3208333 0.4625 0.8875 0.0375     1.708333      1.4675000
复制代码

… 10次迭代

特征值
  1. 0.1500000
  2. 1.4955721
  3. 0.8255034
  4. 1.5289245
复制代码

3). 标准化PR值
  1. 0.150000                                              0.0375000
  2. 1.4955721  / (0.15+1.4955721+0.8255034+1.5289245) =   0.3738930
  3. 0.8255034                                             0.2063759
  4. 1.5289245                                             0.3822311
复制代码

2. MapReduce分步式编程

MapReduce流程分解
PageRankJob.png




HDFS目录

input(/user/hdfs/pagerank): HDFS的根目录
input_pr(/user/hdfs/pagerank/pr): 临时目录,存储中间结果PR值
tmp1(/user/hdfs/pagerank/tmp1):临时目录,存储邻接矩阵
tmp2(/user/hdfs/pagerank/tmp2):临时目录,迭代计算PR值,然后保存到input_pr
result(/user/hdfs/pagerank/result): PR值输出结果

开发步骤:

网页链接关系数据: page.csv
出始的PR数据:pr.csv
邻接矩阵: AdjacencyMatrix.java
PageRank计算: PageRank.java
PR标准化: Normal.java
启动程序: PageRankJob.java

1). 网页链接关系数据: page.csv

新建文件:page.csv

1,2
1,3
1,4
2,3
2,4
3,4
4,2

2). 出始的PR数据:pr.csv

设置网页的初始值都是1

新建文件:pr.csv

1,1
2,1
3,1
4,1

3). 邻接矩阵: AdjacencyMatrix.java
adjacencyMatrix.png



矩阵解释:

阻尼系数为0.85
页面数为4
reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行
新建程序:AdjacencyMatrix.java


  1. package org.conan.myhadoop.pagerank;
  2. import java.io.IOException;
  3. import java.util.Arrays;
  4. import java.util.Map;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapred.JobConf;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. import org.conan.myhadoop.hdfs.HdfsDAO;
  17. public class AdjacencyMatrix {
  18.     private static int nums = 4;// 页面数
  19.     private static float d = 0.85f;// 阻尼系数
  20.     public static class AdjacencyMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
  21.         @Override
  22.         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
  23.             System.out.println(values.toString());
  24.             String[] tokens = PageRankJob.DELIMITER.split(values.toString());
  25.             Text k = new Text(tokens[0]);
  26.             Text v = new Text(tokens[1]);
  27.             context.write(k, v);
  28.         }
  29.     }
  30.     public static class AdjacencyMatrixReducer extends Reducer<Text, Text, Text, Text> {
  31.         @Override
  32.         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
  33.             float[] G = new float[nums];// 概率矩阵列
  34.             Arrays.fill(G, (float) (1 - d) / G.length);
  35.             float[] A = new float[nums];// 近邻矩阵列
  36.             int sum = 0;// 链出数量
  37.             for (Text val : values) {
  38.                 int idx = Integer.parseInt(val.toString());
  39.                 A[idx - 1] = 1;
  40.                 sum++;
  41.             }
  42.             if (sum == 0) {// 分母不能为0
  43.                 sum = 1;
  44.             }
  45.             StringBuilder sb = new StringBuilder();
  46.             for (int i = 0; i < A.length; i++) {
  47.                 sb.append("," + (float) (G[i] + d * A[i] / sum));
  48.             }
  49.             Text v = new Text(sb.toString().substring(1));
  50.             System.out.println(key + ":" + v.toString());
  51.             context.write(key, v);
  52.         }
  53.     }
  54.     public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
  55.         JobConf conf = PageRankJob.config();
  56.         String input = path.get("input");
  57.         String input_pr = path.get("input_pr");
  58.         String output = path.get("tmp1");
  59.         String page = path.get("page");
  60.         String pr = path.get("pr");
  61.         HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
  62.         hdfs.rmr(input);
  63.         hdfs.mkdirs(input);
  64.         hdfs.mkdirs(input_pr);
  65.         hdfs.copyFile(page, input);
  66.         hdfs.copyFile(pr, input_pr);
  67.         Job job = new Job(conf);
  68.         job.setJarByClass(AdjacencyMatrix.class);
  69.         job.setOutputKeyClass(Text.class);
  70.         job.setOutputValueClass(Text.class);
  71.         job.setMapperClass(AdjacencyMatrixMapper.class);
  72.         job.setReducerClass(AdjacencyMatrixReducer.class);
  73.         job.setInputFormatClass(TextInputFormat.class);
  74.         job.setOutputFormatClass(TextOutputFormat.class);
  75.         FileInputFormat.setInputPaths(job, new Path(page));
  76.         FileOutputFormat.setOutputPath(job, new Path(output));
  77.         job.waitForCompletion(true);
  78.     }
  79. }
复制代码

4). PageRank计算: PageRank.java
pagerank-step1.png

矩阵解释:

实现邻接与PR矩阵的乘法
map以邻接矩阵的行号为key,由于上一步是输出的是列,所以这里需要转成行
reduce计算得到未标准化的特征值
新建文件: PageRank.java

  1. package org.conan.myhadoop.pagerank;
  2. import java.io.IOException;
  3. import java.util.HashMap;
  4. import java.util.Iterator;
  5. import java.util.Map;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  15. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  18. import org.conan.myhadoop.hdfs.HdfsDAO;
  19. public class PageRank {
  20.     public static class PageRankMapper extends Mapper<LongWritable, Text, Text, Text> {
  21.         private String flag;// tmp1 or result
  22.         private static int nums = 4;// 页面数
  23.         @Override
  24.         protected void setup(Context context) throws IOException, InterruptedException {
  25.             FileSplit split = (FileSplit) context.getInputSplit();
  26.             flag = split.getPath().getParent().getName();// 判断读的数据集
  27.         }
  28.         @Override
  29.         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
  30.             System.out.println(values.toString());
  31.             String[] tokens = PageRankJob.DELIMITER.split(values.toString());
  32.             if (flag.equals("tmp1")) {
  33.                 String row = values.toString().substring(0,1);
  34.                 String[] vals = PageRankJob.DELIMITER.split(values.toString().substring(2));// 矩阵转置
  35.                 for (int i = 0; i < vals.length; i++) {
  36.                     Text k = new Text(String.valueOf(i + 1));
  37.                     Text v = new Text(String.valueOf("A:" + (row) + "," + vals[i]));
  38.                     context.write(k, v);
  39.                 }
  40.             } else if (flag.equals("pr")) {
  41.                 for (int i = 1; i <= nums; i++) {
  42.                     Text k = new Text(String.valueOf(i));
  43.                     Text v = new Text("B:" + tokens[0] + "," + tokens[1]);
  44.                     context.write(k, v);
  45.                 }
  46.             }
  47.         }
  48.     }
  49.     public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {
  50.         @Override
  51.         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
  52.             Map<Integer, Float> mapA = new HashMap<Integer, Float>();
  53.             Map<Integer, Float> mapB = new HashMap<Integer, Float>();
  54.             float pr = 0f;
  55.             for (Text line : values) {
  56.                 System.out.println(line);
  57.                 String vals = line.toString();
  58.                 if (vals.startsWith("A:")) {
  59.                     String[] tokenA = PageRankJob.DELIMITER.split(vals.substring(2));
  60.                     mapA.put(Integer.parseInt(tokenA[0]), Float.parseFloat(tokenA[1]));
  61.                 }
  62.                 if (vals.startsWith("B:")) {
  63.                     String[] tokenB = PageRankJob.DELIMITER.split(vals.substring(2));
  64.                     mapB.put(Integer.parseInt(tokenB[0]), Float.parseFloat(tokenB[1]));
  65.                 }
  66.             }
  67.             Iterator iterA = mapA.keySet().iterator();
  68.             while(iterA.hasNext()){
  69.                 int idx = iterA.next();
  70.                 float A = mapA.get(idx);
  71.                 float B = mapB.get(idx);
  72.                 pr += A * B;
  73.             }
  74.             context.write(key, new Text(PageRankJob.scaleFloat(pr)));
  75.             // System.out.println(key + ":" + PageRankJob.scaleFloat(pr));
  76.         }
  77.     }
  78.     public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
  79.         JobConf conf = PageRankJob.config();
  80.         String input = path.get("tmp1");
  81.         String output = path.get("tmp2");
  82.         String pr = path.get("input_pr");
  83.         HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
  84.         hdfs.rmr(output);
  85.         Job job = new Job(conf);
  86.         job.setJarByClass(PageRank.class);
  87.         job.setOutputKeyClass(Text.class);
  88.         job.setOutputValueClass(Text.class);
  89.         job.setMapperClass(PageRankMapper.class);
  90.          job.setReducerClass(PageRankReducer.class);
  91.         job.setInputFormatClass(TextInputFormat.class);
  92.         job.setOutputFormatClass(TextOutputFormat.class);
  93.         FileInputFormat.setInputPaths(job, new Path(input), new Path(pr));
  94.         FileOutputFormat.setOutputPath(job, new Path(output));
  95.         job.waitForCompletion(true);
  96.         hdfs.rmr(pr);
  97.         hdfs.rename(output, pr);
  98.     }
  99. }
复制代码



5). PR标准化: Normal.java

normal-step1.png

矩阵解释:

对PR的计算结果标准化,让所以PR值落在(0,1)区间
新建文件:Normal.java

  1. package org.conan.myhadoop.pagerank;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Map;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapred.JobConf;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  17. import org.conan.myhadoop.hdfs.HdfsDAO;
  18. public class Normal {
  19.     public static class NormalMapper extends Mapper<LongWritable, Text, Text, Text> {
  20.         Text k = new Text("1");
  21.         @Override
  22.         public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
  23.             System.out.println(values.toString());
  24.             context.write(k, values);
  25.         }
  26.     }
  27.     public static class NormalReducer extends Reducer<Text, Text, Text, Text> {
  28.         @Override
  29.         public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
  30.             List vList = new ArrayList();
  31.             float sum = 0f;
  32.             for (Text line : values) {
  33.                 vList.add(line.toString());
  34.                 String[] vals = PageRankJob.DELIMITER.split(line.toString());
  35.                 float f = Float.parseFloat(vals[1]);
  36.                 sum += f;
  37.             }
  38.             for (String line : vList) {
  39.                 String[] vals = PageRankJob.DELIMITER.split(line.toString());
  40.                 Text k = new Text(vals[0]);
  41.                 float f = Float.parseFloat(vals[1]);
  42.                 Text v = new Text(PageRankJob.scaleFloat((float) (f / sum)));
  43.                 context.write(k, v);
  44.                 System.out.println(k + ":" + v);
  45.             }
  46.         }
  47.     }
  48.     public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
  49.         JobConf conf = PageRankJob.config();
  50.         String input = path.get("input_pr");
  51.         String output = path.get("result");
  52.         HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
  53.         hdfs.rmr(output);
  54.         Job job = new Job(conf);
  55.         job.setJarByClass(Normal.class);
  56.         job.setOutputKeyClass(Text.class);
  57.         job.setOutputValueClass(Text.class);
  58.         job.setMapperClass(NormalMapper.class);
  59.         job.setReducerClass(NormalReducer.class);
  60.         job.setInputFormatClass(TextInputFormat.class);
  61.         job.setOutputFormatClass(TextOutputFormat.class);
  62.         FileInputFormat.setInputPaths(job, new Path(input));
  63.         FileOutputFormat.setOutputPath(job, new Path(output));
  64.         job.waitForCompletion(true);
  65.     }
  66. }
复制代码

6). 启动程序: PageRankJob.java

新建文件:PageRankJob.java
  1. package org.conan.myhadoop.pagerank;
  2. import java.text.DecimalFormat;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import java.util.regex.Pattern;
  6. import org.apache.hadoop.mapred.JobConf;
  7. public class PageRankJob {
  8.     public static final String HDFS = "hdfs://192.168.1.210:9000";
  9.     public static final Pattern DELIMITER = Pattern.compile("[\t,]");
  10.     public static void main(String[] args) {
  11.         Map<String, String> path = new HashMap<String, String>();
  12.         path.put("page", "logfile/pagerank/page.csv");// 本地的数据文件
  13.         path.put("pr", "logfile/pagerank/pr.csv");// 本地的数据文件
  14.         path.put("input", HDFS + "/user/hdfs/pagerank");// HDFS的目录
  15.         path.put("input_pr", HDFS + "/user/hdfs/pagerank/pr");// pr存储目
  16.         path.put("tmp1", HDFS + "/user/hdfs/pagerank/tmp1");// 临时目录,存放邻接矩阵
  17.         path.put("tmp2", HDFS + "/user/hdfs/pagerank/tmp2");// 临时目录,计算到得PR,覆盖input_pr
  18.         path.put("result", HDFS + "/user/hdfs/pagerank/result");// 计算结果的PR
  19.         try {
  20.             AdjacencyMatrix.run(path);
  21.             int iter = 3;
  22.             for (int i = 0; i < iter; i++) {// 迭代执行
  23.                 PageRank.run(path);
  24.             }
  25.             Normal.run(path);
  26.         } catch (Exception e) {
  27.             e.printStackTrace();
  28.         }
  29.         System.exit(0);
  30.     }
  31.     public static JobConf config() {// Hadoop集群的远程配置信息
  32.         JobConf conf = new JobConf(PageRankJob.class);
  33.         conf.setJobName("PageRank");
  34.         conf.addResource("classpath:/hadoop/core-site.xml");
  35.         conf.addResource("classpath:/hadoop/hdfs-site.xml");
  36.         conf.addResource("classpath:/hadoop/mapred-site.xml");
  37.         return conf;
  38.     }
  39.     public static String scaleFloat(float f) {// 保留6位小数
  40.         DecimalFormat df = new DecimalFormat("##0.000000");
  41.         return df.format(f);
  42.     }
  43. }
复制代码

程序代码已上传到github:

https://github.com/bsspirit/mave ... n/myhadoop/pagerank

这样就实现了,PageRank的并行吧!接下来,我们就可以用PageRank做一些有意思的应用了。






fens


已有(3)人评论

跳转到指定楼层
maizhu 发表于 2014-10-7 23:37:54
楼主厉害!
回复

使用道具 举报

zgqqlong 发表于 2015-4-25 23:31:26
强大         
回复

使用道具 举报

ansha886 发表于 2016-6-23 10:00:18
感谢分享,楼主辛苦
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条