分享

Spark Streaming编程指南

本帖最后由 坎蒂丝_Swan 于 2014-12-29 16:07 编辑

问题导读

1.创建Dstream的时候有什么要注意的地方?
2.对于调优,可以从哪些方面考虑?







Overview
Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。
它可以接受来自Kafka, Flume, Twitter, ZeroMQ和TCP Socket的数据源,使用简单的api函数比如 map, reduce, join, window等操作,还可以直接使用内置的机器学习算法、图算法包来处理数据。

streaming-arch.png

它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给Spark Engine处理最后生成该批次的结果。

streaming-flow.png

它支持的数据流叫Dstream,直接支持Kafka、Flume的数据源。Dstream是一种连续的RDDs,下面是一个例子帮助大家理解Dstream。

A Quick Example
  1. // 创建StreamingContext,1秒一个批次
  2. val ssc = new StreamingContext(sparkConf, Seconds(1));
  3. // 获得一个DStream负责连接 监听端口:地址
  4. val lines = ssc.socketTextStream(serverIP, serverPort);
  5. // 对每一行数据执行Split操作
  6. val words = lines.flatMap(_.split(" "));
  7. // 统计word的数量
  8. val pairs = words.map(word => (word, 1));
  9. val wordCounts = pairs.reduceByKey(_ + _);
  10. // 输出结果
  11. wordCounts.print();
  12. ssc.start();             // 开始
  13. ssc.awaitTermination();  // 计算完毕退出
复制代码
具体的代码可以访问这个页面:
https://github.com/apache/incubator-spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
如果已经装好Spark的朋友,我们可以通过下面的例子试试。

首先,启动Netcat,这个工具在Unix-like的系统都存在,是个简易的数据服务器。

使用下面这句命令来启动Netcat:
  1. $ nc -lk 9999
复制代码

接着启动example
  1. $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
复制代码

在Netcat这端输入hello world,看Spark这边的
  1. # TERMINAL 1:
  2. # Running Netcat
  3. $ nc -lk 9999
  4. hello world
  5. ...
  6. # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
  7. $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
  8. ...
  9. -------------------------------------------
  10. Time: 1357008430000 ms
  11. -------------------------------------------
  12. (hello,1)
  13. (world,1)
  14. ...
复制代码

Basics
下面这块是如何编写代码的啦,哇咔咔!
首先我们要在SBT或者Maven工程添加以下信息:

  1. groupId = org.apache.spark
  2. artifactId = spark-streaming_2.10
  3. version = 0.9.0-incubating
复制代码
  1. //需要使用一下数据源的,还要添加相应的依赖
  2. Source   Artifact
  3. Kafka   spark-streaming-kafka_2.10
  4. Flume      spark-streaming-flume_2.10
  5. Twitter      spark-streaming-twitter_2.10
  6. ZeroMQ   spark-streaming-zeromq_2.10
  7. MQTT      spark-streaming-mqtt_2.10
复制代码

接着就是实例化
  1. new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
复制代码

这是之前的例子对DStream的操作。
streaming-dstream-ops.png


Input Sources
除了sockets之外,我们还可以这样创建Dstream
  1. streamingContext.fileStream(dataDirectory)
复制代码

