分享

Spark实现倒排索引

ayou 发表于 2015-5-4 11:02:45 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 36590
本帖最后由 ayou 于 2015-5-6 08:37 编辑

输入:文档ID+“\t”+文档内容
ID1    Hello world
ID2    Hello spark
输出:关键词+"\t"+文档ID
Hello    ID1 ID2

world     ID1
spark     ID2

代码如下:
请根据自己机器情况修改路径
  1. package Spark1
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.SparkContext._
  4. import scala.collection.mutable
  5. /**
  6. *
  7. * Created by youxingzhi on 15-5-3.
  8. */
  9. object InvertedIndex {
  10.   def main (args: Array[String]) {
  11.     val conf = new SparkConf().setAppName("InvertedIndex").setMaster("spark://192.168.1.170:7077")
  12.     val spark =  new SparkContext(conf)
  13.     spark.addJar("/home/youxingzhi/IdeaProjects/WordCount/out/artifacts/Spark1_jar/Spark1.jar")
  14.     //textFile可以通过设置第二个参数来指定slice个数(slice与Hadoop里的split/block概念对应,一个task处理一个slice)。Spark默认将Hadoop上一个block对应为一个slice,但可以调大slice的个数,但不能比block的个数小,这就需要知道HDFS上一个文件的block数目,可以通过50070的dfs的jsp来查看。
  15.     val words = spark.textFile("hdfs://master:8020/InvertedIndex",1).map(file=>file.split("\t")).
  16.       map(item =>{
  17.       (item(0),item(1))
  18.     }).flatMap(file => {
  19.       var map = mutable.Map[String,String]()
  20.       val words = file._2.split(" ").iterator
  21.       val doc = file._1
  22.       while(words.hasNext){
  23.         map+=(words.next() -> doc)
  24.       }
  25.       map
  26.     })
  27.     //save to file
  28.     words.reduceByKey(_+" "+_).map(x=>{
  29.       x._1+"\t"+x._2
  30.     }).saveAsTextFile("hdfs://master:8020/test3")
  31.   }
  32. }
复制代码






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条