
Spark DataFrame show

zstu posted on 2018-1-16 16:16:02
val rdd = sc.textFile("..").map(x => x.split(",")).map(x => (x(0), x(1).toLong))
val df = rdd.toDF
df.show
df.cache
df.show

My data is:
a, 12
b,13
c,12.5

The first df.show prints the DataFrame fine, but after caching it with df.cache, the next df.show throws an error (12.5 cannot be converted to Long). My question is: why doesn't the first df.show report an error, while show after caching does?

7 replies

qcbb001 posted on 2018-1-16 19:02:52
I haven't run into this before. Please post the exact error so we can take a look.

einhep posted on 2018-1-16 19:06:06
In that case, specify the data types of the DataFrame's fields explicitly.
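For example, a minimal sketch of one way to do that (my illustration, not the original poster's code; it assumes the second column should be Double rather than Long, and the case class and field names are made up):

// Parse the numeric column as Double so values like "12.5" are valid,
// and give the fields explicit names and types via a case class.
case class Record(name: String, value: Double)   // hypothetical names

val typedRdd = sc.textFile("/test")
  .map(_.split(","))
  .map(x => Record(x(0).trim, x(1).trim.toDouble))   // trim handles "a, 12"

val typedDf = typedRdd.toDF()   // schema: name: string, value: double
typedDf.printSchema()
typedDf.show()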

fly2015 posted on 2018-1-16 19:58:36
On the first show, Spark doesn't trigger an action; it just prints the first 20 rows. After cache, things are different: Spark does type checking and so on.

zstu posted on 2018-1-17 08:53:45
qcbb001 posted on 2018-1-16 19:02
I haven't run into this before. Please post the exact error so we can take a look.

Error: java.lang.NumberFormatException: For input string: "12.5"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:)
at java.lang.Long.parseLong(Long.java:)


zstu posted on 2018-1-17 08:55:05
fly2015 posted on 2018-1-16 19:58
On the first show, Spark doesn't trigger an action; it just prints the first 20 rows. After cache, things are different ...

If the first show doesn't trigger an action, how are those 20 rows computed?

desehawk posted on 2018-1-19 21:17:36
zstu posted on 2018-1-17 08:55
If the first show doesn't trigger an action, how are those 20 rows computed?

I tested this here.

It actually errors even without cache. Better not to convert to Long here.
scala> val rdd = sc.textFile("/test").map(x => x.split(",")).map(x => (x(0), x(1).toLong))
rdd: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[10] at map at <console>:24

scala> val df = rdd.toDF
df: org.apache.spark.sql.DataFrame = [_1: string, _2: bigint]

scala> df.show
18/01/19 21:14:04 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 5, 192.168.1.10, executor 2): java.lang.NumberFormatException: For input string: "12.5"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
        at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:24)
        at $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:24)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

18/01/19 21:14:04 ERROR scheduler.TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, 192.168.1.10, executor 2): java.lang.NumberFormatException: For input string: "12.5"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
        at $anonfun$2.apply(<console>:24)
        at $anonfun$2.apply(<console>:24)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:644)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:603)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:612)
  ... 48 elided
Caused by: java.lang.NumberFormatException: For input string: "12.5"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
  at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
  at $anonfun$2.apply(<console>:24)
  at $anonfun$2.apply(<console>:24)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

The following code displays correctly:
scala> val rdd = sc.textFile("/test").map(x => x.split(",")).map(x => (x(0), x(1)))
rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[17] at map at <console>:24

scala> val df = rdd.toDF
df: org.apache.spark.sql.DataFrame = [_1: string, _2: string]

scala> df.show
+---+----+                                                                     
| _1|  _2|
+---+----+
|  a|  12|
|  b|  13|
|  c|12.5|
+---+----+
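If you still need the second column as a number, one option (a suggestion of mine, not part of the test above) is to cast the string column to double after loading:

val typed = df.withColumn("_2", df("_2").cast("double"))   // "12.5" stays valid; non-numeric strings would become null
typed.printSchema
typed.show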


