分享

Mahout协同过滤框架Taste的源码分析(2)

本帖最后由 sunshine_junge 于 2014-12-30 15:16 编辑

问题导读:

1.如何使用MapReduce计算物品相似度?
2.如何使用MapReduce进行矩阵乘法?
3.如何使用Taste完成推荐?






推荐过程

主要分成了如下几步来完成推荐

1. 输入数据预处理
2. 获取评分矩阵
3. 计算物品相似度
4. 矩阵乘法
5. 数据过滤
6. 计算推荐

测试数据
user&item
1
2
3
4
5
1
3
3
3
2
0
2
4
4
4
0
0
3
5
5
5
0
3
4
4
4
4
1
4


基于物品的推荐
  1. public final class RecommenderJob extends AbstractJob {
  2. @Override
  3.     public int run(String[] args) throws Exception {
  4.         // 1. 加载参数
  5.         //
  6.         // input(path)  
  7.         // 包含用户偏好信息的数据文件目录, 其中文件格式为 userID, itemID[, preferencevalue]
  8.         //
  9.         // output(path)
  10.         // 推荐结果的输出目录
  11.         //
  12.         // similarityClassname (classname)
  13.         // 计算相似度的实现类
  14.         //
  15.         // usersFile (path)
  16.         // 只计算指定用户的推荐结果
  17.         //
  18.         // itemsFile (path)
  19.         // 只在指定的物品中给出推荐结果
  20.         //
  21.         // filterFile (path)
  22.         // 从推荐结果中为指定的用户过滤指定的物品
  23.         //
  24.         // numRecommendations (integer)
  25.         // 给每个用户推荐物品的数量
  26.         //
  27.         // booleanData (boolean)
  28.         // 是否只考虑用户是否评价过物品而不考虑分值
  29.         //
  30.         // maxPrefsPerUser (integer)
  31.         // 用户的最大偏好数量, 默认为10
  32.         //
  33.         // maxSimilaritiesPerItem (integer)
  34.         // 每个物品最多与多少个物品计算相似度, 默认为100
  35.         //
  36.         // minPrefsPerUser (integer)
  37.         // 用户的最小偏好数量, 默认为1
  38.         //
  39.         // maxPrefsPerUserInItemSimilarity (integer)
  40.         // 在每个Item在计算相似度阶段对User的最大采样个数, 默认为1000
  41.         //
  42.         // threshold (double)
  43.         // 计算物品相似度时的门槛
  44.         // 2. 获取评分矩阵
  45.         // PreparePreferenceMatrixJob
  46.         //
  47.         // 3. 计算物品相似度
  48.         // RowSimilarityJob
  49.         //
  50.         // 4. 为矩阵乘法做准备
  51.         // prePartialMultiply1, prePartialMultiply2, partialMultiply
  52.         //
  53.         // 5. 如果filterFile存在则为指定用户过滤指定物品
  54.         // Job itemFiltering
  55.         //
  56.         // 6. 计算推荐
  57.         // Job aggregateAndRecommend
  58.     }
  59. }
复制代码



评分矩阵索引转换
  1. // Job itemIDIndex
  2. //
  3. // 物品ID转换成索引的MapReduce任务
  4. // 将 userID, itemID, pref格式的数据转换为 index, itemID
  5. //
  6. // inputPath      用户偏好信息数据
  7. // outputPath     tempDir/preparePreferenceMatrix/itemIDIndex
  8. // inputFormat    TextInputFormat
  9. // mapper         ItemIDIndexMapper
  10. // reducer        ItemIDIndexReducer
  11. // outputFormat   SequenceFileOutputFormat
复制代码



示例数据
  1. # 输入
  2. 1,1,3
  3. 1,2,3
  4. 1,3,3
  5. 1,4,2
  6. 2,1,4
  7. 2,2,4
  8. 2,3,4
  9. 3,1,5
  10. 3,2,5
  11. 3,3,5
  12. 3,5,3
  13. 4,1,4
  14. 4,2,4
  15. 4,3,4
  16. 4,4,1
  17. 4,5,4
  18. # 输出
  19. 1   1
  20. 2   2
  21. 3   3
  22. 4   4
  23. 5   5
复制代码



mapper实现
  1. // 解析输出文件, 假定其为long型, 转换为int型
  2. public final class ItemIDIndexMapper extends Mapper<LongWritable, Text, VarIntWritable, VarLongWritable> {
  3.     @Override
  4.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  5.         String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
  6.         long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
  7.         int index = TasteHadoopUtils.idToIndex(itemID);
  8.         context.write(new VarIntWritable(index), new VarLongWritable(itemID));
  9.     }
  10. }
复制代码




