分享

spark RDD分区是否可以指定分区



spark可以指定分区如下面:通过partitionBy实现
val fileRDD = sc.textFile("/opt/tarballs/spark_kafka/beifengspark/src/main/scala/2015082818")
      .filter(line=>line.length>0)
      .map{
        line =>
          val arr = line.split("\t")
          val date = arr(17).substring(0,10)
          val guid = arr(5)
          val url = arr(1)
          val uid = arr(18)
          (uid,(guid,url)) //key-value:tuple2
      }.partitionBy(new HashPartitioner(10)) //采用了hashcode分片方式,分成了10份,十个分区,每个分区10分



本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
desehawk 发表于 2018-6-5 21:20:20
更多详细内容


数据分区:

在分布式集群里,网络通信的代价很大,减少网络传输可以极大提升性能。

mapreduce框架的性能开支主要在io和网络传输,io因为要大量读写文件,它是不可避免的,但是网络传输是可以避免的,把大文件压缩变小文件,从而减少网络传输,但是增加了cpu的计算负载。

spark里面io也是不可避免的,但是网络传输spark里面进行了优化:

spark把rdd进行分区(分片),放在集群上并行计算。

同一个rdd分片100个,10个节点,平均一个节点10个分区

当进行sum型的计算的时候,先进行每个分区的sum,然后把sum值shuffle传输到主程序进行全局sum,所以进行sum型计算对网络传输非常小。

但对于进行join型的计算的时候,需要把数据本身进行shuffle,网络开销很大。

spark是如何优化这个问题的呢?

spark把key-value rdd通过key的hashcode进行分区,而且保证相同的key存储在同一个节点上,这样对改rdd进行key聚合时,就不需要shuffle过程

我们进行mapreduce计算的时候为什么要尽兴shuffle?,就是说mapreduce里面网络传输主要在shuffle阶段,shuffle的根本原因是相同的key存在不同的节点上,按key进行聚合的时候不得不进行shuffle。shuffle是非常影响网络的,它要把所有的数据混在一起走网络,然后它才能把相同的key走到一起。要尽兴shuffle是存储决定的。

spark从这个教训中得到启发,spark会把key进行分区,也就是key的hashcode进行分区,相同的key,hashcode肯定是一样的,所以它进行分区的时候100t的数据分成10分,每部分10个t,它能确保相同的key肯定在一个分区里面,而且它能保证存储的时候相同的key能够存在同一个节点上。

比如一个rdd分成了100份,集群有10个节点,所以每个节点存10份,每一分称为每个分区,spark能保证相同的key存在同一个节点上,实际上相同的key存在同一个分区。

key的分布不均决定了有的分区大有的分区小。没法分区保证完全相等,但它会保证在一个接近的范围。

所以mapreduce里面做的某些工作里边,spark就不需要shuffle了,spark解决网络传输这块的根本原理就是这个。

进行join的时候是两个表,不可能把两个表都分区好,通常情况下是把用的频繁的大表事先进行分区,小表进行关联它的时候小表进行shuffle过程。

大表不需要shuffle。

模版是:

val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...")

.partitionBy(new HashPartition(100))//构造100个分区

.persist()

从分区中获益的操作:cogroup(), groupwith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),cobimeByKey(),lookup()

所有基于key的操作都会获益

对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会让其中至少一个rdd(使用已知分区器的那个rdd)不发生数据shuffle,如果两个rdd使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个rdd是通过mapvalues()从另一个rdd中创建出来的,这两个rdd就会拥有相同的key和分区方式),或者其中rdd还没有被计算出来,那么跨界点的shuffle(数据混洗)不会发生了。

mapreduce一般要求本地网卡达到20兆!即便进行了压缩!

代码:

[mw_shl_code=scala,true]import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition
import org.apache.hadoop.mapred.lib
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner
/**
  * Created by zengxiaosen on 16/9/23.
  */
object PartitionVisitCount {

  /*
  大表小表关联
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("useUDF").setMaster("local")
    val ss = SparkSession.builder().config(sparkConf).getOrCreate()
    val sc = ss.sparkContext

    val fileRDD = sc.textFile("/opt/tarballs/spark_kafka/beifengspark/src/main/scala/2015082818")
      .filter(line=>line.length>0)
      .map{
        line =>
          val arr = line.split("\t")
          val date = arr(17).substring(0,10)
          val guid = arr(5)
          val url = arr(1)
          val uid = arr(18)
          (uid,(guid,url)) //key-value:tuple2
      }.partitionBy(new HashPartitioner(10)) //采用了hashcode分片方式,分成了10份,十个分区,每个分区10分
      /*
      相同的key在同一个分区,在进行任务调用时候,大表不需要任何shuffle
      只需要shuffle小表
       */
      .persist(StorageLevel.DISK_ONLY)


    /*
    parallelize有两个参数,第一个是他的list,第二个是分区数
    分区数可以不给,不给的情况下默认就是它的核数
     */
    //比如里面存的是我的用户id
    val rdd = sc.parallelize(List(1,2,3,4,5,6,7),10)
      .map(i => (i+"", i+"str"))

    fileRDD.join(rdd).foreach(println)
    /*
    如果fileRDD后面还会关联好多个其他的rdd1,rdd2。。。rddn
    就要先把大的fileRDD进行分区
    这样优化了网络传输

     */


  }

}[/mw_shl_code]








回复

使用道具 举报

hahaxixi 发表于 2018-6-6 10:16:09
改版了,有点不习惯了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条