分享

Spark Streaming小结

本帖最后由 坎蒂丝_Swan 于 2015-1-18 17:34 编辑

问题导读


1.Spark Streaming有哪些优势?
2.Discretized Stream在Spark中起到哪些作用?










概述
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强这两个特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
其内部工作方式如下:


Word Count示例
  1. private static final Pattern SPACE = Pattern.compile(" ");
  2. public static void main(String[] args) {
  3.     StreamingExamples.setStreamingLogLevels();
  4.     JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
  5.           "JavaNetworkWordCount", new Duration(10000));
  6.     jssc.checkpoint(".");//使用updateStateByKey()函数需要设置checkpoint
  7.     //打开本地的端口9999
  8.     JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
  9.     //按行输入,以空格分隔
  10.     JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)));
  11.     //每个单词形成pair,如(word,1)
  12.     JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
  13.     //统计并更新每个单词的历史出现次数
  14.     JavaPairDStream<String, Integer> counts = pairs.updateStateByKey((values, state) -> {
  15.         Integer newSum = state.or(0);
  16.         for(Integer i :values) {
  17.             newSum += i;
  18.         }
  19.         return Optional.of(newSum);
  20.     });
  21.     counts.print();
  22.     jssc.start();
  23.     jssc.awaitTermination();
  24. }
复制代码
启动Netcat
  1. $ nc -lk 9999
复制代码
启动Spark Streaming Application
若在本地调试,可在IDE中启动,否则,用如下命令启动:
  1. $ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
复制代码
测试
输入:
  1. hello world
  2. hello spark
  3. hello yurnom
复制代码
结果:
  1. -------------------------------------------
  2. Time: 1407741020000 ms
  3. -------------------------------------------
  4. (yurnom,1)
  5. (hello,3)
  6. (world,1)
  7. (spark,1)
复制代码


DStream
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream由连续的序列化RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
对数据的操作也是按照RDD为单位来进行的,如下图所示:
上图下方的RDD都是通过Spark高级原语的转换而来,计算过程由Spark engine来完成。


Operations
DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

UpdateStateByKey Operation
UpdateStateByKey原语用于记录历史记录,上文中Word Count示例中就用到了该特性。若不用UpdateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不在保存。如,若将上文代码示例中的第15行若替换为:
  1. JavaPairDStream<String, Integer> counts = pairs.reduceByKey((i1, i2) -> (i1 + i2));
复制代码
那么输入:hellow world,结果则为:(hello,1)(world,1),然后输入hello spark,结果则为(hello,1)(spark,1)。也就是不会保留上一次数据处理的结果。
使用UpdateStateByKey原语需要用于记录的State,可以为任意类型,如上例中即为Optional<Intege>类型;此外还需要更新State的函数,可参考Word Count示例中的15-20行。

Transform Operations
Transform()原语允许DStream上执行任意的RDD-to-RDD函数。通过该函数可以方便的扩展Spark API。此外,本篇开头所提到的MLlib(机器学习)以及Graphx也是通过本函数来进行结合的。官方示例:
  1. import org.apache.spark.streaming.api.java.*;
  2. // RDD containing spam information
  3. final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
  4. JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
  5.   new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
  6.     @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
  7.       rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  8.       ...
  9.     }
  10.   });
复制代码

Window Operations
Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。如下图所示:
如以下代码表示,每10秒钟处理最近30秒钟中的数据。
  1. JavaPairDStream<String, Integer> windowedWordCounts =
  2.       pairs.reduceByKeyAndWindow((a, b) -> (a + b),
  3.             new Duration(30000), new Duration(10000));
复制代码

Window相关API有:
  • window(windowLength, slideInterval)
  • countByWindow(windowLength, slideInterval)
  • reduceByWindow(func, windowLength, slideInterval)
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
  • countByValueAndWindow(windowLength, slideInterval, [numTasks])

Output Operations
当某个Output Operations原语被调用时,stream才会开始真正的计算过程。现阶段支持的Output方式有以下几种:
  • print()
  • foreachRDD(func)
  • saveAsObjectFiles(prefix, [suffix])
  • saveAsTextFiles(prefix, [suffix])
  • saveAsHadoopFiles(prefix, [suffix])


其它特性

输入源
除了前文中Word Count示例中用到的TCP套接字连接连接作为输入源以外,Spark Streaming还可以使用很多其它的输入源。例如对于文件,可以这样处理:
  1. jssc.fileStream(dataDirectory);
复制代码
Spark Streaming将会监控该文件夹,要使用该特性,需要注意以下几点:
  • 该文件夹下的所有文件必须有相同的数据格式
  • 在该文件夹下创建文件的方式必须是原子性的移动或重命名的方式,不可以先创建文件后在进行写入
  • 所有文件夹下的文件不可进行改动
其它数据源的使用可以参考Spark安装包中的examples文件夹中的streaming部分。同样对于特殊的数据输入源,可以进行定制

监控
一般来说,使用Spark自带的Web UI就能满足大部分的监控需求。对于Spark Streaming来说,以下两个度量指标尤为重要(在Batch Processing Statistics标签下):
  • Processing Time:处理每个batch的时间
  • Scheduling Delay:每个batch在队列中等待前一个batch完成处理所等待的时间
若Processing Time的值一直大于Scheduling Delay,或者Scheduling Delay的值持续增长,代表系统已经无法处理这样大的数据输入量了,这时就需要考虑各种优化方法来增强系统的负载。

持久化
与RDD一样,DStream同样也能通过persist()方法将数据流存放在内存中,这样做的好处是遇到需要多次迭代计算的程序时,速度优势十分的明显。而对于上文中提到的各种window原语,其默认的持久化策略就是保存在内存中。
当数据源来自于网络时(例如通过Kafka、Flume、sockets等等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。


参考资料









欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(8)人评论

跳转到指定楼层
stark_summer 发表于 2015-1-19 10:24:42
回复

使用道具 举报

louislau318 发表于 2015-1-19 10:45:59
先存了,希望以后会用到!
回复

使用道具 举报

louislau318 发表于 2015-1-19 10:50:03
先存了,希望以后会用到!
回复

使用道具 举报

chenhenry 发表于 2015-2-10 14:16:38
多谢分享,LZ辛苦了
回复

使用道具 举报

zhujun182104906 发表于 2015-3-19 21:55:26
回复

使用道具 举报

hlyz2008 发表于 2015-4-27 00:09:30
多谢楼主分享~
回复

使用道具 举报

hlyz2008 发表于 2015-4-27 00:10:05
多谢楼主分享~
回复

使用道具 举报

hlyz2008 发表于 2015-4-27 00:10:36
多谢楼主分享~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条