分享

Spark 高级分析:第三章第3节

feilong 2017-12-1 12:35:30 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 6159
本帖最后由 feilong 于 2017-12-1 12:38 编辑
问题导读

1.Spark MLib ALS限制条件是什么?
2.flatmap适用于什么场景?
3.Option类是什么?





上一篇:Spark 高级分析:第三章第2节
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23387&extra=

数据准备

这三个文件都需要复制到HDFS。本章假设文件存储在/user/ds/ 。启动spark-shell需要注意这个计算会耗费大量内存。如果你是本地模式运行而不是集群模式,例如你可以设置 --driver-memory 6g使其有足够内存完成这些计算。

建立模型的第一步是理解数据,然后分析转化成Spark可识别的结构。

Spark MLib ALS的实现的一个限制条件是用户和项目需要由32位非负整数数字化的ID。这意味着大于Interger.MAX_VALUE或者214783647的ID不能使用。这个数据集是否这个要求呢?使用Spark SparkContext的textFile方法将文件作为一个String类型RDD进行读取:
[mw_shl_code=scala,true]val rawUserArtistData = sc.textFile("hdfs:///user/ds/user_artist_data.txt")[/mw_shl_code]
RDD默认会包含HDFS块的有一个分区。因为该文件占据了HDFS大约400M存储,它会被切分成3到6分区典型的HDFS块大小。这通常是很好的,但是像ALS这样的机器学习任务可能比简单的文本处理更具有计算密集性。将数据分解成更小的部分——更多的分区——用于处理可能更好。这可以让Spark立即让更多的处理器核心处理这个问题。你可以为该方法提供第二个参数,以指定不同的和更大的分区数目。这可能会设置为匹配集群的CPU核数。

文件每行都包含一个用户ID,一个艺术家ID,播放数,这些都是空格分割的。为按用户ID做统计,我们将文件每行按空格进行切分,并且索引为0的值转化为数字。Stats()方法返回一个包含最大最小值的对象。艺术家ID同理:
[mw_shl_code=scala,true]rawUserArtistData.map(_.split(' ')(0).toDouble).stats()
rawUserArtistData.map(_.split(' ')(1).toDouble).stats()[/mw_shl_code]
打印的计算统计数据显示,最大用户和艺术家ID分别为2443548和10794401。它们都小于2147483647。使用这些ID不需要额外的转换。

知道与不透明数字ID相对应的艺术家名字是很有用的。这个信息包含在artist_data.txt。这一次,它包含由制表符分割的艺术家ID和艺术家名字。但是,将文件直接解析为(int,string)元组将失败:
[mw_shl_code=scala,true]val rawArtistData = sc.textFile("hdfs:///user/ds/artist_data.txt")
val artistByID = rawArtistData.map { line =>
val (id, name) = line.span(_ != '\t')
(id.toInt, name.trim)
}[/mw_shl_code]
这里,span()按行通过第一个制表符切分字符获取非制表符数据。然后解析第一部分为数字乐行艺术家ID,其余部分作为艺术家名字(不包括空格和制表符)。一部分数据行似乎已损坏。它们不包含一个标签,或者,在不经意间包括了换行符。这些行导致NumberFormatException出现,最理想的情况是它们未被映射成任何数据。

然而,map()函数对每一个输入都必须返回一个值,所以它不能用。可以使用filter()删除不解析的那些行,但这将使分析逻辑重复。flatmap()函数适用于当每一个元素要映射为零个、一个或多个结果时,因为它只是“使扁平化”把每个零个或多个输入结果变成一个大的RDD。它与Scala集合一起工作,但也适用于Scala Option类。Option表示值可能存在也可能不存在。它就像是1、0值的简单集合,对应于它的是Some和None子类。所以,在flatmap下面的函数可以返回一个空列表,或列表中的一个元素,这是一个使用flatmap合理的地方,包括使用Some和None:
[mw_shl_code=scala,true]val artistByID = rawArtistData.flatMap { line =>
val (id, name) = line.span(_ != '\t')
if (name.isEmpty) {
None
} else {
try {
Some((id.toInt, name.trim))
} catch {
case e: NumberFormatException => None
}
}
}[/mw_shl_code]
文件artist_alias.txt映射成艺术家ID可能会有拼写错误或不规范的艺术家名字。每行包含两个ID,由一个制表符分隔。此文件相对较小,包含大约200000个条目。它将收集作为一个有用的Map,将“坏”艺术家的ID转化为“好”的,而不是仅仅使用它作为艺术家IDS对的RDD。同样,由于某些原因,有些线缺少第一个艺术家ID,会被被跳过。
[mw_shl_code=scala,true]val rawArtistAlias = sc.textFile("hdfs:///user/ds/artist_alias.txt")
val artistAlias = rawArtistAlias.flatMap { line =>
val tokens = line.split('\t')
if (tokens(0).isEmpty) {
None
} else {
Some((tokens(0).toInt, tokens(1).toInt))
}
}.collectAsMap()[/mw_shl_code]
例如第一个实体,map ID从6803336到1000010。这些可以从包含艺术家名字的RDD中查到。
[mw_shl_code=scala,true]artistByID.lookup(6803336).head
artistByID.lookup(1000010).head[/mw_shl_code]

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条