reducer实现
  1. // 将索引与itemID的对应关系写入到输出中
  2. public final class ItemIDIndexReducer extends Reducer<VarIntWritable, VarLongWritable, VarIntWritable, VarLongWritable> {
  3.     @Override
  4.     protected void reduce(VarIntWritable index,
  5.             Iterable<VarLongWritable> possibleItemIDs, Context context)
  6.             throws IOException, InterruptedException {
  7.         long minimumItemID = Long.MAX_VALUE;
  8.         for (VarLongWritable varLongWritable : possibleItemIDs) {
  9.             long itemID = varLongWritable.get();
  10.             if (itemID < minimumItemID) {
  11.                 minimumItemID = itemID;
  12.             }
  13.         }
  14.         if (minimumItemID != Long.MAX_VALUE) {
  15.             context.write(index, new VarLongWritable(minimumItemID));
  16.         }
  17.     }
  18. }
复制代码



用户向量转换
  1. // Job toUserVectors ~
  2. //
  3. // 获取每个用户对物品偏好的向量, 即由原始数据获取 userID, <itemID, pref> 格式的向量
  4. //
  5. // inputPath      用户偏好信息数据
  6. // outputPath     tempDir/preparePreferenceMatrix/userVectors
  7. // inputFormat    TextInputFormat
  8. // mapper         ToItemPrefsMapper
  9. // reducer        ToUserVectorsReducer
  10. // outputFormat   SequenceFileOutputFormat
复制代码



示例数据
  1. # 输入同上
  2. # 输出
  3. 1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
  4. 2 : (1,4.0) (2,4.0) (3,4.0)
  5. 3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
  6. 4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)
复制代码



mapper实现
  1. public abstract class ToEntityPrefsMapper extends
  2.         Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
  3.     @Override
  4.     public void map(LongWritable key, Text value, Context context)
  5.             throws IOException, InterruptedException {
  6.         String[] tokens = DELIMITER.split(value.toString());
  7.         long userID = Long.parseLong(tokens[0]);
  8.         long itemID = Long.parseLong(tokens[1]);
  9.         // 是否需要进行转置
  10.         if (itemKey ^ transpose) {
  11.             long temp = userID;
  12.             userID = itemID;
  13.             itemID = temp;
  14.         }
  15.         // 是否不考虑评分值
  16.         if (booleanData) {
  17.             context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
  18.         } else {
  19.             float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
  20.             context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
  21.         }
  22.     }
  23. }
复制代码




reducer实现
  1. public final class ToUserVectorsReducer
  2.         extends
  3.         Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
  4.     @Override
  5.     protected void reduce(VarLongWritable userID,
  6.             Iterable<VarLongWritable> itemPrefs, Context context)
  7.             throws IOException, InterruptedException {
  8.         // 将相同用户对物品的偏好信息存储到向量
  9.         Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
  10.         for (VarLongWritable itemPref : itemPrefs) {
  11.             int index = TasteHadoopUtils.idToIndex(itemPref.get());
  12.             float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref)
  13.                     .getPrefValue() : 1.0f;
  14.             userVector.set(index, value);
  15.         }
  16.         // 只有当物品数量达到最小值时才以userID为key写入到输出
  17.         if (userVector.getNumNondefaultElements() >= minPreferences) {
  18.             VectorWritable vw = new VectorWritable(userVector);
  19.             vw.setWritesLaxPrecision(true);
  20.             // 统计用户数量
  21.             context.getCounter(Counters.USERS).increment(1);
  22.             context.write(userID, vw);
  23.         }
  24.     }
复制代码




统计用户数量
  1. // 将用户数量写入到临时目录 tempDir/preparePreferenceMatrix/numUsers.bin
  2. public static final String NUM_USERS = "numUsers.bin";
  3. int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
  4. HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());
复制代码




物品向量
  1. // Job toUserVectors
  2. //
  3. // 获取每个物品与其关联用户的向量, 由userID, <itemID, pref> 转换为 itemID, <userID, pref>
  4. //  
  5. // inputPath      tempDir/preparePreferenceMatrix/userVectors
  6. // outputPath     tempDir/preparePreferenceMatrix/ratingMatrix
  7. // inputFormat    TextInputFormat
  8. // mapper         ToItemVectorsMapper
  9. // reducer        ToItemVectorsReducer
  10. // outputFormat   SequenceFileOutputFormat
复制代码



示例数据
  1. # 输入即上一个任务的输出
  2. # 输出
  3. 1 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
  4. 2 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
  5. 3 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
  6. 4 : (1,2.0) (4,1.0)
  7. 5 : (3,3.0) (4,4.0)
复制代码



计算相似度

向量赋权
  1. // Job normsAndTranspose
  2. //
  3. // 使用指定的相似度算法对每行计算权重,
  4. // 即由 itemID, <userID, pref> 计算得到 userID, <itemID, pref>
  5. //
  6. // inputPath      tempDir/preparePreferenceMatrix/ratingMatrix
  7. // outputPath     tempDir/weights
  8. // inputFormat    SequenceFileInputFormat
  9. // mapper         VectorNormMapper
  10. // reducer        MergeVectorsReducer
  11. // outputFormat   SequenceFileOutputFormat
复制代码



示例数据
  1. # 输入 itemID, <userID, pref>
  2. 1 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
  3. 2 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
  4. 3 : (1,3.0) (2,4.0) (3,5.0) (4,4.0)
  5. 4 : (1,2.0) (4,1.0)
  6. 5 : (3,3.0) (4,4.0)
  7. # 输出 userID, <itemID, pref>
  8. 1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
  9. 2 : (1,4.0) (2,4.0) (3,4.0)
  10. 3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
  11. 4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)
  12. # norms.bin
  13. (1,66.0) (2,66.0) (3,66.0) (4,5.0) (5,25.0)
