分享

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

本帖最后由 pig2 于 2015-1-6 14:18 编辑

问题导读
1.机器学习算法基本遵循怎样的思路?
2.如何求得损失函数的最优解?












概要
本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读。

线性回归模型
机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路
  • 假设函数
  • 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准
  • 根据损失函数推出目标函数
  • 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

具体到线性回归来说,上述就转换为
151926500778543.png
151927022807138.png



梯度下降法
那么如何求得损失函数的最优解,针对最小二乘法来说可以使用梯度下降法。

151928289838560.png
算法实现
151930293589035.png
151930397645815.png


随机梯度下降
151931562024499.png
151932069201309.png

正则化

151933471557596.png


如何解决这些问题呢?可以采用收缩方法(shrinkage method),收缩方法又称为正则化(regularization)。 主要是岭回归(ridge regression)和lasso回归。通过对最小二乘估计加 入罚约束,使某些系数的估计为0。

151935089055171.png
线性回归的代码实现
上面讲述了一些数学基础,在将这些数学理论用代码来实现的时候,最主要的是把握住相应的假设函数和最优化算法是什么,有没有相应的正则化规则。

对于线性回归,这些都已经明确,分别为
  • Y = A*X + B 假设函数
  • 随机梯度下降法
  • 岭回归或Lasso法,或什么都没有

那么Spark mllib针对线性回归的代码实现也是依据该步骤来组织的代码,其类图如下所示

151942037646581.png
函数调用路径

151943281082615.png
train->run,run函数的处理逻辑
  • 利用最优化算法来求得最优解,optimizer.optimize
  • 根据最优解创建相应的回归模型, createModel

runMiniBatchSGD是真正计算Gradient和Loss的地方
  1. def
  2. runMiniBatchSGD(
  3.       data: RDD[(Double, Vector)],
  4.       gradient: Gradient,
  5.       updater: Updater,
  6.       stepSize: Double,
  7.       numIterations: Int,
  8.       regParam: Double,
  9.       miniBatchFraction: Double,
  10.       initialWeights: Vector): (Vector, Array[Double]) = {
  11.    
  12. val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
  13.     val numExamples = data.count()
  14.     val miniBatchSize = numExamples * miniBatchFraction
  15.     // if no data, return initial weights to avoid NaNs
  16.     if (numExamples == 0) {
  17.       logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
  18.       return (initialWeights, stochasticLossHistory.toArray)
  19.     }
  20.     // Initialize weights as a column vector
  21.     var weights = Vectors.dense(initialWeights.toArray)
  22.     val n = weights.size
  23.     /**
  24.      * For the first iteration, the regVal will be initialized as sum of weight squares
  25.      * if it's L2 updater; for L1 updater, the same logic is followed.
  26.      */
  27.     var regVal = updater.compute(
  28.       weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
  29.     for (i  (c, v) match { case ((grad, loss), (label, features)) =>
  30.             val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
  31.             (grad, loss + l)
  32.           },
  33.           combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
  34.             (grad1 += grad2, loss1 + loss2)
  35.           })
  36.       /**
  37.        * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
  38.        * and regVal is the regularization value computed in the previous iteration as well.
  39.        */
  40.       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
  41.       val update = updater.compute(
  42.         weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
  43.       weights = update._1
  44.       regVal = update._2
  45.     }
  46.     logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
  47.       stochasticLossHistory.takeRight(10).mkString(", ")))
  48.     (weights, stochasticLossHistory.toArray)
  49.   }
复制代码

上述代码中最需要引起重视的部分是aggregate函数的使用,先看下aggregate函数的定义
  1. def
  2. aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  3.    
  4. // Clone the zero value since we will also be serializing it as part of tasks
  5.     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  6.     val cleanSeqOp = sc.clean(seqOp)
  7.     val cleanCombOp = sc.clean(combOp)
  8.     val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  9.     val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  10.     sc.runJob(this, aggregatePartition, mergeResult)
  11.     jobResult
  12.   }
复制代码
aggregate函数有三个入参,一是初始值ZeroValue,二是seqOp,三为combOp.
  • seqOp seqOp会被并行执行,具体由各个executor上的task来完成计算
  • combOp combOp则是串行执行, 其中combOp操作在JobWaiter的taskSucceeded函数中被调用

为了进一步加深对aggregate函数的理解,现举一个小小例子。启动spark-shell后,运行如下代码
  1. val
  2. z = sc. parallelize (List (
  3. 1 ,2 ,3 ,4 ,5 ,6),2)
  4. z.aggregate (0)(math.max(_, _), _ + _)
  5. // 运 行 结 果 为 9
  6. res0: Int = 9
复制代码
仔细观察一下运行时的日志输出, aggregate提交的job由一个stage(stage0)组成,由于整个数据集被分成两个partition,所以为stage0创建了两个task并行处理。