这里有3个要点:
(1)dataDirectory下的文件格式都是一样
(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的
(3)一旦文件进去之后就不能再改变

假设我们要创建一个Kafka的Dstream。
  1. import org.apache.spark.streaming.kafka._
  2. KafkaUtils.createStream(streamingContext, kafkaParams, ...)
复制代码

如果我们需要自定义流的receiver,可以查看https://spark.incubator.apache.o ... stom-receivers.html

Operations
对于Dstream,我们可以进行两种操作,transformations 和 output

Transformations
  1. Transformation                             Meaning
  2. map(func)          对每一个元素执行func方法
  3. flatMap(func)         类似map函数,但是可以map到0+个输出
  4. filter(func)          过滤
  5. repartition(numPartitions)        增加分区,提高并行度     
  6. union(otherStream)      合并两个流
  7. count()                     统计元素的个数
  8. reduce(func)         对RDDs里面的元素进行聚合操作,2个输入参数,1个输出参数
  9. countByValue()        针对类型统计,当一个Dstream的元素的类型是K的时候,调用它会返回一个新的Dstream,包含<K,Long>键              值对,Long是每个K出现的频率。
  10. reduceByKey(func, [numTasks])  对于一个(K, V)类型的Dstream,为每个key,执行func函数,默认是local是2个线程,cluster是8个线程,也               可以指定numTasks
  11. join(otherStream, [numTasks])   把(K, V)和(K, W)的Dstream连接成一个(K, (V, W))的新Dstream
  12. cogroup(otherStream, [numTasks])  把(K, V)和(K, W)的Dstream连接成一个(K, Seq[V], Seq[W])的新Dstream
  13. transform(func)          转换操作,把原来的RDD通过func转换成一个新的RDD
  14. updateStateByKey(func)             针对key使用func来更新状态和值,可以将state该为任何值
复制代码
UpdateStateByKey Operation
使用这个操作,我们是希望保存它状态的信息,然后持续的更新它,使用它有两个步骤:
(1)定义状态,这个状态可以是任意的数据类型
(2)定义状态更新函数,从前一个状态更改新的状态

下面展示一个例子:
  1. def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  2.     val newCount = ...  // add the new values with the previous running count to get the new count
  3.     Some(newCount)
  4. }
复制代码

它可以用在包含(word, 1) 的Dstream当中,比如前面展示的example
  1. val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
复制代码

它会针对里面的每个word调用一下更新函数,newValues是最新的值,runningCount是之前的值。

Transform Operation
和transformWith一样,可以对一个Dstream进行RDD->RDD操作,比如我们要对Dstream流里的RDD和另外一个数据集进行join操作,但是Dstream的API没有直接暴露出来,我们就可以使用transform方法来进行这个操作,下面是例子:
  1. val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
  2. val cleanedDStream = inputDStream.transform(rdd => {
  3.   rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  4.   ...
  5. })
复制代码
另外,我们也可以在里面使用机器学习算法和图算法。
Window Operations
streaming-dstream-window.png

先举个例子吧,比如前面的word count的例子,我们想要每隔10秒计算一下最近30秒的单词总数。
我们可以使用以下语句:
  1. // Reduce last 30 seconds of data, every 10 seconds
  2. val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
复制代码

这里面提到了windows的两个参数:
(1)window length:window的长度是30秒,最近30秒的数据
(2)slice interval:计算的时间间隔
通过这个例子,我们大概能够窗口的意思了,定期计算滑动的数据。

下面是window的一些操作函数,还是有点儿理解不了window的概念,Meaning就不翻译了,直接删掉
  1. Transformation                                                                              Meaning
  2. window(windowLength, slideInterval)     
  3. countByWindow(windowLength, slideInterval)     
  4. reduceByWindow(func, windowLength, slideInterval)     
  5. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])     
  6. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])   
  7. countByValueAndWindow(windowLength, slideInterval, [numTasks])
复制代码
Output Operations
  1. Output Operation                                      Meaning
  2. print()                                 打印到控制台
  3. foreachRDD(func)                        对Dstream里面的每个RDD执行func,保存到外部系统
  4. saveAsObjectFiles(prefix, [suffix])     保存流的内容为SequenceFile, 文件名 : "prefix-TIME_IN_MS[.suffix]".
  5. saveAsTextFiles(prefix, [suffix])       保存流的内容为文本文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
  6. saveAsHadoopFiles(prefix, [suffix])     保存流的内容为hadoop文件, 文件名 : "prefix-TIME_IN_MS[.suffix]".