复制代码



mapper实现
  1. public class RowSimilarityJob extends AbstractJob {
  2.     public static class VectorNormMapper extends
  3.             Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
  4.         @Override
  5.         protected void map(IntWritable row, VectorWritable vectorWritable,
  6.                 Context ctx) throws IOException, InterruptedException {
  7.             // 输入文件为itemID--Vector<userID, pref>格式的item向量
  8.             // 使用指定的相似度算法进行处理
  9.             Vector rowVector = similarity.normalize(vectorWritable.get());
  10.             int numNonZeroEntries = 0;
  11.             double maxValue = Double.MIN_VALUE;
  12.             // 遍历对某个物品存在非零偏好信息的所有用户
  13.             Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
  14.             while (nonZeroElements.hasNext()) {
  15.                 Vector.Element element = nonZeroElements.next();
  16.                 RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
  17.                 // 将 itemID - <userID, pref> 变换为 userID - <itemID, pref>
  18.                 partialColumnVector.setQuick(row.get(), element.get());
  19.                 ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
  20.                 // 记录用户数量和最高分值
  21.                 numNonZeroEntries++;
  22.                 if (maxValue < element.get()) {
  23.                     maxValue = element.get();
  24.                 }
  25.             }
  26.             // 如果需要过滤最小分值则记录对当前物品评分的用户总数及最大分值
  27.             if (threshold != NO_THRESHOLD) {
  28.                 nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
  29.                 maxValues.setQuick(row.get(), maxValue);
  30.             }
  31.             // 记录每个物品的norm值, 不同的相似度算法有不同的实现
  32.             // 欧几里得距离计算的是平方和
  33.             norms.setQuick(row.get(), similarity.norm(rowVector));
  34.             ctx.getCounter(Counters.ROWS).increment(1);
  35.         }
  36.         @Override
  37.         protected void cleanup(Context ctx) throws IOException,
  38.                 InterruptedException {
  39.             super.cleanup(ctx);
  40.             // 向reducer传递特殊key值, 以便于特殊处理
  41.             ctx.write(new IntWritable(NORM_VECTOR_MARKER), new VectorWritable(norms));
  42.             ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new VectorWritable(nonZeroEntries));
  43.             ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new VectorWritable(maxValues));
  44.         }
  45.     }
  46. }
复制代码


reducer实现
  1. public class RowSimilarityJob extends AbstractJob {
  2.     public static class MergeVectorsReducer extends
  3.             Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
  4.         @Override
  5.         protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
  6.                 throws IOException, InterruptedException {
  7.             // 按照物品进行合并
  8.             Vector partialVector = Vectors.merge(partialVectors);
  9.             if (row.get() == NORM_VECTOR_MARKER) {
  10.                 // 将每个物品的规范分值写入到 tempDir/norms.bin
  11.                 Vectors.write(partialVector, normsPath, ctx.getConfiguration());
  12.             } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
  13.                 // 将最大分值写入到  tempDir/maxValues.bin
  14.                 Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
  15.             } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
  16.                 // 将非零偏好的用户数量写入到 tempDir/numNonZeroEntries.bin
  17.                 Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
  18.             } else {
  19.                 // 其它的写入到默认的输出目录 tempDir/weights
  20.                 ctx.write(row, new VectorWritable(partialVector));
  21.             }
  22.         }
  23.     }
  24. }
复制代码


相似度计算
  1. // 计算物品两两间相似度矩阵
  2. // 由 userID, <itemID, pref> 计算获得 itemA, <itemO, similarity>
  3. //  
  4. // inputPath      tempDir/weights
  5. // outputPath     tempDir/pairwiseSimilarity
  6. // inputFormat    SequenceFileInputFormat
  7. // mapper         CooccurrencesMapper
  8. // reducer        SimilarityReducer
  9. // outputFormat   SequenceFileOutputFormat
复制代码


