分享

使用spark实现max/min/mean/topN等经典mapreduce问题

Oner 2016-12-8 13:15:06 发表于 总结型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 47458
本帖最后由 Oner 于 2016-12-8 13:43 编辑
问题导读:
1.  使用spark如何实现最大值最小值问题?
2.  使用spark如何实现平均值问题?
3.  使用spark如何实现topN问题?



摘要

Spark是一个Apache项目,它被标榜为“快如闪电的集群计算”。它拥有一个繁荣的开源社区,并且是目前最活跃的Apache项目。Spark提供了一个更快、更通用的数据处理平台。和Hadoop相比,Spark可以让你的程序在内存中运行时速度提升100倍,或者在磁盘上运行时速度提升10倍。同时spark也让传统的map reduce job开发变得更加简单快捷。本文将简单介绍几个经典hadoop的mr按理用spark实现,来让大家熟悉spark的开发。


最大值最小值

求最大值最小值一直是Hadoop的经典案例,我们用Spark来实现一下,借此感受一下spark中mr的思想和实现方式。话不多说直接上code:
[mw_shl_code=scala,true]@Test  def testMaxMin: Unit = {
    val sconf = new SparkConf().setAppName("test")
    val sc = new SparkContext(sconf)
    //初始化测试数据    val data = sc.parallelize(Array(10,7,3,4,5,6,7,8,1001,6,2))
    //方法一    val res = data.map(x => ("key", x)).groupByKey().map(x => {
      var min = Integer.MAX_VALUE      var max = Integer.MIN_VALUE      for(num <- x._2){
        if(num>max){
          max = num
        }
        if(num<min){
          min = num
        }
      }
      (max,min)
    }).collect.foreach(x => {
      println("max\t"+x._1)
      println("min\t"+x._2)
    })

    //方法二,下面用一个比较鸡贼的方式求最大最小值    val max = data.reduce((a,b) => Math.max(a,b))
    val min = data.reduce((a,b) => Math.min(a,b))
    println("max : " + max)
    println("min : " + min)
    sc.stop
  }[/mw_shl_code]
预期结果:
max: 1001
min: 2
思路和hadoop中的mr类似,设定一个key,value为需要求最大与最小值的集合,然后再groupBykey聚合在一起处理。第二个方法就更简单,性能也更好。


平均值问题

求每个key对应的平均值是常见的案例,在spark中处理类似问题常常会用到combineByKey这个函数,详细介绍请google一下用法,下面看代码:

[mw_shl_code=scala,true]@Test
  def testAvg(): Unit ={
    val sconf = new SparkConf().setAppName("test")
    val sc = new SparkContext(sconf)
    //初始化测试数据    val foo = sc.parallelize(List(Tuple2("a", 1), Tuple2("a", 3), Tuple2("b", 2), Tuple2("b", 8)));
    //这里需要用到combineByKey这个函数,需要了解的请google    val results=foo.combineByKey(
      (v)=>(v,1),
      (acc:(Int,Int),v) =>(acc._1+v,acc._2+1),
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
    ).map{case(key,value)=>(key,value._1/value._2.toDouble)}
    results.collect().foreach(println)
  }[/mw_shl_code]
我们让每个partiton先求出单个partition内各个key对应的所有整数的和 sum以及个数count,然后返回一个pair(sum, count)在shuffle后累加各个key对应的所有sum和count,再相除得到均值.


TopN问题

Top n问题同样也是hadoop种体现mr思想的经典案例,那么在spark中如何方便快捷的解决呢:
[mw_shl_code=scala,true]@Test  def testTopN(): Unit ={
    val sconf = new SparkConf().setAppName("test")
    val sc = new SparkContext(sconf)
    //初始话测试数据    val foo = sc.parallelize(Array(
      ("a", 1),
      ("a", 2),
      ("a", 3),
      ("b", 3),
      ("b", 1),
      ("a", 4),
      ("b", 4),
      ("b", 2)
    ))
    //这里测试,取top 2。    val groupsSort=foo.groupByKey().map(tu=>{
      val key=tu._1
      val values=tu._2
      val sortValues=values.toList.sortWith(_>_).take(2)
      (key,sortValues)
    })
    //转换格式进行print    val flattenedTopNPerGroup =
      groupsSort.flatMap({case (key, numbers) => numbers.map(key -> _)})
    flattenedTopNPerGroup.foreach((value: Any) => {
      println(value)
    })
    sc.stop

  }[/mw_shl_code]

思路很简单,把数据groupBykey以后按key形成分组然后取每个分组最大的2个。预期结果:
(a,4)
(a,3)
(b,4)
(b,3)

以上简单介绍了一下hadoop中常见的3个案例在spark中的实现。如果读者们已经接触过或者写过一些hadoop的mapreduce job,那么会不会觉得在spark中写起来方便快捷很多呢。
更多spark经典案例介绍期待下回分解。。。


来源:微信MaxLeap_yidongyanfa
作者:谭杨

已有(1)人评论

跳转到指定楼层
w517424787 发表于 2016-12-10 16:05:17
楼主写的还是很不错的!
1、求最大值最小值其实直接可以用 rdd.max() , rdd.min()函数来读取
2、求每个key的平均值用combineByKey(),感觉看的好迷糊,到不如用reduceByKey()来实现:
   
    val rdd = sc.parallelize(Array(("a", 3), ("b", 4), ("a", 1), ("a", 2), ("b", 2), ("b", 3)))
    //先将rdd(key,value)转换成rdd(key,(1,value)),每个key默认计数为1
    rdd.map(line => (line._1, (1, line._2)))
      .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) // 在对key进行统计,统计数量和总数
      .map(line => (line._1, line._2._2 / line._2._1)) // 对每个key求平均值
      .collect().foreach(println)
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条