分享

spark小知识总结


问题导读

1.如何创建rdd?
2.什么是pair rdd?
3.如何获取数组指定列?
4.foreachRDD的作用是什么?





本文来自对about云问答帖的总结,分享给大家

1.如何创建rdd
RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集。Transformation用于对RDD的创建,RDD只能使用Transformation创建,同时还提供大量操作方法,包括map,filter,groupBy,join等,RDD利用这些操作生成新的RDD.还有parallelize也可创建rdd.
下面通过代码来实现

/*
     * 1、从scala集合中创建RDD
     * 计算:1+2+3+...+100
     */
[mw_shl_code=scala,true]    val nums = 1 to 100
    val rdd = sc.parallelize(nums)
    val sum = rdd.reduce(_+_)
    println("sum:"+sum)[/mw_shl_code]

    /*
     * 2、从本地文件系统创建RDD
     * 计算 people.json 文件中字符总长度
     */
[mw_shl_code=scala,true]    val rows = sc.textFile("file:///home/hadoop/../resources/people.json")
    val length = rows.map(row=>row.length()).reduce(_+_)
    println("total chars length:"+length)[/mw_shl_code]

     /*
     * 3、从HDFS创建RDD(lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at textFile at)
     *  计算 hive_test 文件中字符长度
     */
[mw_shl_code=scala,true]    val lines = sc.textFile("hdfs://192.168.1.10:9000/user/hive/warehouse/hive_test")
    println( lines.map(row=>row.length()).reduce(_+_))
[/mw_shl_code]


2.如何转换为pair rdd
可以通过map的匿名函数
map(x=>(x,1))
将x,转换为key ,value。一般用于将rdd转换pair rdd。转换之后一般用于统计信息,这时候用到reduceBykey。

3.如何获取数组中元素的值

读取数据后形成rdd,然后获取其中一个元素。如下面
20170206|17825793|2887524685|0|www.aboutyun.com

该如何获取域名,可以通过take获取其中的元素。如下面,分割字符后,形成数组获取对应元素

object WordCount {
   def main(args: Array[String]) {

     val conf = new SparkConf()
     val sc = new SparkContext(conf)
     val inFile = sc.textFile("mylog")
val pairs=inFile.map(_.split("|").take(4)).map(x=>(x,1))     
val wordCounts = pairs.reduceByKey(_ + _)
     sc.stop()
   }
}
4.SparkStreaming之foreachRDD

DStream中的foreachRDD是一个非常强大函数,它允许你把数据发送给外部系统。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。所以要掌握它,对它要有深入了解。

其它需要注意的地方:
(1)输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD

actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作

dstream.foreachRDD(),但是没有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不会执行。系统

仅仅会接收输入,然后丢弃它们。

(2)默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。



实验1:把SparkStreaming的内部数据存入Mysql

(1)在mysql中创建一个表用于存放数据
[mw_shl_code=bash,true]mysql> create database sparkStreaming;  
Query OK, 1 row affected (0.01 sec)  
   
mysql> use sparkStreaming;  
Database changed  
mysql> show tables;  
Empty set (0.01 sec)  
   
mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);  
Query OK, 0 rows affected (0.05 sec)  [/mw_shl_code]

(2)用scala编写连接Mysql的连接池
[mw_shl_code=scala,true]import java.sql.Connection  
import java.sql.PreparedStatement  
import java.sql.ResultSet  
  
import org.apache.commons.dbcp.BasicDataSource  
import org.apache.log4j.Logger  
  
object scalaConnectPool {  
  val  log = Logger.getLogger(scalaConnectPool.this.getClass)  
  var ds:BasicDataSource = null  
  def getDataSource={  
    if(ds == null){  
      ds = new BasicDataSource()  
      ds.setUsername("root")  
      ds.setPassword("iamhaoren")  
      ds.setUrl("jdbc:mysql://localhost:3306/sparkStreaming")  
      ds.setDriverClassName("com.mysql.jdbc.Driver")  
      ds.setInitialSize(20)  
      ds.setMaxActive(100)  
      ds.setMinIdle(50)  
      ds.setMaxIdle(100)  
      ds.setMaxWait(1000)  
      ds.setMinEvictableIdleTimeMillis(5*60*1000)  
      ds.setTimeBetweenEvictionRunsMillis(10*60*1000)  
      ds.setTestOnBorrow(true)  
    }  
    ds  
  }  
  
  def getConnection : Connection= {  
    var connect:Connection = null  
    try {  
      if(ds != null){  
        connect = ds.getConnection  
      }else{  
        connect = getDataSource.getConnection  
      }  
    }  
    connect  
  }  
  
