
Truly understand Spark Streaming

Last edited by pig2 on 2017-3-21 07:33
Guiding questions

1. Why use Spark Streaming?
2. What is a StreamingContext?
3. What is a DStream?






Introduction to Spark Streaming
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from sources such as Kafka, Flume, Twitter, ZeroMQ and Kinesis, and processed with complex algorithms expressed through high-level operators such as map, reduce, join and window. Finally, the processed data can be pushed out to file systems, databases and live dashboards.
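As a taste of that API, here is a hedged sketch (not from the original post) of a sliding-window word count, assuming a text socket source on localhost:9999 and a local master:

[mw_shl_code=scala,true]import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("WindowedWordCount").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(10))

// Ingest lines from a socket, split into words, and count them
// over a 60-second window that slides every 10 seconds
val lines  = ssc.socketTextStream("localhost", 9999)
val words  = lines.flatMap(_.split(" "))
val counts = words.map(w => (w, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))
counts.print()

ssc.start()
ssc.awaitTermination()[/mw_shl_code]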

(Figure: spark streaming.jpg)

Why use Spark Streaming
Many big data applications need to process streams of data in real time. Consider this: we know that both Spark and Storm can handle real-time data, but how exactly does Spark do it? Spark consists of several components:
  • Spark Core
  • Spark SQL
  • Spark Streaming
  • GraphX
  • MLlib


Spark Core provides the RDD, DataFrame and Dataset abstractions; Spark SQL offers SQL queries (including Hive compatibility); GraphX provides a distributed graph-processing framework; and MLlib provides machine-learning algorithms. Spark's real-time stream processing, then, is implemented by Spark Streaming.

So where is it used? Spark Streaming applications can track page statistics in real time, train machine-learning models, detect anomalies automatically, and more. For example:

Website monitoring
(Figure: 网站监控.png)

Fraud detection
(Figure: 欺诈检测.png)

Real-time, accurate data movement
(Figure: 数据转移.jpg)

Anti-fraud + billing
(Figure: 反作弊计费.jpg)

What is a StreamingContext
To initialize a Spark Streaming program, you must create a StreamingContext object; it is the main entry point for all streaming operations. A StreamingContext can be created from a SparkConf object. If StreamingContext sounds unfamiliar, it plays much the same role as SparkContext (see "Truly understand SparkContext, SQLContext and HiveContext"). Hadoop has analogous Context objects; all of them are global objects that read configuration information. Which configuration files? For Hadoop, files such as core-site.xml and hdfs-site.xml; for Spark, files such as spark-defaults.conf. With that in mind, here is an example:


[mw_shl_code=scala,true]import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))[/mw_shl_code]

appName is the name your application shows on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or the special string "local", which runs the program in local mode. When running on a cluster you normally do not hard-code master in the program; instead you launch the application with spark-submit and receive the master value from it. For local testing and unit tests, you can pass "local" to run Spark Streaming inside the same process. Note that a StreamingContext internally creates a SparkContext, which you can access as ssc.sparkContext.
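As a hedged sketch of that advice (class, host and jar names below are hypothetical), the conf can simply omit setMaster and let spark-submit supply it:

[mw_shl_code=scala,true]// No setMaster in code: the master is supplied at launch time, e.g.
//   spark-submit --master spark://host:7077 --class MyStreamingApp my-app.jar
val conf = new SparkConf().setAppName("MyStreamingApp")
val ssc  = new StreamingContext(conf, Seconds(1))[/mw_shl_code]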
The batch interval must be chosen based on the latency requirements of your application and the resources available on the cluster; see the performance-tuning section for details. A StreamingContext can also be created from an existing SparkContext:
[mw_shl_code=scala,true]import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))[/mw_shl_code]
Once a context has been defined, you must do the following:

  • Define the input sources;
  • Define the streaming computations;
  • Start receiving and processing data with streamingContext.start();
  • Processing continues until streamingContext.stop() is called.



A few points to note:
  • Once a context has been started, no new streaming computations can be defined or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • Calling stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() to false.
  • A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created; see the sketch below.
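The last two notes can be illustrated with a short sketch (a hedged illustration, not from the original post; the app name and batch intervals are arbitrary):

[mw_shl_code=scala,true]import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("reuse-demo").setMaster("local[2]")
val sc   = new SparkContext(conf)

// First streaming context, 1-second batches
val ssc1 = new StreamingContext(sc, Seconds(1))
// ... define sources and computations, then ssc1.start() ...

// Stop only the StreamingContext and keep the SparkContext alive
ssc1.stop(stopSparkContext = false)

// The same SparkContext can now back a second StreamingContext
val ssc2 = new StreamingContext(sc, Seconds(5))[/mw_shl_code]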



What is a DStream
Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume and Kinesis, or by applying high-level operations to other DStreams. Internally, a DStream is represented as a sequence of RDDs.
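To make the "sequence of RDDs" view concrete, here is a hedged sketch (not from the original post) using queueStream, where each RDD pushed into a queue becomes one batch of the DStream:

[mw_shl_code=scala,true]import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("dstream-as-rdds").setMaster("local[2]")
val ssc  = new StreamingContext(conf, Seconds(1))

// A DStream is internally a sequence of RDDs; queueStream makes that explicit:
// each RDD added to the queue is consumed as one batch of the stream.
val rddQueue = new mutable.Queue[RDD[Int]]()
val stream   = ssc.queueStream(rddQueue)

stream.foreachRDD { (rdd, time) =>
  // rdd is an ordinary RDD holding the records of this batch
  println(s"Batch at $time has ${rdd.count()} records")
}

ssc.start()
(1 to 3).foreach(_ => rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 100) })
ssc.awaitTerminationOrTimeout(5000)
ssc.stop()[/mw_shl_code]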


Example:
A simple Streaming-based WordCount looks like this:

[mw_shl_code=scala,true]
package com.debugo.example

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf

object WordCountStreaming {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HDFSWordCount").setMaster("spark://172.19.1.232:7077")

    // create the streaming context with a 30-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(30))

    // process files as new files appear under the monitored directory
    val lines = ssc.textFileStream("file:///home/spark/data")
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) // wordCounts is a DStream, not an RDD
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}[/mw_shl_code]
This code runs a word count over any new files that appear under the given path and prints the results.



Code walkthrough:
Using Spark Streaming starts with creating a StreamingContext (analogous to SparkContext). Its parameters are essentially the same as SparkContext's: the master URL (setMaster) and the application name (setAppName). The second constructor argument, Seconds(30), sets the batch interval to 30 seconds; choose it according to the needs of the application and the processing capacity of the cluster.

val lines = ssc.textFileStream("file:///home/spark/data") creates the lines DStream.

val words = lines.flatMap(_.split(" ")) transforms it into the words DStream via flatMap.
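Besides print(), a DStream's results can be written to external storage with an output operation. A hedged sketch (the HDFS prefix below is hypothetical):

[mw_shl_code=scala,true]// Write each batch's counts as text files: one output directory per batch,
// named <prefix>-<batch time in ms>.<suffix>
wordCounts.saveAsTextFiles("hdfs:///user/spark/wordcounts/batch", "txt")[/mw_shl_code]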

Another example: creating a Twitter stream
val tweets = ssc.twitterStream()


(Figure: tw.jpg)

Here tweets is a DStream.
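Note that ssc.twitterStream() comes from very early Spark releases; in later versions the Twitter connector lives in the separate spark-streaming-twitter module. A hedged sketch of the classic "hashtags" example with that connector (assuming the module is on the classpath and twitter4j OAuth credentials are configured as system properties):

[mw_shl_code=scala,true]import org.apache.spark.streaming.twitter.TwitterUtils

// None => read OAuth credentials from the twitter4j system properties
val tweets   = TwitterUtils.createStream(ssc, None)
val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
hashTags.countByValue().print()[/mw_shl_code]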

To be continued.

When reposting, please credit: about云 (www.aboutyun.com)

Comments (13)

中风拓 posted on 2017-3-4 18:50:54
Looking forward to further updates from the OP, ideally with working project code…

Juvenile posted on 2017-3-21 17:54:40
Learned something, nice.

DataNerd posted on 2017-5-30 19:03:04
mark mark mark

fengfengda posted on 2017-9-7 10:41:41
The WordCount code throws the following error:
17/09/07 10:37:05 INFO JobScheduler: Started JobScheduler
17/09/07 10:37:05 INFO StreamingContext: StreamingContext started
17/09/07 10:37:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(null) (172.28.41.196:52138) with ID 0
17/09/07 10:37:11 INFO BlockManagerMasterEndpoint: Registering block manager 172.28.41.196:35789 with 413.9 MB RAM, BlockManagerId(0, 172.28.41.196, 35789)
17/09/07 10:37:30 WARN FileInputDStream: Error finding new files
java.lang.NullPointerException
        at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)
        at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192)
        at scala.collection.SeqLike$class.size(SeqLike.scala:106)
        at scala.collection.mutable.ArrayOps$ofRef.size(ArrayOps.scala:186)
        at scala.collection.mutable.Builder$class.sizeHint(Builder.scala:69)
        at scala.collection.mutable.ArrayBuilder.sizeHint(ArrayBuilder.scala:22)
        at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:230)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:205)
        at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:36)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:36)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/09/07 10:37:30 INFO FileInputDStream: New files at time 1504751850000 ms:

17/09/07 10:37:30 INFO JobScheduler: Added jobs for time 1504751850000 ms
17/09/07 10:37:30 INFO JobScheduler: Starting job streaming job 1504751850000 ms.0 from job set of time 1504751850000 ms
17/09/07 10:37:30 INFO SparkContext: Starting job: print at WordCountStreaming.scala:19
17/09/07 10:37:30 INFO DAGScheduler: Registering RDD 3 (map at WordCountStreaming.scala:18)
17/09/07 10:37:30 INFO DAGScheduler: Got job 0 (print at WordCountStreaming.scala:19) with 1 output partitions
17/09/07 10:37:30 INFO DAGScheduler: Final stage: ResultStage 1 (print at WordCountStreaming.scala:19)
17/09/07 10:37:30 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
17/09/07 10:37:30 INFO DAGScheduler: Missing parents: List()
17/09/07 10:37:30 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCountStreaming.scala:18), which has no missing parents
17/09/07 10:37:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.8 KB, free 907.2 MB)
17/09/07 10:37:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1723.0 B, free 907.2 MB)
17/09/07 10:37:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.28.26.83:53299 (size: 1723.0 B, free: 907.2 MB)
17/09/07 10:37:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1012
17/09/07 10:37:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCountStreaming.scala:18)
17/09/07 10:37:30 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/09/07 10:37:30 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 0, 172.28.41.196, partition 0, PROCESS_LOCAL, 5798 bytes)
17/09/07 10:37:30 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 0 on executor id: 0 hostname: 172.28.41.196.
17/09/07 10:37:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.28.41.196:35789 (size: 1723.0 B, free: 413.9 MB)
17/09/07 10:37:31 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 0, 172.28.41.196): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

17/09/07 10:37:31 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 1, 172.28.41.196, partition 0, PROCESS_LOCAL, 5798 bytes)
17/09/07 10:37:31 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 0 hostname: 172.28.41.196.
17/09/07 10:37:32 INFO TaskSetManager: Lost task 0.1 in stage 1.0 (TID 1) on executor 172.28.41.196: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD) [duplicate 1]
17/09/07 10:37:32 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 2, 172.28.41.196, partition 0, PROCESS_LOCAL, 5798 bytes)
17/09/07 10:37:32 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 0 hostname: 172.28.41.196.
17/09/07 10:37:32 INFO TaskSetManager: Lost task 0.2 in stage 1.0 (TID 2) on executor 172.28.41.196: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD) [duplicate 2]
17/09/07 10:37:32 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 3, 172.28.41.196, partition 0, PROCESS_LOCAL, 5798 bytes)
17/09/07 10:37:32 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 0 hostname: 172.28.41.196.
17/09/07 10:37:32 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 3) on executor 172.28.41.196: java.lang.ClassCastException (cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD) [duplicate 3]
17/09/07 10:37:32 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
17/09/07 10:37:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
17/09/07 10:37:32 INFO TaskSchedulerImpl: Cancelling stage 1
17/09/07 10:37:32 INFO DAGScheduler: ResultStage 1 (print at WordCountStreaming.scala:19) failed in 1.762 s
17/09/07 10:37:32 INFO DAGScheduler: Job 0 failed: print at WordCountStreaming.scala:19, took 1.994405 s
17/09/07 10:37:32 INFO JobScheduler: Finished job streaming job 1504751850000 ms.0 from job set of time 1504751850000 ms
17/09/07 10:37:32 INFO JobScheduler: Total delay: 2.188 s for time 1504751850000 ms (execution: 2.029 s)
17/09/07 10:37:32 ERROR JobScheduler: Error running job streaming job 1504751850000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 3, 172.28.41.196): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1298)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        ... 3 more
Exception in thread "main" 17/09/07 10:37:32 INFO FileInputDStream: Cleared 0 old files that were older than 1504751790000 ms:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 3, 172.28.41.196): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)

nextuser posted on 2017-9-7 13:20:10, replying to fengfengda:
It is probably an environment/classpath issue. Make sure the required jars, especially any custom ones, are present on every node.
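For the specific ClassCastException above, one common cause when launching straight from an IDE against a standalone cluster is that the application's classes are never shipped to the executors. A hedged sketch of one way to hand the jar to the cluster (the jar path is hypothetical):

[mw_shl_code=scala,true]import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("HDFSWordCount")
  .setMaster("spark://172.19.1.232:7077")
  // Ship the assembled application jar to every executor (hypothetical path)
  .setJars(Seq("/path/to/wordcount-assembly.jar"))[/mw_shl_code]

Alternatively, package the application and launch it with spark-submit, which distributes the jar for you, or use setMaster("local[2]") for a quick in-IDE test.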

fengfengda posted on 2017-9-7 16:55:57, replying to nextuser:
How could the environment be the problem? There are no custom jars; I am running it directly from IDEA, and this is the error shown in the browser UI:
(Figure: QQ截图20170907165507.png)

a530491093 posted on 2017-9-8 09:52:58
Marking this, nice, thanks~