复制代码
Persistence
Dstream中的RDD也可以调用persist()方法保存在内存当中,但是基于window和state的操作,reduceByWindow,reduceByKeyAndWindow,updateStateByKey它们就是隐式的保存了,系统已经帮它自动保存了。
从网络接收的数据(such as, Kafka, Flume, sockets, etc.),默认是保存在两个节点来实现容错性,以序列化的方式保存在内存当中。
RDD Checkpointing
状态的操作是基于多个批次的数据的。它包括基于window的操作和updateStateByKey。因为状态的操作要依赖于上一个批次的数据,所以它要根据时间,不断累积元数据。为了清空数据,它支持周期性的检查点,通过把中间结果保存到hdfs上。因为检查操作会导致保存到hdfs上的开销,所以设置这个时间间隔,要很慎重。对于小批次的数据,比如一秒的,检查操作会大大降低吞吐量。但是检查的间隔太长,会导致任务变大。通常来说,5-10秒的检查间隔时间是比较合适的。

  1. ssc.checkpoint(hdfsPath)  //设置检查点的保存位置
  2. dstream.checkpoint(checkpointInterval)  //设置检查点间隔
复制代码
对于必须设置检查点的Dstream,比如通过updateStateByKey和reduceByKeyAndWindow创建的Dstream,默认设置是至少10秒。

Performance Tuning
对于调优,可以从两个方面考虑:
(1)利用集群资源,减少处理每个批次的数据的时间
(2)给每个批次的数据量的设定一个合适的大小

Level of Parallelism
像一些分布式的操作,比如reduceByKey和reduceByKeyAndWindow,默认的8个并发线程,可以通过对应的函数提高它的值,或者通过修改参数spark.default.parallelism来提高这个默认值。

Task Launching Overheads
通过进行的任务太多也不好,比如每秒50个,发送任务的负载就会变得很重要,很难实现压秒级的时延了,当然可以通过压缩来降低批次的大小。

Setting the Right Batch Size
要使流程序能在集群上稳定的运行,要使处理数据的速度跟上数据流入的速度。最好的方式计算这个批量的大小,我们首先设置batch size为5-10秒和一个很低的数据输入速度。确实系统能跟上数据的速度的时候,我们可以根据经验设置它的大小,通过查看日志看看Total delay的多长时间。如果delay的小于batch的,那么系统可以稳定,如果delay一直增加,说明系统的处理速度跟不上数据的输入速度。

24/7 Operation
Spark默认不会忘记元数据,比如生成的RDD,处理的stages,但是Spark Streaming是一个24/7的程序,它需要周期性的清理元数据,通过spark.cleaner.ttl来设置。比如我设置它为600,当超过10分钟的时候,Spark就会清楚所有元数据,然后持久化RDDs。但是这个属性要在SparkContext 创建之前设置。
但是这个值是和任何的window操作绑定。Spark会要求输入数据在过期之后必须持久化到内存当中,所以必须设置delay的值至少和最大的window操作一致,如果设置小了,就会报错。

Monitoring
除了Spark内置的监控能力,还可以StreamingListener这个接口来获取批处理的时间, 查询时延, 全部的端到端的试验。

Memory Tuning
Spark Stream默认的序列化方式是StorageLevel.MEMORY_ONLY_SER,而不是RDD的StorageLevel.MEMORY_ONLY
默认的,所有持久化的RDD都会通过被Spark的LRU算法剔除出内存,如果设置了spark.cleaner.ttl,就会周期性的清理,但是这个参数设置要很谨慎。一个更好的方法是设置spark.streaming.unpersist为true,这就让Spark来计算哪些RDD需要持久化,这样有利于提高GC的表现。
推荐使用concurrent mark-and-sweep GC,虽然这样会降低系统的吞吐量,但是这样有助于更稳定的进行批处理。

Fault-tolerance PropertiesFailure of a Worker Node

