分享

about云日志分析项目准备10-4-1:spark streaming总结

pig2 发表于 2017-3-23 16:06:40 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 2436
本帖最后由 pig2 于 2017-3-23 16:09 编辑
问题导读

1.DStreams的含义是什么?
2.DStreams提供哪两种类型的操作?
3.Transformations操作分为哪两种类型?
4.本文说了哪些输入源?
5.什么是batch?







RDD与job之间的关系
Spark Streaming是构建在Spark上的实时流计算框架,扩展了Spark流式大数据处理能
力。Spark Streaming将数据流以时间片为单位分割形成RDD,使用RDD操作处理每一块数
据,每块数据(也就是RDD)都会生成一个Spark Job进行处理,最终以批处理的方式处理
每个时间片的数据
说明:Spark中的Job和MR中Job不一样不一样。MR中Job主要是Map或者Reduce Job。而Spark的Job其实很好区别,RDD一个action算子就算一个Job.

什么是batch
Spark Streaming生成新的batch并对它进行一些处理,每个batch中的数据都代表一个RDD

理解batch
间隔时间开始会创建,间隔时间内会积累

设置时间间隔的理解
我们知道spark streaming有个时间间隔。假如间隔为1秒,它是停下1秒,然后在接受1秒的数据,也就是说是间隔1秒,然后在接受数据,还是说接受1秒的数据。这里表面上没有太大的区别,其实在于理解的到不到位。说白了batch封装的是1秒的数据。


batch创建
batch在时间间隔开始被创建,在间隔时间内任何到达的数据都被添加到批数据中,间隔时间结束,batch创建结束。


什么是batch间隔参数
间隔时间大小的参数被称之为batch间隔

batch间隔范围一般为
500 毫秒到几分钟,由开发者定义。



spark streaming应用
spark streaming应用程序可以实时跟踪页面统计,训练机器学习模型或则自动检测异常,更多推荐参考
让你真正明白spark streaming
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21141

DStreams详解

DStreams是discretized streams的缩写,是离散流的意思。
DStreams是随着时间【推移】到达的一系列数据

每个dstream被表示为一个序列的RDDS(因此名称“离散”)。
DStreams可以不同的数据源创建,比如flume,kafka,或则hdfs.一旦构建,
DStreams提供两种类型的操作:

transformations,产生一个新的DStream
output operations,写数据到外部系统。
DStreams提供许多与RDD相同的操作,外加一些关于时间的操作比如slidingwindows【滑动窗口】。

DStreams来源
1.外部数据源
2.通过transformations转换而来

Transformations操作
分为有状态和无状态
Stateful transformations需要checkpointing,在StreamingContext中启用容错。
设置checkpointing
ssc.checkpoint("hdfs://...")

Windowed transformations

window操作需要两个参数,窗口持续时间和滑动持续时间。这两个必须是多个StreamingContext的batch时间区间。DStream数据源时间间隔是10秒。想创建滑动窗口上一个30秒(或则上3batches)),我们应该设置windowDuration30秒。sliding时间间隔,默认是batch时间间隔,控制DStream刷新计算结果。如果我们的DStream batch时间区间为10秒,我们想计算我们的window,只能在每个第二batch。我们设置我们的sliding间隔为20秒。

输出操作【output operations
保存DStream 为文本文件【Scala】
  1. ipAddressRequestCount.saveAsTextFiles("outputDir", "txt")
复制代码

saveAsHadoopFiles()是hadoop输出格式,例如Spark Streaming没有SaveAsSequenceFile()函数,我们可以保存为SequenceFiles

Scala
  1. val writableIpAddressRequestCount = ipAddressRequestCount.map {
  2. (ip, count) => (new Text(ip), new LongWritable(count)) }
  3. writableIpAddressRequestCount.saveAsHadoopFiles[
  4. SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")
复制代码


Java
  1. JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
  2. new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
  3. public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
  4. return new Tuple2(new Text(e._1()), new LongWritable(e._2()));
  5. }});
  6. class OutFormat extends SequenceFileOutputFormat<Text, LongWritable> {};
  7. writableDStream.saveAsHadoopFiles(
  8. "outputDir", "txt", Text.class, LongWritable.class, OutFormat.class);
复制代码


foreachRDD()
  1. ipAddressRequestCount.foreachRDD { rdd =>
  2. rdd.foreachPartition { partition =>
  3. // Open connection to storage system (e.g. a database connection)
  4. partition.foreach { item =>
  5. // Use connection to push item to system
  6. }
  7. // Close connection
  8. }
  9. }
复制代码



checkpointing机制
spark streaming主要机制checkpointing,它将数据存储在一个可靠的文件系统,比如hdfs.

checkpoint的作用,用于恢复数据。它会定期保存状态到可靠的文件系统比如hdfs,s3
比如你每5-10批数据设置checkpointing。当发生丢失数据的时候,Spark Streaming讲恢复最近的checkpoint.随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。


输入源

spark streaming支持多个数据源,一些核心的数据源,已被构建到Streaming Maven artifact,其它可以通过额外的artifact,比如spark-streaming-kafka.
核心数据源比如sockets,还有文件 和 Akka actors.

其它数据源

使用kafka必须引入artifact:spark-streaming-kafka_2.10到项目中。它提供KafkaUtils对象,通过StreamingContext 和 JavaStreamingContext创建kafka消息的DStream.
因为它订阅多个topic. DStream创建由topic 和 message组成的对。我们可以调用createStream()方法来创建Stream。字符串分割开ZooKeeper hosts, consumer group的名称(唯一的名字),receiver 线程用于topic.

Apache Kafka 订阅Panda的topic【Scala】

  1. import org.apache.spark.streaming.kafka._
  2. ...
  3. // Create a map of topics to number of receiver threads to use
  4. val topics = List(("pandas", 1), ("logs", 1)).toMap
  5. val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
  6. StreamingLogInput.processLines(topicLines.map(_._2))
复制代码



Apache Kafka 订阅 to Panda’s topic【Java】
  1. import org.apache.spark.streaming.kafka.*;
  2. ...
  3. // Create a map of topics to number of receiver threads to use
  4. Map<String, Integer> topics = new HashMap<String, Integer>();
  5. topics.put("pandas", 1);
  6. topics.put("logs", 1);
  7. JavaPairDStream<String, String> input =
  8. KafkaUtils.createStream(jssc, zkQuorum, group, topics);
  9. input.print();
复制代码


推荐参照文章让你真正明白spark streaming
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21141

spark streaming知识总结2
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21173


转载注明来自about云(www.aboutyun.com
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21307





没找到任何评论,期待你打破沉寂

关闭

推荐上一条 /2 下一条