分享

关于spark中map、reduce的一点疑问

本帖最后由 nike1972 于 2016-8-9 21:38 编辑

现在的需求是这样的,假设我在外部声明了一个字段,在map中对这个字段进行了赋值,然后在reduce中对这个字段进行取值操作。我以wordcount为例,
[mw_shl_code=scala,true]object WordCount {
var str:String=null
   def main(args: Array[String]) {
     if (args.length < 1) {
       System.err.println("Usage: <file>")
      System.exit(1)
     }

     val conf = new SparkConf()
    val sc = new SparkContext(conf)
     val line = sc.textFile(args(0))

val counts= line.flatMap(_.split(" ")).map(word=>{
str="welcome to spark"
(word, 1)
})
println(str)
val finalRdd= counts.reduceByKey((x,y)=>{
x+y
println(str)
}).collect().foreach(println)

     sc.stop()
  }
}[/mw_shl_code]
这是一个简单的wordcount代码,我只是声明了一个全局变量str,并且在map中对str进行了赋值操作,我想知道为什么map和reduce之间的那个println语句打印出来的str是null,是不是因为我没有进行action操作,map语句还没有执行的原因么?我希望将map中的str的赋值的内容传到reduce中去,在reduce中进行操作,请问有什么办法吗?

已有(8)人评论

跳转到指定楼层
langke93 发表于 2016-8-9 22:37:35
这是分布式编程跟传统编程不一样的。
全局变量也是不管用的。
因为map函数和reduce函数可能会在不同的机器上执行。
可以尝试使用context
回复

使用道具 举报

nike1972 发表于 2016-8-9 22:50:02
langke93 发表于 2016-8-9 22:37
这是分布式编程跟传统编程不一样的。
全局变量也是不管用的。
因为map函数和reduce函数可能会在不同的机 ...

能不能说的具体一点?刚接触spark,很多不是太懂,我需要有一个字符串的数据能够在map中进行赋值,然后在reduce中对赋值的结果进行操作。用context如何进行?
回复

使用道具 举报

einhep 发表于 2016-8-10 06:12:10
nike1972 发表于 2016-8-9 22:50
能不能说的具体一点?刚接触spark,很多不是太懂,我需要有一个字符串的数据能够在map中进行赋值,然后在 ...

context应该是hadoop的, Spark 支持 2 种类型的共享变量:广播变量(broadcast variables),用来在所有节点的内存中缓存一个值;累加器(accumulators),仅仅只能执行“添加(added)”操作,例如:记数器(counters)和求和(sums)。
楼主可以尝试上面的。
网上资料也不少
推荐参考
Spark中文手册1-编程指南
http://www.aboutyun.com/forum.php?mod=viewthread&tid=11413



回复

使用道具 举报

nike1972 发表于 2016-8-10 09:16:13
einhep 发表于 2016-8-10 06:12
context应该是hadoop的, Spark 支持 2 种类型的共享变量:广播变量(broadcast variables),用来在所有节 ...

我也想到了使用广播变量,但是我现在要对map中得到的值进行传递,在map中使用sc的时候总会造成task 没有序列化这个异常,这个问题如何解决呢?
回复

使用道具 举报

arsenduan 发表于 2016-8-10 13:23:38
nike1972 发表于 2016-8-10 09:16
我也想到了使用广播变量,但是我现在要对map中得到的值进行传递,在map中使用sc的时候总会造成task 没有 ...

具体是什么异常,贴出来看下
回复

使用道具 举报

nike1972 发表于 2016-8-10 13:46:22
公司没有外网,所以只能拍照片,异常提示sparkcontext没有序列化,原因是在map中调用了sc的广播变量。
111.jpg
回复

使用道具 举报

nextuser 发表于 2016-8-10 15:36:46
nike1972 发表于 2016-8-10 13:46
公司没有外网,所以只能拍照片,异常提示sparkcontext没有序列化,原因是在map中调用了sc的广播变量。

代码尝试加上 @transient


@transient
val conf = new SparkConf();
@transient
val sc = new SparkContext(conf);

回复

使用道具 举报

nike1972 发表于 2016-8-10 22:33:41
nextuser 发表于 2016-8-10 15:36
代码尝试加上 @transient

试过了,加上@transient还是会出现这个异常,原因就在于在map中调用了sparkcontext方法,我已经用groupbykey和map代替了reducebykey算子。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条