下面有两种失效的方式:
1.使用hdfs上的文件,因为hdfs是可靠的文件系统,所以不会有任何的数据失效。
2.如果数据来源是网络,比如Kafka和Flume,为了防止失效,默认是数据会保存到2个节点上,但是有一种可能性是接受数据的节点挂了,那么数据可能会丢失,因为它还没来得及把数据复制到另外一个节点。

Failure of the Driver Node
为了支持24/7不间断的处理,Spark支持驱动节点失效后,重新恢复计算。Spark Streaming会周期性的写数据到hdfs系统,就是前面的检查点的那个目录。驱动节点失效之后,StreamingContext可以被恢复的。
为了让一个Spark Streaming程序能够被回复,它需要做以下操作:
(1)第一次启动的时候,创建 StreamingContext,创建所有的streams,然后调用start()方法。
(2)恢复后重启的,必须通过检查点的数据重新创建StreamingContext。

下面是一个实际的例子:
通过StreamingContext.getOrCreate来构造StreamingContext,可以实现上面所说的。
  1. // Function to create and setup a new StreamingContext
  2. def functionToCreateContext(): StreamingContext = {
  3.     val ssc = new StreamingContext(...)   // new context
  4.     val lines = ssc.socketTextStream(...) // create DStreams
  5.     ...
  6.     ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  7.     ssc
  8. }
  9. // Get StreaminContext from checkpoint data or create a new one
  10. val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  11. // Do additional setup on context that needs to be done,
  12. // irrespective of whether it is being started or restarted
  13. context. ...
  14. // Start the context
  15. context.start()
  16. context.awaitTermination()
复制代码

在stand-alone的部署模式下面,驱动节点失效了,也可以自动恢复,让别的驱动节点替代它。这个可以在本地进行测试,在提交的时候采用supervise模式,当提交了程序之后,使用jps查看进程,看到类似DriverWrapper就杀死它,如果是使用YARN模式的话就得使用其它方式来重新启动了。

这里顺便提一下向客户端提交程序吧,之前总结的时候把这块给落下了。
  1. ./bin/spark-class org.apache.spark.deploy.Client launch
  2.    [client-options] \
  3.    <cluster-url> <application-jar-url> <main-class> \
  4.    [application-options]
  5. cluster-url: master的地址.
  6. application-jar-url: jar包的地址,最好是hdfs上的,带上hdfs://...否则要所有的节点的目录下都有这个jar的
  7. main-class: 要发布的程序的main函数所在类.
  8. Client Options:
  9. --memory <count> (驱动程序的内存,单位是MB)
  10. --cores <count> (为你的驱动程序分配多少个核心)
  11. --supervise (节点失效的时候,是否重新启动应用)
  12. --verbose (打印增量的日志输出)
复制代码

在未来的版本,会支持所有的数据源的可恢复性。

为了更好的理解基于HDFS的驱动节点失效恢复,下面用一个简单的例子来说明:
  1. Time     Number of lines in input file     Output without driver failure     Output with driver failure
  2.      10                     10                    10
  3.      20                     20                    20
  4.      30                     30                    30
  5.      40                     40                    [DRIVER FAILS] no output
  6.      50                     50                    no output
  7.      60                     60                    no output
  8.      70                     70                    [DRIVER RECOVERS] 40, 50, 60, 70
  9.      80                     80                    80
  10.      90                     90                    90
  11.     100                     100                   100
复制代码
在4的时候出现了错误,40,50,60都没有输出,到70的时候恢复了,恢复之后把之前没输出的一下子全部输出。






本文转自岑玉海 http://www.cnblogs.com/cenyuhai/p/3537106.html


欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(2)人评论

跳转到指定楼层
落魂草 发表于 2014-12-29 19:35:39
回复

使用道具 举报

lixiaoliang7 发表于 2014-12-30 06:41:37
感谢分享,干货很多。
就是有些地方还是不懂。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条