
Spark Streaming: setting spark.streaming.concurrentJobs to 2 causes errors

邓立辉 posted on 2016-3-7 17:37:46
Last edited by 邓立辉 on 2016-3-7 17:43

The following error is reported:
16/03/07 14:59:27 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 2.2.2.30:53524 (size: 2.0 KB, free: 1060.0 MB)
16/03/07 14:59:27 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1457333965500 ms, 1457333966000 ms)
16/03/07 14:59:27 INFO InputInfoTracker: remove old batch metadata: 1457333965500 ms 1457333966000 ms
16/03/07 14:59:27 INFO BlockManagerInfo: Removed input-0-1457333966000 on 2.2.2.32:48519 in memory (size: 1024.0 B, free: 1060.0 MB)
16/03/07 14:59:27 INFO BlockManagerInfo: Removed input-0-1457333966000 on 2.2.2.30:53524 in memory (size: 1024.0 B, free: 1060.0 MB)
16/03/07 14:59:27 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 74, 2.2.2.30): java.lang.Exception: Could not compute split, block input-0-1457333966000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
From the log, the block is removed first (Removed input-0-1457333966000), and then something tries to read it afterwards (block input-0-1457333966000 not found).
Why does this happen? I have already synchronized the clocks of all servers in the cluster to within 2 seconds of each other.


Looking at the Spark source code, it is the cleanup in InputInfoTracker that does the deleting. Do I need to set the parameter "spark.streaming.internal.batchTime"? I could not find any documentation on what that parameter does.
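
For reference, a minimal sketch (not the actual application code) of the kind of setup being discussed: spark.streaming.concurrentJobs set to 2, a 500 ms batch interval matching the spacing of the batch times in the log, and two independent foreachRDD output operations. The class name, the socket source, and the processing bodies are placeholders; the real application uses createStream at CLS_MainApp.java:106 and its own logic.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ConcurrentJobsSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("ConcurrentJobsSketch")
                // allow the streaming JobScheduler to run up to 2 jobs at once
                // (the default is 1, i.e. jobs run strictly one after another)
                .set("spark.streaming.concurrentJobs", "2");

        // 500 ms batch interval, matching the spacing of the batch times in the log
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.milliseconds(500));

        // placeholder receiver-based source (the real app uses createStream)
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // two independent output operations -> two jobs per batch,
        // which can overlap once concurrentJobs > 1
        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) {
                rdd.count();   // stands in for the slow, heavy processing path
                return null;
            }
        });
        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(JavaRDD<String> rdd) {
                rdd.take(10);  // stands in for the simple path that hits "block not found"
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}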

Replies (6)

atsky123 posted on 2016-3-7 19:28:37
It might be that you are running out of memory.
Try storing the received data with the MEMORY_AND_DISK_SER storage level, so that it is written to disk when memory is insufficient, or increase executor-memory.
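
For reference, a minimal sketch of that suggestion in the Java API. The socket source is only a placeholder; most receiver-based createStream variants accept a StorageLevel argument in the same way. Increasing executor memory is done at submission time, e.g. spark-submit --executor-memory 4g.

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DiskSpillSketch {
    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = new JavaStreamingContext(
                new SparkConf().setAppName("DiskSpillSketch"),
                Durations.milliseconds(500));

        // Receiver-based sources accept an explicit StorageLevel; with
        // MEMORY_AND_DISK_SER the received blocks are stored serialized and
        // spill to disk instead of being dropped when executor memory is tight.
        // (The socket source is only a placeholder for the real createStream call.)
        JavaDStream<String> lines = jssc.socketTextStream(
                "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER());

        lines.print();  // any output operation, just so the sketch actually runs

        jssc.start();
        jssc.awaitTermination();
    }
}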

邓立辉 posted on 2016-3-7 19:38:01
Probably not: a time-consuming job finishes its processing and writes all its output, while a simple job fails right at the start. Still, I will give it a try.

邓立辉 posted on 2016-3-7 19:55:12
16/03/07 19:53:39 INFO DAGScheduler: ResultStage 32 (foreachRDD at CLS_MainApp.java:107) finished in 0.045 s
16/03/07 19:53:39 INFO TaskSchedulerImpl: Removed TaskSet 32.0, whose tasks have all completed, from pool
16/03/07 19:53:39 INFO DAGScheduler: Job 21 finished: foreachRDD at CLS_MainApp.java:107, took 0.821981 s
16/03/07 19:53:39 INFO SparkContext: Starting job: foreachRDD at CLS_MainApp.java:107
16/03/07 19:53:39 INFO DAGScheduler: Got job 22 (foreachRDD at CLS_MainApp.java:107) with 1 output partitions
16/03/07 19:53:39 INFO DAGScheduler: Final stage: ResultStage 33(foreachRDD at CLS_MainApp.java:107)
16/03/07 19:53:39 INFO DAGScheduler: Parents of final stage: List()
16/03/07 19:53:39 INFO DAGScheduler: Missing parents: List()
16/03/07 19:53:39 INFO DAGScheduler: Submitting ResultStage 33 (MapPartitionsRDD[249] at foreachRDD at CLS_MainApp.java:107), which has no missing parents
16/03/07 19:53:39 INFO TaskSchedulerImpl: Cancelling stage 33
16/03/07 19:53:39 INFO DAGScheduler: ResultStage 33 (foreachRDD at CLS_MainApp.java:107) failed in Unknown s
16/03/07 19:53:39 INFO DAGScheduler: Job 22 failed: foreachRDD at CLS_MainApp.java:107, took 0.007775 s
16/03/07 19:53:39 ERROR JobScheduler: Error running job streaming job 1457351619000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[237] at createStream at CLS_MainApp.java:106 after its blocks have been removed!
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368)
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829)
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1298)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
        at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:494)
        at org.apache.spark.api.java.AbstractJavaRDDLike.take(JavaRDDLike.scala:47)
        at org.apache.spark.examples.streaming.queryInCityFirstByBayonet.analyse(queryInCityFirstByBayonet.java:64)
        at org.apache.spark.examples.streaming.CLS_MainApp$1.call(CLS_MainApp.java:137)
        at org.apache.spark.examples.streaming.CLS_MainApp$1.call(CLS_MainApp.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[237] at createStream at CLS_MainApp.java:106 after its blocks have been removed!
        at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
        at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[237] at createStream at CLS_MainApp.java:106 after its blocks have been removed!
org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
scala.collection.immutable.List.foreach(List.scala:318)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368)
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829)
org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.AbstractTraversable.map(Traversable.scala:105)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:836)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1298)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
        at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:494)
        at org.apache.spark.api.java.AbstractJavaRDDLike.take(JavaRDDLike.scala:47)
        at org.apache.spark.examples.streaming.queryInCityFirstByBayonet.analyse(queryInCityFirstByBayonet.java:64)
        at org.apache.spark.examples.streaming.CLS_MainApp$1.call(CLS_MainApp.java:137)
        at org.apache.spark.examples.streaming.CLS_MainApp$1.call(CLS_MainApp.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Attempted to use BlockRDD[237] at createStream at CLS_MainApp.java:106 after its blocks have been removed!
        at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83)
        at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
        at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:250)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1394)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1405)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1404)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1404)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1402)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1402)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1368)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:829)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:827)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:827)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:772)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:757)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1463)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/03/07 19:53:39 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook

jaxvck posted on 2017-9-15 19:14:37
@邓立辉
Hi, I am running into the same problem. How did you end up solving it?

zstu posted on 2017-11-29 15:20:18
Quoting jaxvck (2017-9-15 19:14):
@邓立辉
Hi, I am running into the same problem. How did you end up solving it?

Same question here. Did you ever solve this problem?

test2767111 posted on 2019-7-10 14:38:55
You can avoid this problem by setting how long Spark Streaming keeps the generated RDDs alive, for example by calling sparkStreamingContext.remember() with a 10-minute duration (note the method name is remember, and it takes a Duration). You can refer to: https://siukwan.cn/spark-streami ... e%e6%8a%a5%e9%94%99
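
For anyone finding this later, a minimal sketch of that call in the Java API; the source and output operation below are only placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class RememberSketch {
    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = new JavaStreamingContext(
                new SparkConf().setAppName("RememberSketch"),
                Durations.milliseconds(500));

        // Keep each batch's RDDs (and the received blocks behind them) around
        // for at least 10 minutes, so a job that starts late still finds its blocks.
        jssc.remember(Durations.minutes(10));

        // placeholder source and output operation
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}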