示例数据
  1. # 输入
  2. 1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
  3. 2 : (1,4.0) (2,4.0) (3,4.0)
  4. 3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
  5. 4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)
  6. # 输出
  7. 1 : (2,1.0) (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
  8. 2 : (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
  9. 3 : (4,0.122828568570857) (5,0.1566130288262323)
  10. 4 : (5,0.1757340838011157)
复制代码


mapper实现
  1. public class RowSimilarityJob extends AbstractJob {
  2.     public static class CooccurrencesMapper extends
  3.             Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
  4.         @Override
  5.         protected void map(IntWritable column, VectorWritable occurrenceVector,
  6.                 Context ctx) throws IOException, InterruptedException {
  7.             // 获取某个用户关注的所有物品, 按照索引排序
  8.             Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
  9.             Arrays.sort(occurrences, BY_INDEX);
  10.             int cooccurrences = 0;
  11.             int prunedCooccurrences = 0;
  12.             // 计算物品两两间由相似度实现类的 aggregate 方法求得的值
  13.             // 按照 itemA - <itemO, aggregate> 格式写入到输出
  14.             // 使用测试数据将得到
  15.             // user1:
  16.             //     1 : <2,9.0> <3,9.0> <4,6.0>
  17.             //     2 : <1,9.0> <3,9.0> <4,6.0>
  18.             //     3 : <1,9.0> <2,9.0> <4,6.0>
  19.             //     4 : <1,6.0> <2,6.0> <3,6.0>
  20.             // user2:
  21.             //     1 : <2,16.0> <3,16.0>
  22.             //     2 : <1,16.0> <3,16.0>
  23.             //     3 : <1,16.0> <2,16.0>
  24.             for (int n = 0; n < occurrences.length; n++) {
  25.                 Vector.Element occurrenceA = occurrences[n];
  26.                 Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
  27.                 for (int m = n; m < occurrences.length; m++) {
  28.                     Vector.Element occurrenceB = occurrences[m];
  29.                     if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
  30.                         dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
  31.                         cooccurrences++;
  32.                     } else {
  33.                         prunedCooccurrences++;
  34.                     }
  35.                 }
  36.                 ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
  37.             }
  38.             ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
  39.             ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
  40.         }
  41.     }
  42. }
复制代码


reducer实现
  1. public class RowSimilarityJob extends AbstractJob {
  2.     public static class SimilarityReducer extends
  3.             Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
  4.         @Override
  5.         protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
  6.                 throws IOException, InterruptedException {
  7.             // 上一步mapper过程得到 itemA - <itemO, aggregate> 格式的向量
  8.             // 累加相同物品由不同用户计算出的分值
  9.             // 1 : <2,(9+16+25+16)> <3,(9+16+25+16)> <4,(6+4)> <5,(15+16)>
  10.             // 2 : <1,(9+16+25+16)> <3,(9+16+25+16)> <4,(6+4)> <5,(15+16)>
  11.             Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
  12.             Vector dots = partialDotsIterator.next().get();
  13.             while (partialDotsIterator.hasNext()) {
  14.                 Vector toAdd = partialDotsIterator.next().get();
  15.                 Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
  16.                 while (nonZeroElements.hasNext()) {
  17.                     Vector.Element nonZeroElement = nonZeroElements.next();
  18.                     dots.setQuick(nonZeroElement.index(),
  19.                             dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
  20.                 }
  21.             }
  22.             // 创建一个相同大小的向量存储两两物品间的相似度
  23.             Vector similarities = dots.like();
  24.             // norms 由上一步的MapReduce任务计算得出
  25.             // 这里直接从HDFS存储路径上加载文件构建成向量
  26.             // (1,66.0) (2,66.0) (3,66.0) (4,5.0) (5,25.0)
  27.             double normA = norms.getQuick(row.get());
  28.             // 使用相似度实现类的similarity方法计算两个物品的相似度
  29.             // 参数为:由相似度实现类的 aggregate 方法得到的值, 两个物品的norm值
  30.             // 欧几里得距离的实现为
  31.             //
  32.             // double euclideanDistance = Math.sqrt(normA - 2 * dots + normB);
  33.             // return 1.0 / (1.0 + euclideanDistance);
  34.             //
  35.             // 1与2的相似度为 1.0 / [1.0 + (66-2*66+66)] = 1.0
  36.             // 1与4的相似度为 1.0 / [1.0 + (66-2*10+5)] = 0.122828568570857
  37.             Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
  38.             while (dotsWith.hasNext()) {
  39.                 Vector.Element b = dotsWith.next();
  40.                 double similarityValue = similarity.similarity(b.get(), normA,
  41.                         norms.getQuick(b.index()), numberOfColumns);
  42.                 if (similarityValue >= treshold) {
  43.                     similarities.set(b.index(), similarityValue);
  44.                 }
  45.             }
  46.             if (excludeSelfSimilarity) {
  47.                 similarities.setQuick(row.get(), 0);
  48.             }
  49.             ctx.write(row, new VectorWritable(similarities));
  50.         }
  51.     }
  52. }
复制代码

矩阵变换
  1. // Job asMatrix
  2. // 矩阵变换, 为每个物品查找TopN相似的其它物品
  3. // 最后仍然是 itemA, <itemO, similarity> 格式的向量
  4. //  
  5. // inputPath      tempDir/pairwiseSimilarity
  6. // outputPath     tempDir/similarityMatrix
  7. // inputFormat    SequenceFileInputFormat
  8. // mapper         UnsymmetrifyMapper
  9. // reducer        MergeToTopKSimilaritiesReducer
  10. // outputFormat   SequenceFileOutputFormat
复制代码

示例数据
  1. // Job asMatrix
  2. // 矩阵变换, 为每个物品查找TopN相似的其它物品
  3. // 最后仍然是 itemA, <itemO, similarity> 格式的向量
  4. //  
  5. // inputPath      tempDir/pairwiseSimilarity
  6. // outputPath     tempDir/similarityMatrix
  7. // inputFormat    SequenceFileInputFormat
  8. // mapper         UnsymmetrifyMapper
  9. // reducer        MergeToTopKSimilaritiesReducer
  10. // outputFormat   SequenceFileOutputFormat