  def shutDownDataSource: Unit=if (ds !=null){ds.close()}  
  
  def closeConnection(rs:ResultSet,ps:PreparedStatement,connect:Connection): Unit ={  
    if(rs != null){rs.close}  
    if(ps != null){ps.close}  
    if(connect != null){connect.close}  
  }  
  
}  [/mw_shl_code]

(3)编写SparkStreaming程序

[mw_shl_code=scala,true]import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
  
object dataToMySQL {  
  def main(args: Array[String]) {  
    val conf = new SparkConf().setAppName("use the foreachRDD write data to mysql").setMaster("local[2]")  
    val ssc = new StreamingContext(conf,Seconds(10))  
  
    val streamData = ssc.socketTextStream("master",9999)  
    val wordCount = streamData.map(line =>(line.split(",")(0),1)).reduceByKeyAndWindow(_+_,Seconds(60))  
    val hottestWord = wordCount.transform(itemRDD => {  
      val top3 = itemRDD.map(pair => (pair._2, pair._1))  
        .sortByKey(false).map(pair => (pair._2, pair._1)).take(3)  
      ssc.sparkContext.makeRDD(top3)  
    })  
  
  
    hottestWord.foreachRDD( rdd =>{  
      rdd.foreachPartition(partitionOfRecords =>{  
        val connect = scalaConnectPool.getConnection  
        connect.setAutoCommit(false)  
        val stmt = connect.createStatement()  
        partitionOfRecords.foreach(record =>{  
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'"+record._1+"','"+record._2+"')")  
        })  
        stmt.executeBatch()  
        connect.commit()  
      }  
      )  
    }  
    )  
  
  
  
    ssc.start()  
    ssc.awaitTermination()  
    ssc.stop()  
  }  
}  [/mw_shl_code]

(4)编写一个socket端的数据模拟器

[mw_shl_code=scala,true]
import java.io.{PrintWriter}  
import java.net.ServerSocket  
import scala.io.Source  
  
  
object streamingSimulation {  
  def index(n: Int) = scala.util.Random.nextInt(n)  
  
  def main(args: Array[String]) {  
    // 调用该模拟器需要三个参数,分为为文件路径、端口号和间隔时间(单位:毫秒)  
    if (args.length != 3) {  
      System.err.println("Usage: <filename> <port> <millisecond>")  
      System.exit(1)  
    }  
  
    // 获取指定文件总的行数  
    val filename = args(0)  
    val lines = Source.fromFile(filename).getLines.toList  
    val filerow = lines.length  
  
    // 指定监听某端口,当外部程序请求时建立连接  
    val listener = new ServerSocket(args(1).toInt)  
  
    while (true) {  
      val socket = listener.accept()  
      new Thread() {  
        override def run = {  
          println("Got client connected from: " + socket.getInetAddress)  
          val out = new PrintWriter(socket.getOutputStream(), true)  
          while (true) {  
            Thread.sleep(args(2).toLong)  
            // 当该端口接受请求时,随机获取某行数据发送给对方  
            val content = lines(index(filerow))  
            println("-------------------------------------------")  
            println(s"Time: ${System.currentTimeMillis()}")  
            println("-------------------------------------------")  
            println(content)  
            out.write(content + '\n')  
            out.flush()  
          }  
          socket.close()  
        }  
      }.start()  
    }  
  }  
  
}  [/mw_shl_code]

实验数据为:
spark
Streaming
better
than
storm
you
need
it
yes
do
it

(5)实验启动

在客户端启动数据流模拟

对socket端的数据模拟器程序进行 jar文件的打包,并放入到集群目录中

启动程序如下:

[mw_shl_code=scala,true]java -cp DataSimulation.jar streamingSimulation /root/application/upload/Information 9999 1000  
[/mw_shl_code]

启动SparkStreaming程序

结果如下:
spark streaming.png








已有(10)人评论

跳转到指定楼层
SuperDove 发表于 2017-2-6 17:05:32
版主精神,过年来了就开始忙活起来了,顶一个
回复

使用道具 举报

美丽天空 发表于 2017-2-7 09:41:07
好文,感谢楼主分享!!!
回复

使用道具 举报

fly2015 发表于 2017-2-7 11:34:40
good!  代码写的很清楚!
回复

使用道具 举报

xingoo 发表于 2017-2-9 08:50:34
前面很容易理解,spark streaming还不太熟,等学习下再回来看!感谢分享
回复

使用道具 举报

spftoto 发表于 2018-7-5 18:04:55
看着,看着,就不懂了,哎。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条