分享

Spark 高级分析:第八章第7,8节 准备纽约市出租车数据并处理不良记录

feilong 2018-10-5 07:51:58 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 6545
问题导读

1.Spark Shell如何新增依赖

2.文中
准备数据做了哪些处理
3.如何处理不良记录



关注最新经典文章,欢迎关注公众号



上一篇:第八章第6节 GeoJSON简介
http://www.aboutyun.com/forum.php?mod=viewthread&tid=25303


GeoJSON和JodaTime库准备好后,是时候开始使用Spark交互式地分析纽约出租车驾驶数据。让我们在HDFS创建一个textdata目录,并将我们一直在查看的旅行数据复制到集群中:
[mw_shl_code=bash,true]hadoop fs -mkdir taxidata
hadoop fs -put trip_data_1.csv taxidata/
[/mw_shl_code]
启动Spark Shell,通过--jars参数将我们需要的库引入到REPL:
[mw_shl_code=bash,true]spark-shell --jars joda-time-2.4.jar,geojson.jar[/mw_shl_code]
Spark Shell 加载完成后,我们可以从出租车数据中创建一个RDD并检查前几行,就像我们在其他章节中所做的那样:
[mw_shl_code=scala,true]val taxiRaw = sc.textFile("taxidata")
val taxiHead = taxiRaw.take(10)
taxiHead.foreach(println)
[/mw_shl_code]
让我们首先定义一个案例类,它包含了我们想在分析中使用的每个出租车出行的信息。我们将定义一个名为Trip的case类,它使用JodaTime API中的DateTime类来表示上车和下车时间,使用Esri Geometry API中的Point类来表示上车和下车位置的经度和纬度:
[mw_shl_code=scala,true]import com.esri.core.geometry.Point
import com.github.nscala_time.time.Imports._
case class Trip(
pickupTime: DateTime,
dropoffTime: DateTime,
pickupLoc: Point,
dropoffLoc: Point)
[/mw_shl_code]
为了将taxiRaw RDD中的数据解析为case类的实例,我们需要创建一些帮助对象和函数。首先,我们将使用具有格式化字符串的SimpleDateFormat实例来处理上车和下车时间:
[mw_shl_code=scala,true]val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")[/mw_shl_code]
接下来,我们将使用Point类以及Scala为字符串提供的隐式toDouble方法解析上车位置和下车位置的经度和纬度:
[mw_shl_code=scala,true]def point(longitude: String, latitude: String): Point = {
new Point(longitude.toDouble, latitude.toDouble)
}
[/mw_shl_code]
有了这些方法,我们可以定义一个解析函数,从taxiraw RDD的每一行中提取包含驱动程序的许可证和Trip类的实例的元组:
[mw_shl_code=scala,true]def parse(line: String): (String, Trip) = {
val fields = line.split(',')
val license = fields(1)
val pickupTime = new DateTime(formatter.parse(fields(5)))
val dropoffTime = new DateTime(formatter.parse(fields(6)))
val pickupLoc = point(fields(10), fields(11))
val dropoffLoc = point(fields(12), fields(13))
val trip = Trip(pickupTime, dropoffTime, pickupLoc, dropoffLoc)
(license, trip)
}
[/mw_shl_code]
我们可以对来自taxiHead数组的几个记录测试解析函数,以验证它是否能够正确地处理数据样本。

任何处理过大规模、真实数据集的人都知道,它们总是包含一些不符合编写代码来处理它们的人员的期望的记录。许多MapReduce作业和Spark管道都由于解析逻辑引发异常的无效记录而失败。

通常,我们一次处理这些异常,方法是检查各个任务的日志,找出哪行代码抛出异常,然后找出如何调整代码以忽略或纠正无效记录。这是一个繁琐的过程,并且常常感觉像是在玩一个鼹鼠游戏:正如修复一个异常一样,我们在稍后在分区内出现的记录中发现另一个异常。

有经验的数据科学家在使用新数据集时采用的一种策略是将try-catch块添加到他们的解析代码中,从而可以将任何无效记录写入日志而不会导致整个作业失败。如果整个数据集中只有少数无效记录,那么我们可以忽略它们,继续进行分析。使用Spark,我们可以做得更好:我们可以调整我们的解析代码,以便我们能够交互地分析数据中的无效记录,就像我们执行任何其他类型的分析一样容易。

对于RDD中的任何单个记录,我们的解析代码都有两个可能的结果:要么成功解析记录,并返回有意义的输出,要么失败并抛出异常,在这种情况下,我们要捕获无效记录的值被扔了。每当操作有两个互斥结果时,我们可以使用Scala的Either[L,R]类型来表示操作的返回类型。对我们来说,“左”结果就是成功解析的记录,“右”结果就是我们命中的异常和导致异常的输入记录的元组。

