立即注册 登录
About云-梭伦科技 返回首页

desehawk的个人空间 https://www.aboutyun.com/?29 [收藏] [复制] [分享] [RSS]

日志

spark Task/Object not serializable 任务不能序列化记录

已有 1900 次阅读2017-6-15 15:42 | Object, 记录


序列化问题

If you see this error: 
如果你在Spark任务提交之后碰到了这样的情况:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...

The above error can be triggered when you intialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:

上面的错误可以理解为当你在driver上初始化了一个变量,但是又尝试在executor上使用这个变量时触发的一个异常. 这种情况下 Spark Streaming 会尝试将这个变量对象进行序列化,从而将其传输给executor. 如果对象不可序列化则报错退出. 考虑如下的代码段:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
rdd.map(s -> notSerializable.doSomething(s)).collect();

This will trigger that error. Here are some ideas to fix this error: 
这样就会触发序列化错误. 有几个小技巧可以避免这个问题:
  • Serializable the class 
    补充你声明变量所属的类的序列化方法
  • Declare the instance only within the lambda function passed in map 
    仅在传输给map阶段的匿名函数中声明该实例
  • Make the NotSerializable object as a static and create it once per machine. 
    将这个不能序列化的对象变成静态变量, 并且在每台worker上创建一个(有点像广播变量) 
    (或者可以考虑将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化, 未验证)
  • Call rdd.forEachPartition and create the NotSerializable object in there like this: 
    调用rdd的forEachPartition方法(而不是map), 并且用如下方法创建该对象:


rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable();

  // ...Now process iter
});

Tips

1) 实际中遇到的问题


val path = new Path("GOALPATH")
val pathStr = path.toString
def readData(sqlContext : SQLContext,path : Path) : DataFrame = {
    sqlContext.read.format("...").load(path.toString)
    ...
}
def readData(sqlContext : SQLContextpathStr : String) : DataFrame = {
    sqlContext.read.format("...").load(path.toString)
    ...
}
val bString = sc.broadcast("broadcast")
/* 报错 */
val data = readData(path)
data.map{
    value => {
    val str = bString.value
    ...
    }
}
/* 正常 *是因为已经使用tostring/
val dataStr = readData(pathStr)
dataStr.map{
    value => {
    val str = bString.value
    ...
    }
}

这里会报错,提示Path类不能进行序列化. 
判断是基于SQLContext的read方法中,是将目标文件进行分布式读取,所以需要将Path进行序列化分发给多个executor. 而默认的org.apache.Hadoop.fs.Path类仅实现了Comparable接口,没有实现序列化,所以这里报错.

转载
http://blog.h5min.cn/edin_blackpoint/article/details/72868621

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条