
Implementing an Item-Based Collaborative Filtering Recommender System in MapReduce with Hadoop Streaming


Guiding questions

1. How is matrix multiplication implemented with MapReduce?
2. How can this article's approach be improved so that everything runs as a single MapReduce job?

Taking personalized news recommendation as the example, the whole process is split into two MapReduce jobs. They can also be merged into a single MapReduce job; for details, see:

Truly understand composite, iterative, and chained MapReduce
http://www.aboutyun.com/thread-7435-1-1.html




1. First, process the raw data into two files of the following form

File 1: Item_user_score.txt, format: item <TAB> user <TAB> score
For example, the first sample row: item 100655565 was viewed by user 1634974, so its score is recorded as 1.

(Figure 1: sample rows of Item_user_score.txt; image not reproduced here)

File 2: Item_Item_number.txt, format: item <TAB> item <TAB> similarity
For example, the second sample row: items 100654360 and 100650498 were both viewed by the same two users, so their co-occurrence count (the similarity) is 2.

(Figure 2: sample rows of Item_Item_number.txt; image not reproduced here)
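
Since the original screenshots are not reproduced here, the following sample lines (tab-separated; the values come from the two rows described above, nothing more is known) illustrate the two formats:

  Item_user_score.txt:
  100655565	1634974	1

  Item_Item_number.txt:
  100654360	100650498	2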


2. Matrix multiplication

File 1 and File 2 each store a matrix; the second step is to multiply these two matrices.
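
Before distributing the computation, it may help to see it locally. The following minimal sketch (not part of the Hadoop jobs; all values hypothetical) multiplies the two matrices directly, computing preference(user, j) = Σ_i similarity(j, i) × score(i, user), which is exactly what the two jobs below compute in a distributed way:

[mw_shl_code=python,true]#!/usr/bin/env python
# Minimal local sketch of the item-based CF matrix multiplication.
# All values below are hypothetical; the Hadoop jobs compute the same thing.

# File 1 as a matrix: scores[item][user] = score
scores = {'100655565': {'1634974': 1.0}}
# File 2 as a matrix: similarity[item_j][item_i] = co-occurrence count
similarity = {'100654360': {'100655565': 34}}

# preference[user][item_j] = sum over item_i of
#     similarity[item_j][item_i] * scores[item_i][user]
preference = {}
for item_j, row in similarity.items():
    for item_i, sim in row.items():
        for user, score in scores.get(item_i, {}).items():
            preference.setdefault(user, {})
            preference[user][item_j] = preference[user].get(item_j, 0.0) + sim * score

print preference   # {'1634974': {'100654360': 34.0}}[/mw_shl_code]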

step1mapper.py
[mw_shl_code=python,true]#!/usr/bin/env python
"""Identity mapper for step 1: pass each record through unchanged so
that lines sharing the same key (the item ID) reach the same reducer."""

import sys

def read_mapper_output(file, separator='\t'):
    # Split each line into (key, rest-of-line) at the first separator.
    for line in file:
        yield line.strip().split(separator, 1)

def main(separator='\t'):
    # Input comes from STDIN (standard input).
    for data in read_mapper_output(sys.stdin, separator=separator):
        # Re-emit the record unchanged: key <TAB> value.
        print "%s%s%s" % (data[0], separator, data[1])

if __name__ == "__main__":
    main()[/mw_shl_code]

The sole purpose of step1mapper.py is to read the key-value pairs from both files and emit them unchanged, so that all records with the same key (item ID) arrive at the same reducer.
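
For example, after the shuffle and sort, the reducer handling key 100654360 would see both record types interleaved under that key (values hypothetical):

  100654360	1634974	1
  100654360	100650498	2

The first line came from Item_user_score.txt (user, score); the second from Item_Item_number.txt (similar item, similarity).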


step1reducer.py

[mw_shl_code=python,true]#!/usr/bin/env python
"""Reducer for step 1: for each item, cross the (user, score) records
with the (similar-item, similarity) records and emit the partial
products of the matrix multiplication."""

import sys

def read_mapper_output(file, separator='\t'):
    # Mapper output is tab-separated; split each line into its fields.
    for line in file:
        yield line.split()

def main(separator='\t'):
    # Input comes from STDIN (standard input).
    data = read_mapper_output(sys.stdin, separator=separator)
    # Group all records by their key, the item ID.
    datanew = {}
    for key in data:
        if key[0] not in datanew:
            datanew[key[0]] = []
        datanew[key[0]].append([key[1], key[2]])
    for item in datanew:
        user_score = []    # (user, score) pairs from Item_user_score.txt
        item_number = []   # (item, similarity) pairs from Item_Item_number.txt
        for temp in datanew[item]:
            # Heuristic to tell the two record types apart: user IDs are
            # 7-digit numbers (e.g. 1634974) while item IDs are 9-digit
            # numbers (e.g. 100654360), so id/1000000 < 99 means a user.
            if int(temp[0]) / 1000000 < 99:
                user_score.append(temp)
            else:
                item_number.append(temp)
        # Emit one partial product per (user, similar item) pair:
        # user <TAB> similar_item <TAB> score * similarity
        for attr1 in user_score:
            for attr2 in item_number:
                print "%s%s%s%s%s" % (attr1[0], separator, attr2[0],
                                      separator, float(attr1[1]) * int(attr2[1]))

if __name__ == "__main__":
    main()
[/mw_shl_code]

The output of step1reducer.py looks like: 1634974	100654360	34.0

This can be read as: user 1634974's preference for news item 100654360 is 34.0.
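
To see where such a value comes from: each line that step1reducer.py emits is a single partial product score × similarity. For instance, with a hypothetical score of 1.0 and a co-occurrence count of 34, the emitted value is 1.0 × 34 = 34.0. Summing all partial products for the same (user, item) pair is deferred to step 2.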

Run it with:
  ~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -mapper step1mapper.py -reducer step1reducer.py -input /input/test.txt -output /step1out.txt -file step1mapper.py -file step1reducer.py  
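
Before submitting to the cluster, the job can also be simulated locally with a shell pipe, where `sort` plays the role of Hadoop's shuffle (assuming the two input files are in the current directory):

  cat Item_user_score.txt Item_Item_number.txt | python step1mapper.py | sort | python step1reducer.py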



3. Merging the results

The reducer output of the previous job is used as the mapper input of this job.
step2mapper.py

[mw_shl_code=python,true]#!/usr/bin/env python
"""Identity mapper for step 2: pass each record through unchanged so
that lines sharing the same key (now the user ID) reach the same reducer."""

import sys

def read_mapper_output(file, separator='\t'):
    # Split each line into (key, rest-of-line) at the first separator.
    for line in file:
        yield line.strip().split(separator, 1)

def main(separator='\t'):
    # Input comes from STDIN (standard input).
    for data in read_mapper_output(sys.stdin, separator=separator):
        # Re-emit the record unchanged: key <TAB> value.
        print "%s%s%s" % (data[0], separator, data[1])

if __name__ == "__main__":
    main()[/mw_shl_code]

step2reducer.py recommends the 5 most likely items for each user:

[mw_shl_code=python,true]#!/usr/bin/env python
"""Reducer for step 2: for each user, sum the partial products per item
and print the five items with the highest aggregated scores."""

from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    # Mapper output is tab-separated; split each line into its fields.
    for line in file:
        yield line.split()

def main(separator='\t'):
    # Input comes from STDIN (standard input).
    data = read_mapper_output(sys.stdin, separator=separator)
    # Group the (item, partial product) records by their key, the user ID.
    datanew = {}
    for key in data:
        if key[0] not in datanew:
            datanew[key[0]] = []
        datanew[key[0]].append([key[1], key[2]])
    for user in datanew:
        # Sum the partial products per recommended item.
        result = {}
        for record in datanew[user]:
            if record[0] not in result:
                result[record[0]] = 0.0
            result[record[0]] += float(record[1])
        # Sort by aggregated score, descending, and keep the top 5.
        for rec_item in sorted(result.iteritems(), key=itemgetter(1), reverse=True)[:5]:
            print "%s\t%s\t%s" % (user, rec_item[0], rec_item[1])

if __name__ == "__main__":
    main()
[/mw_shl_code]
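
As a hypothetical illustration of the aggregation: if the reducer receives the following lines for user 1634974

  1634974	100654360	34.0
  1634974	100654360	6.0
  1634974	100650498	12.0

it sums the partial products per item and prints the highest-scoring items first:

  1634974	100654360	40.0
  1634974	100650498	12.0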


Run it with:  ~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -mapper step2mapper.py -reducer step2reducer.py -input /step1out.txt -output /step2out.txt -file step2mapper.py -file step2reducer.py
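
The full two-stage flow can likewise be simulated locally in a single pipe (same assumptions as the step 1 test above):

  cat Item_user_score.txt Item_Item_number.txt | python step1mapper.py | sort | python step1reducer.py | python step2mapper.py | sort | python step2reducer.py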

The output has the same form as above: user ID, recommended item ID, and aggregated score, tab-separated, with at most five lines per user.

PS: The mappers of both MapReduce jobs are actually redundant; you can simply add these parameters when launching the job:

-D stream.map.input.field.separator='\t'   // each input line uses tab as the field separator
-D stream.num.map.input.key.fields=1       // everything before the first tab is the key, the rest is the value

This way the two MapReduce passes can be merged and executed as a single MapReduce job.
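
A possible shape of such a command, reusing the parameters above and `cat` as the trivial pass-through mapper (a sketch only, not verified here):

  ~/hadoop-2.3.0/bin/hadoop jar hadoop-streaming-2.3.0.jar -D stream.map.input.field.separator='\t' -D stream.num.map.input.key.fields=1 -mapper cat -reducer step1reducer.py -input /input/test.txt -output /step1out.txt -file step1reducer.py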




This is an improvement based on the original work by keepreder.
