分享

spark官网一个例子 不懂 请教一下各位大神

remarkzhao 发表于 2017-11-3 10:17:42 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 5516
object QueueStream {  def main(args: Array[String]) {    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")    val ssc = new StreamingContext(sparkConf, Seconds(20))    val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()    val queueStream = ssc.queueStream(rddQueue)    val mappedStream = queueStream.map(r => (r % 10, 1))    val reducedStream = mappedStream.reduceByKey(_ + _)    reducedStream.print()    ssc.start()    for (i <- 1 to 10){        rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)        Thread.sleep(1000)    }    ssc.stop()  }}object QueueStream {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    val rddQueue =new scala.collection.mutable.SynchronizedQueue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()
    for (i <- 1 to 10){
        rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
        Thread.sleep(1000)
    }
    ssc.stop()
  }
}  请教各位大神一个问题,这个是spark官网一个例子,输出结果是 (4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
我咋觉得应该是(4,100)
(0,100)
(6,100)
(8,100)
(2,100)
(1,100)
(3,100)
(7,100)
(9,100)
(5,100)

    for (i <- 1 to 10){
        rddQueue += ssc.sparkContext.makeRDD(1 to 100,2)
        Thread.sleep(1000)
    }  这个循环总共做了10次,每次都是放1到100啊。。一批流计算结果就是 (4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
那10次 应该就是 (0,100)
(6,100)
(8,100)
(2,100)
(1,100)
(3,100)
(7,100)
(9,100)
(5,100)


已有(8)人评论

跳转到指定楼层
einhep 发表于 2017-11-3 13:41:35
通过通常的spark程序实现应该是这个结果。如果spark streaming,输出的可能是一段时间内的,而非整个的数据。楼主可以尝试验证下。
回复

使用道具 举报

nextuser 发表于 2017-11-3 16:15:51
einhep 发表于 2017-11-3 13:41
通过通常的spark程序实现应该是这个结果。如果spark streaming,输出的可能是一段时间内的,而非整个的数据 ...

代码好乱
回复

使用道具 举报

nextuser 发表于 2017-11-3 16:31:05
这是对每个流的统计。
每个输出应该是这样的
比如下面程序
[mw_shl_code=scala,true]object QueueStream {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("queueStream")
    //每1秒对数据进行处理
    val ssc = new StreamingContext(conf,Seconds(1))
    //创建一个能够push到QueueInputDStream的RDDs队列
    val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
    //基于一个RDD队列创建一个输入源
    val inputStream = ssc.queueStream(rddQueue)
    val mappedStream = inputStream.map(x => (x % 10,1))
    val reduceStream = mappedStream.reduceByKey(_ + _)
    reduceStream.print
    ssc.start()
    for(i <- 1 to 30){
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)   //创建RDD,并分配两个核数
      Thread.sleep(1000)                                 
    }
    ssc.stop()
  }
}[/mw_shl_code]
[mw_shl_code=bash,true]-------------------------------------------
Time: 1459595433000 ms //第1个输出
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)
............
............
-------------------------------------------
Time: 1459595463000 ms //第30个输出
-------------------------------------------
(4,10)
(0,10)
(6,10)
(8,10)
(2,10)
(1,10)
(3,10)
(7,10)
(9,10)
(5,10)[/mw_shl_code]

回复

使用道具 举报

nextuser 发表于 2017-11-3 16:34:46
本帖最后由 nextuser 于 2017-11-3 16:35 编辑

第10次和第1次没有任何的关系,都是从1到100统计
1到10里面,有10个数字,这时候统计从1到9都出现1次
11到20里面,同样10个数字,这时候统计从1到9都出现2次
同样到100, 每个都是10个数字,这时候统计从1到9都出现10次
也就是每个循环都是10次,也就是第一次循环是10次,第二个循环也是10次,第10个循环也是10次。怎么会变成100次。楼主是怎么计算的

回复

使用道具 举报

wangxiaojian 发表于 2017-11-3 17:39:14
本帖最后由 wangxiaojian 于 2017-11-3 17:57 编辑

总共循环了30次,每一次循环的时候 实际是 1 到 100,那么 reduceBykey 中  x % 10 的值就是0 到 9,在执行reduce的时候统计到 0--10次,1--10次,2--10 次 等等等.....
楼主大概是没理解scala 的queue队列,然后楼主再取理解一下sparkStreaming,对流动的数据,按设定的1秒切割一次。
回复

使用道具 举报

remarkzhao 发表于 2017-11-6 09:17:08
nextuser 发表于 2017-11-3 16:34
第10次和第1次没有任何的关系,都是从1到100统计
1到10里面,有10个数字,这时候统计从1到9都出现1次
11 ...

恩恩。明白了。多谢。。

回复

使用道具 举报

remarkzhao 发表于 2017-11-6 10:46:50
nextuser 发表于 2017-11-3 16:34
第10次和第1次没有任何的关系,都是从1到100统计
1到10里面,有10个数字,这时候统计从1到9都出现1次
11 ...

hi,我把执行时间换成20秒 为什么只能打印一次呢???
回复

使用道具 举报

nextuser 发表于 2017-11-7 15:24:21
remarkzhao 发表于 2017-11-6 10:46
hi,我把执行时间换成20秒 为什么只能打印一次呢???

那就是每20秒处理一次,应该不会的。可以多测试几个区间。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条