分享

Spark 高级分析:第九章第5节 数据预处理

本帖最后由 feilong 于 2018-11-9 09:55 编辑

问题导读

1.预处理的数据
数据格式是怎样的
2.哪些数据需要预处理?做怎样的处理?

3.预处理代码如何编写的



关注最新经典文章,欢迎关注公众号



Spark 高级分析:第九章第3,4节 模型介绍和获取数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26271



在这一点上,我们有不同格式的不同来源的数据。例如,GOOGL的雅虎格式化数据的前几行看起来像:
图片1.png
而Investing.com原油价格的历史看来:
图片2.png
从每一个来源,对于每一个工具和因素,我们想得到一个(日期,收盘价)元组列表。使用在前一章介绍的Java的SimpleDateFormat,我们可以在Investing.com格式中解析日期:
[mw_shl_code=scala,true]import java.text.SimpleDateFormat
val format = new SimpleDateFormat("MMM d, yyyy")
format.parse("Oct 24, 2014")
res0: java.util.Date = Fri Oct 24 00:00:00 PDT 201[/mw_shl_code]
3000个票据历史和4个因子历史足够小,可以在本地读取和处理。即使对于成千上万的票据和成千上万的因素进行更大规模的模拟,情况也是如此。当实际运行模拟时,需要像Spark这样的分布式系统,这可能需要在每个票据上进行大量的计算。

从本地磁盘读取完整的Investing.com历史:
[mw_shl_code=scala,true]import com.github.nscala_time.time.Imports._
import java.io.File
import scala.io.Source
def readInvestingDotComHistory(file: File):
Array[(DateTime, Double)] = {
val format = new SimpleDateFormat("MMM d, yyyy")
val lines = Source.fromFile(file).getLines().toSeq
lines.map(line => {
val cols = line.split('\t')
val date = new DateTime(format.parse(cols(0)))
val value = cols(1).toDouble
(date, value)
}).reverse.toArray
}[/mw_shl_code]
和前一章一样,我们使用JodaTime及其Scala包装器NScalaTime来表示日期,将SimpleDateFormat的日期输出包装在JodaTime Date Time中。
阅读完整的雅虎历史:
[mw_shl_code=scala,true]def readYahooHistory(file: File): Array[(DateTime, Double)] = {
val format = new SimpleDateFormat("yyyy-MM-dd")
val lines = Source.fromFile(file).getLines().toSeq
lines.tail.map(line => {
val cols = line.split(',')
val date = new DateTime(format.parse(cols(0)))
val value = cols(1).toDouble
(date, value)
}).reverse.toArray
}[/mw_shl_code]
注意line.tail对于删除标题行很有用。我们加载所有的数据并过滤掉少于5年历史的票据:
[mw_shl_code=scala,true]val start = new DateTime(2009, 10, 23, 0, 0)
val end = new DateTime(2014, 10, 23, 0, 0)
val files = new File("data/stocks/").listFiles()
val rawStocks: Seq[Array[(DateTime, Double)]] =
files.flatMap(file => {
try {
Some(readYahooHistory(file))
} catch {
case e: Exception => None
}
}).filter(_.size >= 260*5+10)
val factorsPrefix = "data/factors/"
val factors1: Seq[Array[(DateTime, Double)]] =
Array("crudeoil.tsv", "us30yeartreasurybonds.tsv").
map(x => new File(factorsPrefix + x)).
map(readInvestingDotComHistory)
val factors2: Seq[Array[(DateTime, Double)]] =
Array("SNP.csv", "NDX.csv").
map(x => new File(factorsPrefix + x)).
map(readYahooHistory)[/mw_shl_code]
不同类型的票据可能在不同的日期交易,或者由于其他原因数据可能丢失值,因此确保我们的不同历史保持一致非常重要。首先,我们需要把我们所有的时间序列及时调整到同一个区域。然后,我们需要填写丢失的值。为了处理在时间区域中的开始和结束日期缺失值的时间序列,我们简单地用时间区域中的附近值填充那些日期。
[mw_shl_code=scala,true]def trimToRegion(history: Array[(DateTime, Double)],
start: DateTime, end: DateTime): Array[(DateTime, Double)] = {
var trimmed = history.
dropWhile(_._1 < start).takeWhile(_._1 <= end)
if (trimmed.head._1 != start) {
trimmed = Array((start, trimmed.head._2)) ++ trimmed
}
if (trimmed.last._1 != end) {
trimmed = trimmed ++ Array((end, trimmed.last._2))
}
trimmed
}[/mw_shl_code]
为了处理时间序列中的缺失值,我们使用一个简单的归责策略,该策略将工具的价格作为当天之前的最近收盘价。不幸的是,没有漂亮的Scala集合方法可以为我们这样做,所以我们需要编写我们自己的:
[mw_shl_code=scala,true]import scala.collection.mutable.ArrayBuffer
def fillInHistory(history: Array[(DateTime, Double)],
start: DateTime, end: DateTime): Array[(DateTime, Double)] = {
var cur = history
val filled = new ArrayBuffer[(DateTime, Double)]()
var curDate = start
Preprocessing | 173
while (curDate < end) {
if (cur.tail.nonEmpty && cur.tail.head._1 == curDate) {
cur = cur.tail
}
filled += ((curDate, cur.head._2))
curDate += 1.days
// Skip weekends
if (curDate.dayOfWeek().get > 5) curDate += 2.days
}
filled.toArray
}[/mw_shl_code]
我们用trimToRegion 和fillInHistory处理数据 :
[mw_shl_code=scala,true]val stocks = rawStocks.map(trimToRegion(_, start, end)).
map(fillInHistory(_, start, end))
val factors = (factors1 ++ factors2).
map(trimToRegion(_, start, end)).
map(fillInHistory(_, start, end))[/mw_shl_code]

已有(1)人评论

跳转到指定楼层
jiangzi 发表于 2018-11-11 15:24:50
Spark 高级分析:第九章第5节 数据预处理~~~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条