分享

spark中数组共享的问题

星语心愿 发表于 2017-12-19 15:19:21 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 8823
在spark中,设置多分区,多分区里的数据并行处理,并且需要并行地访问共享数组和修改数组里的数据。在spark中这个共享数组该怎么设置呢?

已有(2)人评论

跳转到指定楼层
desehawk 发表于 2017-12-19 16:48:33
其实是广播变量吧。你使用的是哪个版本?
首先引入包,然后直接定义变量,定义变量后其实跟普通变量就一样了,赋值和取值。
比如下面代码
[mw_shl_code=scala,true]package com.Streaming

import java.util

import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast

/**
  * Created by lxh on 2016/6/30.
  */
object BroadcastAccumulatorStreaming {

  /**
    * 声明一个广播和累加器!
    */
  private var broadcastList:Broadcast[List[String]]  = _
  private var accumulator:Accumulator[Int] = _

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)

    /**
      * duration是ms
      */
    val ssc = new StreamingContext(sc,Duration(2000))
   // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")

    /**
      * 获取数据!
      */
    val lines = ssc.socketTextStream("localhost",9999)

    /**
      * 拿到数据后 怎么处理!
      *
      * 1.flatmap把行分割成词。
      * 2.map把词变成tuple(word,1)
      * 3.reducebykey累加value
      * (4.sortBykey排名)
      * 4.进行过滤。 value是否在累加器中。
      * 5.打印显示。
      */
    val words = lines.flatMap(line => line.split(" "))

    val wordpair = words.map(word => (word,1))

    wordpair.filter(record => {broadcastList.value.contains(record._1)})


    val pair = wordpair.reduceByKey(_+_)

    /**
      *这步为什么要先foreachRDD?
      *
      * 因为这个pair 是PairDStream<String, Integer>
      *
      *   进行foreachRDD是为了?
      *
      */
/*    pair.foreachRDD(rdd => {
      rdd.filter(record => {

        if (broadcastList.value.contains(record._1)) {
          accumulator.add(1)
          return true
        } else {
          return false
        }

      })

    })*/

    val filtedpair = pair.filter(record => {
        if (broadcastList.value.contains(record._1)) {
          accumulator.add(record._2)
          true
        } else {
          false
        }

     }).print

    println("累加器的值"+accumulator.value)

   // pair.filter(record => {broadcastList.value.contains(record._1)})

   /* val keypair = pair.map(pair => (pair._2,pair._1))*/

    /**
      * 如果DStream自己没有某个算子操作。就通过转化transform!
      */
   /* keypair.transform(rdd => {
      rdd.sortByKey(false)//TODO
    })*/
    pair.print()
    ssc.start()
    ssc.awaitTermination()

  }

}[/mw_shl_code]

其中下面是引入需要的包
import org.apache.spark.{Accumulable, Accumulator.....}
import org.apache.spark.broadcast.Broadcast
接着定义变量:
[mw_shl_code=scala,true]private var broadcastList:Broadcast[List[String]]  = _
  private var accumulator:Accumulator[Int] = _[/mw_shl_code]
在接着就是赋值:
[mw_shl_code=scala,true] broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")[/mw_shl_code]
接着就是取值
[mw_shl_code=scala,true]wordpair.filter(record => {broadcastList.value.contains(record._1)})[/mw_shl_code]
其中broadcastList.value为值。
这就大概就是他的使用过程

回复

使用道具 举报

desehawk 发表于 2017-12-19 17:12:14
推荐参考:
spark分布式编程之全局变量专题【共享变量】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=19652
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条