LeastSquareGradient
讲完了aggregate函数的执行过程, 回过头来继续讲组成seqOp的gradient.compute函数。
LeastSquareGradient用来计算梯度和误差,注意cmopute中cumGraident会返回改变后的结果。这里计算公式依据的就是cost-function中的▽Q(w)
  1. class LeastSquaresGradient extends Gradient {
  2.   
  3. override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
  4.     val brzData = data.toBreeze
  5.     val brzWeights = weights.toBreeze
  6.     val diff = brzWeights.dot(brzData) - label
  7.     val loss = diff * diff
  8.     val gradient = brzData * (2.0 * diff)
  9.     (Vectors.fromBreeze(gradient), loss)
  10.   }
  11.   override def compute(
  12.       data: Vector,
  13.       label: Double,
  14.       weights: Vector,
  15.       cumGradient: Vector): Double = {
  16.     val brzData = data.toBreeze
  17.     val brzWeights = weights.toBreeze
  18.     //dot表示点积,是接受在实数R上的两个向量并返回一个实数标量的二元运算,它的结果是欧几里得空间的标准内积。
  19.     //两个向量的点积写作a·b。点乘的结果叫做点积,也称作数量积
  20.     val diff = brzWeights.dot(brzData) - label
  21.     //下面这句话完成y += a*x
  22.     brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)
  23.     diff * diff
  24.   }
  25. }
复制代码
在上述代码中频繁出现breeze相关的函数,你一定会很好奇,这是个什么新鲜玩艺。
说 开 了 其 实 一 点 也 不 稀 奇, 由 于 计 算 中 有 大 量 的 矩 阵(Matrix)及 向量(Vector)计算,为了更好支持和封装这些计算引入了breeze库。
Breeze, Epic及Puck是scalanlp中三大支柱性项目, 具体可参数www.scalanlp.org

正则化过程
根据本次迭代出来的梯度和误差对权重系数进行更新,这个时候就需要用上正则化规则了。也就是下述语句会触发权重系数的更新
  1. val update = updater.compute(
  2.      weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
复制代码
以岭回归为例,看其更新过程的代码实现。
  1. class SquaredL2Updater extends Updater {
  2.   
  3. override def compute(
  4.       weightsOld: Vector,
  5.       gradient: Vector,
  6.       stepSize: Double,
  7.       iter: Int,
  8.       regParam: Double): (Vector, Double) = {
  9.     // add up both updates from the gradient of the loss (= step) as well as
  10.     // the gradient of the regularizer (= regParam * weightsOld)
  11.     // w' = w - thisIterStepSize * (gradient + regParam * w)
  12.     // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
  13.     val thisIterStepSize = stepSize / math.sqrt(iter)
  14.     val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
  15.     brzWeights :*= (1.0 - thisIterStepSize * regParam)
  16.     brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
  17.     val norm = brzNorm(brzWeights, 2.0)
  18.     (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
  19.   }
  20. }
复制代码

结果预测
计算出权重系数(weights)和截距intecept,就可以用来创建线性回归模型LinearRegressionModel,利用模型的predict函数来对观测值进行预测
  1. class LinearRegressionModel (
  2.     override val weights: Vector,
  3.     override val intercept: Double)
  4.   
  5. extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
  6.   override protected def predictPoint(
  7.       dataMatrix: Vector,
  8.       weightMatrix: Vector,
  9.       intercept: Double): Double = {
  10.     weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
  11.   }
  12. }
复制代码

注意LinearRegression的构造函数需要权重(weights)和截距(intercept)作为入参,对新的变量做出预测需要调用predictPoint

一个完整的示例程序
在spark-shell中执行如下语句来亲自体验一下吧。
  1. import
  2. org.apache.spark.mllib.regression.LinearRegressionWithSGD
  3. import org.apache.spark.mllib.regression.LabeledPoint
  4. import org.apache.spark.mllib.linalg.Vectors
  5. // Load and parse the data
  6. val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
  7. val parsedData = data.map { line =>
  8.   val parts = line.split(',')
  9.   LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
  10. }
  11. // Building the model
  12. val numIterations = 100
  13. val model = LinearRegressionWithSGD.train(parsedData, numIterations)
  14. // Evaluate model on training examples and compute training error
  15. val valuesAndPreds = parsedData.map { point =>
  16.   val prediction = model.predict(point.features)
  17.   (point.label, prediction)
  18. }
  19. val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
  20. println("training Mean Squared Error = " + MSE)
复制代码

小结
再次强调,找到对应的假设函数,用于评估的损失函数,最优化求解方法,正则化规则

相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现







欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(2)人评论

跳转到指定楼层
355815741 发表于 2015-1-5 09:46:28
学习了,谢谢lz分享~
回复

使用道具 举报

ohano_javaee 发表于 2015-4-18 22:09:34
大神~大神~大神~大神~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条