分享

键值对rdd求value最大值

一颗银杏树 发表于 2017-5-15 21:09:13 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 12 25984
源数据:(“人名”,年龄) val rdd = sc.parallelize(Array[(String,Int)](("Michael",29),("Andy",30),("Justin",19), ("Tom",10), ("Tonny",11), ("Bob",12)))


目的:找出年龄最大的人名

我的方法:
scala> val result = rdd.map(x => x._2).max   //先找到最大的年龄
result: Int = 30
scala> rdd.filter(x => x._2 == 30).collect    // 再filter
res14: Array[(String, Int)] = Array((Andy,30))


虽然是得到结果了,但是这里一直想用reduce来替换filter来完成这个目的,但是一直没写出这个函数来,哪位大神帮帮我写一个,谢谢!!

已有(12)人评论

跳转到指定楼层
tntzbzc 发表于 2017-5-15 21:51:14
本帖最后由 tntzbzc 于 2017-5-15 21:55 编辑

reduce没有想出来。不过我觉得一个语句可以完成。不知道能不能嵌套
[mw_shl_code=scala,true]rdd.filter(x => x._2 ==  rdd.map(x => x._2).max).collect [/mw_shl_code]
回复

使用道具 举报

一颗银杏树 发表于 2017-5-15 21:56:04
tntzbzc 发表于 2017-5-15 21:51
reduce没有想出来。不过我觉得一个语句可以完成。不知道能不能嵌套
[mw_shl_code=scala,true]rdd.filter(x ...

这个语句执行不了,报错了
回复

使用道具 举报

xiaobaiyang 发表于 2017-5-16 14:57:03
tntzbzc 发表于 2017-5-15 21:51
reduce没有想出来。不过我觉得一个语句可以完成。不知道能不能嵌套
[mw_shl_code=scala,true]rdd.filter(x ...

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.

回复

使用道具 举报

xiaobaiyang 发表于 2017-5-16 15:02:20
reduce:reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止,所以我认为reduce不适合你说的这种情景
回复

使用道具 举报

一颗银杏树 发表于 2017-5-16 15:31:28
xiaobaiyang 发表于 2017-5-16 15:02
reduce:reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递 ...

val result = rdd.reduce((x,y) => ("result",(if(x._2 < y._2) y._2 else x._2)))  
这条语句可以找到最大值为30,但是这里的key我用了个字符串“result“补进去了,我想一次找出最大值30和30对应的人名。
回复

使用道具 举报

xiaobaiyang 发表于 2017-5-17 08:58:07
一颗银杏树 发表于 2017-5-16 15:31
val result = rdd.reduce((x,y) => ("result",(if(x._2 < y._2) y._2 else x._2)))  
这条语句可以找到 ...

受教了
回复

使用道具 举报

xiaobaiyang 发表于 2017-5-17 09:03:01
一颗银杏树 发表于 2017-5-16 15:31
val result = rdd.reduce((x,y) => ("result",(if(x._2 < y._2) y._2 else x._2)))  
这条语句可以找到 ...

scala> val result = rdd.reduce((x,y) => (if(x._2 < y._2) y else x))
result: (String, Int) = (Andy,30)

我感觉这样是不是更好些,既可以找到最大的年龄值,又可以找到对应的人名

回复

使用道具 举报

w517424787 发表于 2017-5-17 11:40:06
scala> rdd.map(x=>(x._2,x)).sortByKey(false).map(x=>x._2).take(1)
res6: Array[(String, Int)] = Array((Andy,30))
回复

使用道具 举报

ledasion 发表于 2017-5-17 13:33:28
sortBy 按照年龄排序
rdd.sortBy(x=> x._2)).take(1)
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条