复制代码


mapper实现
  1. public class RowSimilarityJob extends AbstractJob {
  2.     public static class UnsymmetrifyMapper extends
  3.             Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
  4.         @Override
  5.         protected void map(IntWritable row,
  6.                 VectorWritable similaritiesWritable, Context ctx)
  7.                 throws IOException, InterruptedException {
  8.             // 这里的输入格式为 itemA, <itemO, similarity>
  9.             Vector similarities = similaritiesWritable.get();
  10.             // 存储转置后的向量
  11.             Vector transposedPartial = similarities.like();
  12.             // 根据参数设置存储最多 maxSimilaritiesPerRow 个与当前物品相似的物品
  13.             TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
  14.             Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
  15.             while (nonZeroElements.hasNext()) {
  16.                 Vector.Element nonZeroElement = nonZeroElements.next();
  17.                 topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
  18.                 // 转置向量里按照 <itemA, similarity> 格式存储
  19.                 // 再以 itemO, <itemA, similarity> 格式写入到输出
  20.                 transposedPartial.setQuick(row.get(), nonZeroElement.get());
  21.                 ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
  22.                 transposedPartial.setQuick(row.get(), 0.0);
  23.             }
  24.             // 将与当前物品TopN相似的物品以 itemA, <itemO, similarity> 格式写入到输出
  25.             Vector topKSimilarities = similarities.like();
  26.             for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
  27.                 topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
  28.             }
  29.             ctx.write(row, new VectorWritable(topKSimilarities));
  30.         }
  31.     }
  32. }
复制代码


reducer实现
  1. public class RowSimilarityJob extends AbstractJob {
  2.     public static class MergeToTopKSimilaritiesReducer extends
  3.             Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {
  4.         @Override
  5.         protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
  6.                 throws IOException, InterruptedException {
  7.             // 将mapper过程中得到的两种向量 itemO, <itemA, similarity> 与 itemA, <itemO, similarity> 按相同物品进行合并
  8.             Vector allSimilarities = Vectors.merge(partials);
  9.             // 再次求TopN
  10.             Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
  11.             // 最后输出 itemA, <itemO, similarity> 格式的向量
  12.             ctx.write(row, new VectorWritable(topKSimilarities));
  13.         }
  14.     }
  15. }
复制代码


矩阵乘法准备

准备过程1
  1. // Job prePartialMultiply1
  2. // 为每个物品添加对自身的相似度, 并将向量转换为 VectorOrPrefWritable 类型
  3. // 最后得到 itemA, <itemO, similarity>, 即每个物品与其它物品的相似度
  4. //
  5. // inputPath      tempDir/similarityMatrix
  6. // outputPath     tempDir/prePartialMultiply1
  7. // inputFormat    SequenceFileInputFormat
  8. // mapper         SimilarityMatrixRowWrapperMapper
  9. // reducer        Reducer
  10. // outputFormat   SequenceFileOutputFormat
复制代码

