本帖最后由 ningbingjian 于 2017-10-28 21:59 编辑
让用napreduce实现的目的应该是预设了数据量极大,不能装内存,写的程序要有分布式的思维,楼上有个同学发代码,按第二列作为key分发,而且只做了一次reducer,感觉好像不太对,而且还感觉顶不住大数据量。
我的实现原理:
1.第一列和第二列互换构造第二个表
0 1 家电
0 2 服装
0 3 食品
1 4 洗衣机
1 5 冰箱
2 6 男装
2 7 女装
3 8 零食
3 9 水果
4 10 美的
5 11 海尔
6 12 衬衫
7 13 蓬蓬裙
8 14 牛奶
14 15 特仑苏
如果用mapreduce的话 ,需要自定义partitioner来将两个表相同的key分发到相同的reducer即可
然后做关联
用spark好处理点 直接调用rdd.join
两个表连接:
原始表join构造表:
1,((1,洗衣机),(10,美的)))
(3,((3,零食),(14,牛奶)))
(8,((8,牛奶),(15,特仑苏)))
(2,((2,女装),(13,蓬蓬裙)))
(1,((1,冰箱),(11,海尔)))
(2,((2,男装),(12,衬衫)))
(0,((0,服装),(6,男装)))
(0,((0,服装),(7,女装)))
(0,((0,食品),(8,零食)))
(0,((0,食品),(9,水果)))
(0,((0,家电),(4,洗衣机)))
(0,((0,家电),(5,冰箱)))
原始表继续join 刚才的结果:
(3,((3,零食),((8,牛奶),(15,特仑苏))))
(0,((0,服装),((2,女装),(13,蓬蓬裙))))
(0,((0,服装),((2,男装),(12,衬衫))))
(0,((0,食品),((3,零食),(14,牛奶))))
(0,((0,家电),((1,洗衣机),(10,美的))))
(0,((0,家电),((1,冰箱),(11,海尔))))
原始表继续join 刚才的结果:
(0,(None,((0,服装),((2,女装),(13,蓬蓬裙)))))
(0,(None,((0,服装),((2,男装),(12,衬衫)))))
(0,(None,((0,食品),((3,零食),(14,牛奶)))))
(0,(None,((0,家电),((1,洗衣机),(10,美的)))))
(0,(None,((0,家电),((1,冰箱),(11,海尔)))))
(3,(Some((0,食品)),((3,零食),((8,牛奶),(15,特仑苏)))))
到这里就好处理了
(0,(None,((0,食品),((3,零食),(14,牛奶)))))
(3,(Some((0,食品)),((3,零食),((8,牛奶),(15,特仑苏)))))
这两个需要特殊处理,判断长度,留下长度最大的就可以。
(3,(Some((0,食品)),((3,零食),((8,牛奶),(15,特仑苏)))))
我的实现缺点是必须知道join的最大次数,也就是分类的最大深度
当然还可以优化下de
附上spark代码:
[mw_shl_code=scala,true]
package com.ning.test
import org.apache.spark.{SparkConf, SparkContext}
object Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("test")
conf.setMaster("local[1]")
val sc = new SparkContext(conf)
val rdd = sc.textFile("data/test.txt")
val rdd1 = rdd.map{
case line =>
val fields = line.split("\t+")
(fields(0),(fields(1),fields(2)))
}
val rdd2 = rdd.map{
case line =>
val fields = line.split("\t")
(fields(1),(fields(0),fields(2)))
}
val rdd3 = rdd1.join(rdd2)
.map{
case item=>
(item._2._1._1,(item._2._1,item._2._2))
}
rdd3.collect().foreach(println)
val rdd4 = rdd1.join(rdd3)
.map{
case item=>
(item._2._1._1,(item._2._1,item._2._2))
}
rdd4.collect().foreach(println)
rdd1.rightOuterJoin(rdd4)
.collect().foreach(println)
}
}
[/mw_shl_code]
|