下面的safe函数接受类型为S T的名为f的参数,并返回一个新的S Either[T,(S,Exception)],该S Either[T,(S,Exception)]将返回调用f的结果,或者,如果抛出异常,则返回包含无效输入值和异常本身的元组。
[mw_shl_code=scala,true]def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
new Function[S, Either[T, (S, Exception)]] with Serializable {
def apply(s: S): Either[T, (S, Exception)] = {
try {
Left(f(s))
} catch {
case e: Exception => Right((s, e))
}
}
}
}
[/mw_shl_code]
现在我们可以通过将解析函数(类型为String Trip)传递给safe函数,然后将safeParse应用于taxiRaw RDD,来创建名为safeParse的安全包装器函数:
val safeParse = safe(parse)
val taxiParsed = taxiRaw.map(safeParse)
taxiParsed.cache()
如果我们想要确定成功解析了多少条输入行,我们可以结合countByValue操作对任一Either[L,R]使用isLeft方法:
[mw_shl_code=scala,true]taxiParsed.map(_.isLeft).
countByValue().
foreach(println)
...
(false,87)
(true,14776529)
[/mw_shl_code]
这看起来是个好消息——只有一小部分输入记录抛出了异常。我们希望检查客户机中的这些记录,看看抛出了哪个异常,并确定是否可以改进我们的解析代码来正确处理它们。获取无效记录的一种方法是使用filter和map方法的组合:
[mw_shl_code=scala,true]val taxiBad = taxiParsed.
filter(_.isRight).
map(_.right.get)
[/mw_shl_code]
或者,我们可以在单个调用中使用RDD类上的collect方法进行过滤和映射,该方法以部分函数作为参数。部分函数是具有isDefinedAt方法的函数,该方法确定它是否为特定输入定义。我们可以通过扩展PartialFunction[S,T]特性或者通过以下特殊语法在Scala中创建部分函数:
[mw_shl_code=scala,true]val taxiBad = taxiParsed.collect({
case t if t.isRight => t.right.get
})
[/mw_shl_code]
if块确定定义了部分函数的值,并在给出部分函数返回值后给出表达式。请注意区分对RDD应用部分函数的收集转换和不接受参数并将RDD的内容返回给客户端的collect()操作:
[mw_shl_code=scala,true]taxiBad.collect().foreach(println)[/mw_shl_code]
注意,大多数坏记录抛出ArrayIndexoutOfBoundsExceptions,因为它们缺少我们在上面编写的解析函数中试图提取的字段。由于这些不良记录相对较少(只有87条左右),我们将不再考虑它们,继续我们的分析,集中于正确解析的数据中的记录:
[mw_shl_code=scala,true]val taxiGood = taxiParsed.collect({
case t if t.isLeft => t.left.get
})
taxiGood.cache()
[/mw_shl_code]
即使taxiGood RDD中的记录被正确解析,它们仍然可能存在我们希望发现和处理的数据质量问题。为了找到剩余的数据质量问题,我们可以开始考虑一些条件,这些条件对于任何正确记录的行程都是正确的。

考虑到行程数据的时间特性,我们可以预期一个合理的不变量是任何行程的下车时间都将在上车时间之后的某个时间。我们也许会预期,行程不会超过几个小时完成,尽管长途旅行、在高峰时间进行的旅行或因事故而延迟的旅行肯定会持续几个小时。我们不能确切地确定在一段“合理”的时间段内旅行的截止时间。

让我们定义一个名为hours的辅助函数,该函数使用JodaTime Duration类来计算乘坐出租车所花费的时间。然后我们可以使用它来计算taxigood RDD旅行中从开始到结束的小时数的直方图:
[mw_shl_code=scala,true]import org.joda.time.Duration
def hours(trip: Trip): Long = {
val d = new Duration(
trip.pickupTime,
trip.dropoffTime)
d.getStandardHours
}
taxiGood.values.map(hours).
countByValue().
toList.
sorted.
foreach(println)
...
(-8,1)
(0,14752245)
(1,22933)
(2,842)
(3,197)
(4,86)
(5,55)
(6,42)
(7,33)
(8,17)
(9,9)
...
[/mw_shl_code]
这里的一切看起来都很好,除了一次旅行,八个小时的时间就要结束了!也许回到未来的DeloRon是兼职纽约出租车?让我们检查一下这条记录。
[mw_shl_code=scala,true]taxiGood.values.
filter(trip => hours(trip) == -8).
collect().
foreach(println)
[/mw_shl_code]
这揭示了一个奇怪的记录-一个旅行开始于1月25日的下午6点,在同一天上午10点之前完成。我并不清楚这次旅行的录音到底出了什么问题,但是由于它似乎只出现在一条记录上,所以现在把它排除在我们的分析之外应该没问题。

看看剩下的非负个小时的旅程,似乎绝大多数出租车乘坐时间不超过3小时。我们将把一个过滤器应用到taxiGood RDD,这样我们就可以专注于这些“典型”的分布,而忽略现在的离群点:
[mw_shl_code=scala,true]val taxiClean = taxiGood.filter {
case (lic, trip) => {
val hrs = hours(trip)
0 <= hrs && hrs < 3
}
}[/mw_shl_code]

已有(1)人评论

跳转到指定楼层
草莓的橘子树 发表于 2018-12-11 10:35:19
请问大佬有用过python操作spark任务吗?就是pyspark…我最近想用pyspark把DataFrame写到本地文件里,但是一直没成功,所以想请教大佬
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条