分享

spark循环中的变量问题,,foreach里面向map填值,在循环外面值没有了

头大了。。。。。下面这个map在循环里面有值,到循环外的map里就没有值了。。。

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf();
    conf.setMaster("local[1]");
    conf.setAppName("WcAppTask");
    val sc = new SparkContext(conf);
    val fileRdd = sc.parallelize(Array(("imsi1","2018-07-29 11:22:23","zd-A"),("imsi2","2018-07-29 11:22:24","zd-A"),("imsi3","2018-07-29 11:22:25","zd-A")))
    val result = mutable.Map.empty[String,String]
    fileRdd.foreach(input=>{
      val str = (input._1+"/t"+input._2+"/t"+input._3).toString;
      result += (input._1.toString -> str)
      println(result.size) //返回3
    })
    println(result.size) //返回0

  }
}
输出结果:12318/07/29 15:08:19 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1012 bytes result sent to driver18/07/29 15:08:19 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 199 ms on localhost (executor driver) (1/1)18/07/29 15:08:19 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 18/07/29 15:08:19 INFO DAGScheduler: ResultStage 0 (foreach at Test.scala:18) finished in 0.237 s018/07/29 15:08:19 INFO DAGScheduler: Job 0 finished: foreach at Test.scala:18, took 0.689218 s



已有(5)人评论

跳转到指定楼层
未央不见 发表于 2018-7-31 15:11:35
w517424787 发表于 2018-7-30 13:55
一般想在spark算子中使用外部变量,并改变外部变量的值,都是使用累加器来实现,因为在spark算子中引用的外 ...

谢谢大神,你总结真是太好了,我最近在研究spark,卡在这一块了,现在豁然开朗。。舒坦
还发现了另外一种方法:(使用广播broadcast)

val fileRdd = sc.parallelize(Array(("imsi1","2018-07-29 11:22:23","zd-A"),("imsi2","2018-07-29 11:22:24","zd-A"),("imsi3","2018-07-29 11:22:25","zd-A")))
val result = mutable.Map.empty[String,String]
val resultBroadCast = sc.broadcast(result)

fileRdd.foreach(input=>{
  val str = (input._1+"\t"+input._2+"\t"+input._3).toString;
  resultBroadCast.value += (input._1.toString -> str)
})

resultBroadCast.value.foreach(println(_)) //返回3
也可以返回结果:18/07/31 15:08:18 INFO DAGScheduler: Job 0 finished: foreach at Test.scala:19, took 0.238780 s(imsi2,imsi2        2018-07-29 11:22:24        zd-A)(imsi1,imsi1        2018-07-29 11:22:23        zd-A)(imsi3,imsi3        2018-07-29 11:22:25        zd-A)
回复

使用道具 举报

w517424787 发表于 2018-7-30 13:55:20
一般想在spark算子中使用外部变量,并改变外部变量的值,都是使用累加器来实现,因为在spark算子中引用的外部变量,其实是变量的副本,在算子中对其值进行修改,只是改变副本的值,外部的变量还是没有变。
累加器参考代码:
val accum = sc.collectionAccumulator[mutable.Map[String, String]]("My Accumulator")
    fileRdd.foreach(input => {
      val str = input._1 + "/t" + input._2 + "/t" + input._3
      accum.add(mutable.Map(input._1 -> str))
    })

    println(accum.value)
   
    累加器返回的结果:
   [Map(imsi1 -> imsi1/t2018-07-29 11:22:23/tzd-A), Map(imsi2 -> imsi2/t2018-07-29 11:22:24/tzd-A), Map(imsi3 -> imsi3/t2018-07-29 11:22:25/tzd-A)]

还有一种方法就是使用RDD的collect算子,将数据收集到Driver进行拼接,这样也可以实现!
回复

使用道具 举报

w123aw 发表于 2018-7-29 15:42:11
其他还没看,但是看到这里就不对了,local[1]里面的值至少为2才可以
conf.setMaster("local[1]");
应该改为
conf.setMaster("local[2]");



回复

使用道具 举报

rsgg03 发表于 2018-7-29 15:52:02
在foreach中,一般就是拿到一条数据处理一条,如果后面处理可能丢失。

详解如下:
################################

foreach中,传入一个function,这个函数的传入参数就是每个partition中,每次的foreach得到的一个rdd的kv实例,也就是具体的内容,

这种处理你并不知道这个iterator的foreach什么时候结果,只能是foreach的过程中,你得到一条数据,就处理一条数据.

由下面的红色部分可以看出,foreach操作是直接调用了partition中数据的foreach操作:



def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}



示例说明:

val list = new ArrayBuffer()

Rdd.foreach(record => {

  list += record

  If (list.size >= 10000) {

    list.flush

  }

})

上面这段示例代码中,如果会存在一个问题,迭代的最后,list的结果可能还没有达到10000条,这个时候,

你在内部的处理的flush部分就不会执行,也就是迭代的最后如果没有达到10000的数据就会丢失.

所以在foreach中,一般就是拿到一条数据进行下处理Rdd.foreach(record => {record._1 == a return})

回复

使用道具 举报

634320089 发表于 2018-7-31 11:50:16
上面已经解释过了,foreach里的变量带不出来的,除非用map,将结果作为rdd返回,你直接说要实现什么样的功能吧,如果是为了存到map,根据key拿到value,用join来做很简单
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条