//A log file log1.txt, one record per line: access time \t user ID \t [query] \t rank of the URL in the returned results \t sequence number of the user's click \t URL the user clicked
Goal: a ranking of query counts per session (user ID)
val rdd1 = sc.textFile("hdfs://yun01-nn-01:9000/data/log1.txt")
val rdd2 = rdd1.map(_.split("\t")).filter(_.length == 6)  // keep only well-formed 6-field records
val rdd3 = rdd2.map(x => (x(1), 1))   // (userID, 1)
  .reduceByKey(_ + _)                 // count queries per user -- first shuffle
  .map(x => (x._2, x._1))             // swap to (count, userID) so we can sort by count
  .sortByKey(false)                   // descending -- second shuffle
  .map(x => (x._2, x._1))             // swap back to (userID, count)
rdd3.toDebugString
rdd3.saveAsTextFile("hdfs://yun01-nn-01:9000/spark2")
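To make clear what the pipeline above computes, here is a minimal sketch of the same count-and-rank logic on plain Scala collections, with no Spark required. The sample lines are hypothetical, invented to match the log format described at the top (time \t userID \t [query] \t rank \t click order \t URL).

```scala
// Hypothetical sample records in the log1.txt format.
val lines = Seq(
  "00:01\tu1\t[spark]\t1\t1\thttp://a",
  "00:02\tu2\t[hadoop]\t2\t1\thttp://b",
  "00:03\tu1\t[scala]\t1\t1\thttp://c"
)

val ranked = lines
  .map(_.split("\t")).filter(_.length == 6)          // parse and validate, like rdd2
  .map(f => (f(1), 1))                               // (userID, 1)
  .groupBy(_._1).map { case (k, v) => (k, v.size) }  // count per user, like reduceByKey
  .toSeq.sortBy(-_._2)                               // descending by count

// ranked: Seq(("u1", 2), ("u2", 1))
```

The RDD version needs the swap/`sortByKey`/swap dance only because `sortByKey` sorts by the pair's key; on local collections `sortBy` expresses the same intent directly.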
scala> rdd3.toDebugString
res3: String =
(2) MapPartitionsRDD[10] at map at <console>:25 []
| ShuffledRDD[9] at sortByKey at <console>:25 []
+-(2) MapPartitionsRDD[6] at map at <console>:25 []
| ShuffledRDD[5] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[4] at map at <console>:25 []
| MapPartitionsRDD[3] at filter at <console>:23 []
| MapPartitionsRDD[2] at map at <console>:23 []
| MapPartitionsRDD[1] at textFile at <console>:21 []
| hdfs://yun01-nn-01:9000/data/log1.txt HadoopRDD[0] at textFile at <console>:21 []
Analysis:
From rdd3's lineage above, the data goes through two shuffles before being saved, at reduceByKey and at sortByKey, so the DAGScheduler splits the job into 3 stages. This shows that stages are delimited by shuffle boundaries, and the job's run in the web UI confirms it. The final stage is created only when saveAsTextFile is called (the action that triggers the job); it covers the work after the last shuffle, including the last map and the save itself.