示例数据
  1. # 输入
  2. 1 : (2,1.0) (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
  3. 2 : (1,1.0) (3,1.0) (4,0.122828568570857) (5,0.1566130288262323)
  4. 3 : (1,1.0) (2,1.0) (4,0.122828568570857) (5,0.1566130288262323)
  5. 4 : (1,0.122828568570857) (2,0.122828568570857) (3,0.122828568570857) (5,0.1757340838011157)
  6. 5 : (1,0.1566130288262323) (2,0.1566130288262323) (3,0.1566130288262323) (4,0.1757340838011157)
  7. # 输出
  8. 1 : (1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  9. 2 : (1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  10. 3 : (1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
  11. 4 : (1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
  12. 5 : (1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)
复制代码


mapper实现
  1. public final class SimilarityMatrixRowWrapperMapper
  2.         extends
  3.         Mapper<IntWritable, VectorWritable, VarIntWritable, VectorOrPrefWritable> {
  4.     @Override
  5.     protected void map(IntWritable key, VectorWritable value, Context context)
  6.             throws IOException, InterruptedException {
  7.         // 这里的输入格式为 itemA, <itemO, similarity>
  8.         Vector similarityMatrixRow = value.get();
  9.         // 将当前物品与自己的相似度设置 Double.NaN
  10.         similarityMatrixRow.set(key.get(), Double.NaN);
  11.         // 将向量转换为 VectorOrPrefWritable 类型
  12.         context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
  13.     }
  14. }
复制代码


reducer实现
  1. // org.apache.hadoop.mapreduce.Reducer
  2. // 使用hadoop默认的reducer实现, 即仅仅合并相同的key
  3. // 但在mapper过程中的输入即已是合并完成的向量
  4. // 所以这里的reducer过程并无实际意义
复制代码


准备过程2
  1. // Job prePartialMultiply2
  2. //
  3. // 把用户偏好信息变换为以物品为key的向量
  4. // 即由 userID, <itemID, pref> 变换为 itemID, <userID, pref>
  5. // 最后得到每个用户对同一个物品的偏好
  6. //  
  7. // inputPath      tempDir/preparePreferenceMatrix/userVectors
  8. // outputPath     tempDir/prePartialMultiply2
  9. // inputFormat    SequenceFileInputFormat
  10. // mapper         UserVectorSplitterMapper
  11. // reducer        Reducer
  12. // outputFormat   SequenceFileOutputFormat
复制代码

示例数据
  1. # 输入
  2. 1 : (1,3.0) (2,3.0) (3,3.0) (4,2.0)
  3. 2 : (1,4.0) (2,4.0) (3,4.0)
  4. 3 : (1,5.0) (2,5.0) (3,5.0) (5,3.0)
  5. 4 : (1,4.0) (2,4.0) (3,4.0) (4,1.0) (5,4.0)
  6. # 输出
  7. 1 : (1,3.0)
  8. 1 : (2,4.0)
  9. 1 : (3,5.0)
  10. 1 : (4,4.0)
  11. 2 : (1,3.0)
  12. 2 : (2,4.0)
  13. 2 : (3,5.0)
  14. 2 : (4,4.0)
  15. 3 : (1,3.0)
  16. 3 : (2,4.0)
  17. 3 : (3,5.0)
  18. 3 : (4,4.0)
  19. 4 : (1,2.0)
  20. 4 : (4,1.0)
  21. 5 : (3,3.0)
  22. 5 : (4,4.0)
复制代码

mapper实现
  1. public final class UserVectorSplitterMapper
  2.         extends
  3.         Mapper<VarLongWritable, VectorWritable, VarIntWritable, VectorOrPrefWritable> {
  4.     @Override
  5.     protected void map(VarLongWritable key, VectorWritable value,
  6.             Context context) throws IOException, InterruptedException {
  7.         long userID = key.get();
  8.         // 是否过滤用户
  9.         if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
  10.             return;
  11.         }
  12.         Vector userVector = maybePruneUserVector(value.get());
  13.         // 获取每个用户对物品的偏好信息
  14.         // 将 userID, <itemID, pref> 变换为 itemID, <userID, pref>
  15.         Iterator<Vector.Element> it = userVector.iterateNonZero();
  16.         VarIntWritable itemIndexWritable = new VarIntWritable();
  17.         VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
  18.         while (it.hasNext()) {
  19.             Vector.Element e = it.next();
  20.             itemIndexWritable.set(e.index());
  21.             vectorOrPref.set(userID, (float) e.get());
  22.             context.write(itemIndexWritable, vectorOrPref);
  23.         }
  24.     }
  25. }
复制代码

reducer实现
  1. // org.apache.hadoop.mapreduce.Reducer
  2. // 使用hadoop默认的reducer实现, 即仅仅合并相同的key
复制代码

数据拼接
  1. // Job partialMultiply
  2. // 根据用户偏好以及物品相似度获得
  3. // itemA, <<itemO, similarity>, userIDs, prefValues> 格式的输出
  4. // inputPath      tempDir/prePartialMultiply1, tempDir/prePartialMultiply2
  5. // outputPath     tempDir/partialMultiply
  6. // inputFormat    SequenceFileInputFormat
  7. // mapper         Mapper
  8. // reducer        ToVectorAndPrefReducer
  9. // outputFormat   SequenceFileOutputFormat</span>
复制代码

示例数据
  1. <span style="font-weight: normal;"># 输入
  2. # prePartialMultiply1
  3. 1 : (1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  4. 2 : (1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  5. 3 : (1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
  6. 4 : (1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
  7. 5 : (1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)
  8. # prePartialMultiply2
  9. 1 : (1,3.0)
  10. 1 : (2,4.0)
  11. 1 : (3,5.0)
  12. 1 : (4,4.0)
  13. 2 : (1,3.0)
  14. 2 : (2,4.0)
  15. 2 : (3,5.0)
  16. 2 : (4,4.0)
  17. 3 : (1,3.0)
  18. 3 : (2,4.0)
  19. 3 : (3,5.0)
  20. 3 : (4,4.0)
  21. 4 : (1,2.0)
  22. 4 : (4,1.0)
  23. 5 : (3,3.0)
  24. 5 : (4,4.0)
  25. # 输出
  26. 1 :
  27. <1,3.0> <2,4.0> <4,4.0> <3,5.0>
  28. (1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  29. 2 :
  30. <1,3.0> <2,4.0> <4,4.0> <3,5.0>
  31. (1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  32. 3 :
  33. <1,3.0> <2,4.0> <4,4.0> <3,5.0>
  34. (1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
  35. 4 :
  36. <1,2.0> <4,1.0>
  37. (1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
  38. 5 :
  39. <3,3.0> <4,4.0>
  40. (1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)
复制代码


mapper实现
  1. // org.apache.hadoop.mapreduce.Mapper
  2. // 使用hadoop默认的mapper实现, 即仅仅读取输入
复制代码


reducer实现
  1. public final class ToVectorAndPrefReducer
  2.         extends
  3.         Reducer<VarIntWritable, VectorOrPrefWritable, VarIntWritable, VectorAndPrefsWritable> {
  4.     @Override
  5.     protected void reduce(VarIntWritable key, Iterable<VectorOrPrefWritable> values, Context context)
  6.             throws IOException, InterruptedException {
  7.         List<Long> userIDs = Lists.newArrayList();
  8.         List<Float> prefValues = Lists.newArrayList();
  9.         Vector similarityMatrixColumn = null;
  10.         // 从输入中分别读取所有用户对当前物品的打分以及所有其它物品与当前物品的相似度
  11.         for (VectorOrPrefWritable value : values) {
  12.             if (value.getVector() == null) {
  13.                 userIDs.add(value.getUserID());
  14.                 prefValues.add(value.getValue());
  15.             } else {
  16.                 if (similarityMatrixColumn != null) {
  17.                     throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
  18.                 }
  19.                 similarityMatrixColumn = value.getVector();
  20.             }
  21.         }
  22.         if (similarityMatrixColumn == null) {
  23.             return;
  24.         }
  25.         // 以 itemA, <<itemO, similarity>, userIDs, prefValues> 格式写入到输出
  26.         VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
  27.         context.write(key, vectorAndPrefs);
  28.     }
  29. }<b></span></b>
复制代码


进行推荐物品过滤
  1. // 如果启动RecommenderJob时设置了 filterFile 则执行此任务
  2. // 最后将得到一个黑名单, 用于在推荐过程进行过滤
  3. //
  4. // inputPath      存储被过滤物品的数据目录
  5. // outputPath     tempDir/explicitFilterPath
  6. // inputFormat    TextInputFormat
  7. // mapper         ItemFilterMapper
  8. // reducer        ItemFilterAsVectorAndPrefsReducer
  9. // outputFormat   SequenceFileOutputFormat
复制代码


mapper实现
  1. public class ItemFilterMapper extends
  2.         Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
  3.     @Override
  4.     protected void map(LongWritable key, Text line, Context ctx)
  5.             throws IOException, InterruptedException {
  6.         // 仅仅读取了要为哪些用户过滤哪些物品
  7.         // 因为做矩阵乘法准备的过程中生成的数据都以物品为key
  8.         // 因此这里的输出为 itemID, userID
  9.         String[] tokens = SEPARATOR.split(line.toString());
  10.         long userID = Long.parseLong(tokens[0]);
  11.         long itemID = Long.parseLong(tokens[1]);
  12.         ctx.write(new VarLongWritable(itemID), new VarLongWritable(userID));
  13.     }
  14. }
复制代码


reducer实现
  1. public class ItemFilterAsVectorAndPrefsReducer extends
  2.         Reducer<VarLongWritable, VarLongWritable, VarIntWritable, VectorAndPrefsWritable> {
  3.     @Override
  4.     protected void reduce(VarLongWritable itemID,
  5.             Iterable<VarLongWritable> values, Context ctx) throws IOException,
  6.             InterruptedException {
  7.         // 索引转换
  8.         int itemIDIndex = TasteHadoopUtils.idToIndex(itemID.get());
  9.         // 将被过滤物品的相似度设置为Double.NaN, 以便于识别
  10.         Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
  11.         vector.set(itemIDIndex, Double.NaN);
  12.         List<Long> userIDs = Lists.newArrayList();
  13.         List<Float> prefValues = Lists.newArrayList();
  14.         // 将用户对被过滤物品的喜欢统一设置为1.0
  15.         for (VarLongWritable userID : values) {
  16.             userIDs.add(userID.get());
  17.             prefValues.add(1.0f);
  18.         }
  19.         ctx.write(new VarIntWritable(itemIDIndex), new VectorAndPrefsWritable(vector, userIDs, prefValues));
  20.     }
  21. }
复制代码


统计和推荐
  1. // Job aggregateAndRecommend
  2. //
  3. // 根据用户对每个物品的偏好信息进行计算
  4. // 最后返回为每个用户给出TopN个推荐
  5. //
  6. // inputPath      tempDir/partialMultiply, tempDir/explicitFilterPath
  7. // outputPath     推荐结果的输出目录
  8. // inputFormat    SequenceFileInputFormat
  9. // mapper         PartialMultiplyMapper
  10. // reducer        AggregateAndRecommendReducer
  11. // outputFormat   TextOutputFormat
复制代码


示例数据
  1. # 输入
  2. 1 :
  3. <1,3.0> <2,4.0> <4,4.0> <3,5.0>
  4. (1,NaN) (2,1.0) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  5. 2 :
  6. <1,3.0> <2,4.0> <4,4.0> <3,5.0>
  7. (1,1.0) (2,NaN) (3,1.0) (4,0.12282856553792953) (5,0.15661302208900452)
  8. 3 :
  9. <1,3.0> <2,4.0> <4,4.0> <3,5.0>
  10. (1,1.0) (2,1.0) (3,NaN) (4,0.12282856553792953) (5,0.15661302208900452)
  11. 4 :
  12. <1,2.0> <4,1.0>
  13. (1,0.12282856553792953) (2,0.12282856553792953) (3,0.12282856553792953) (4,NaN) (5,0.17573408782482147)
  14. 5 :
  15. <3,3.0> <4,4.0>
  16. (1,0.15661302208900452) (2,0.15661302208900452) (3,0.15661302208900452) (4,0.17573408782482147) (5,NaN)
  17. # 输出
  18. 1   [5:2.7277858]
  19. 2   [5:4.0,4:4.0]
  20. 3   [4:4.35418]
复制代码

mapper实现
  1. public final class PartialMultiplyMapper extends
  2.         Mapper<VarIntWritable, VectorAndPrefsWritable, VarLongWritable, PrefAndSimilarityColumnWritable> {
  3.     @Override
  4.     protected void map(VarIntWritable key,
  5.             VectorAndPrefsWritable vectorAndPrefsWritable, Context context)
  6.             throws IOException, InterruptedException {
  7.         // 从输入中分别读取当前物品的相似度向量, 即 itemA, <itemO, similarity>
  8.         // 以及所有用户对当前物品的偏好信息, 即 <userID, pref> , 这里将其分别存储在两个列表中
  9.         Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
  10.         List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
  11.         List<Float> prefValues = vectorAndPrefsWritable.getValues();
  12.         VarLongWritable userIDWritable = new VarLongWritable();
  13.         PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
  14.         // 遍历用户, 输出 userID, <pref, <itemO, similarity>>
  15.         // 比如
  16.         // 5 : <3,3.0> <4,4.0>
  17.         // (1,0.15661302208900452)
  18.         // (2,0.15661302208900452)
  19.         // (3,0.15661302208900452)
  20.         // (4,0.17573408782482147)
  21.         // (5,NaN)
  22.         // 变换为
  23.         // 3 : <3.0,
  24.         // (1,0.15661302208900452)
  25.         // (2,0.15661302208900452)
  26.         // (3,0.15661302208900452)
  27.         // (4,0.17573408782482147)>
  28.         // 4 : <4.0,
  29.         // (1,0.15661302208900452)
  30.         // (2,0.15661302208900452)
  31.         // (3,0.15661302208900452)
  32.         // (4,0.17573408782482147)>
  33.         for (int i = 0; i < userIDs.size(); i++) {
  34.             long userID = userIDs.get(i);
  35.             float prefValue = prefValues.get(i);
  36.             if (!Float.isNaN(prefValue)) {
  37.                 prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
  38.                 userIDWritable.set(userID);
  39.                 context.write(userIDWritable, prefAndSimilarityColumn);
  40.             }
  41.         }
  42.     }
  43. }
复制代码

reducer实现
  1. public final class AggregateAndRecommendReducer
  2.         extends
  3.         Reducer<VarLongWritable, PrefAndSimilarityColumnWritable, VarLongWritable, RecommendedItemsWritable> {
  4.     @Override
  5.     protected void reduce(VarLongWritable userID, Iterable<PrefAndSimilarityColumnWritable> values, Context context)
  6.             throws IOException, InterruptedException {
  7.         // 针对任务运行参数分别处理
  8.         if (booleanData) {
  9.             reduceBooleanData(userID, values, context);
  10.         } else {
  11.             reduceNonBooleanData(userID, values, context);
  12.         }
  13.     }
  14.     private void reduceNonBooleanData(VarLongWritable userID,
  15.             Iterable<PrefAndSimilarityColumnWritable> values, Context context)
  16.             throws IOException, InterruptedException {
  17.         Vector numerators = null;
  18.         Vector denominators = null;
  19.         Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
  20.         // 遍历用户有过偏好信息的所有物品
  21.         // 2 <4.0, (2,1.0) (3,1.0)
  22.         //   (4,0.12282856553792953) (5,0.15661302208900452)>
  23.         // 2 <4.0, (1,1.0) (3,1.0)
  24.         //   (4,0.12282856553792953) (5,0.15661302208900452)>
  25.         // 2 <4.0, (1,1.0) (2,1.0)
  26.         //   (4,0.12282856553792953) (5,0.15661302208900452)>
  27.         for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
  28.             Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
  29.             float prefValue = prefAndSimilarityColumn.getPrefValue();
  30.             // 为每一个需要给出预测分值的物品记录相似物品的打分次数
  31.             Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
  32.             while (usedItemsIterator.hasNext()) {
  33.                 int itemIDIndex = usedItemsIterator.next().index();
  34.                 numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
  35.             }
  36.             // 计算加权值所需的分子
  37.             // 构成由 similarity * pref 组成的新向量
  38.             // <4, 0.12282856553792953 * 4.0 +
  39.             //     0.12282856553792953 * 4.0 +
  40.             //     0.12282856553792953 * 4.0>
  41.             // <5, 0.15661302208900452 * 4.0 +
  42.             //     0.15661302208900452 * 4.0 +
  43.             //     0.15661302208900452 * 4.0>
  44.             numerators = numerators == null ?
  45.                 prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
  46.                 : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn
复制代码







引用:http://matrix-lisp.github.io/blog/2013/12/26/mahout-taste-source-2/


欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(1)人评论

跳转到指定楼层
songyl525 发表于 2014-12-30 15:23:11
写的很详细,我没有细看,思路就是这样的
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条