分享

Spark教程(1)Spark-fold

xioaxu790 2014-10-21 17:41:29 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 14399
本帖最后由 xioaxu790 于 2014-10-21 17:46 编辑
问题导读
1、如何理解Spark的fold操作?
2、如何该action的功能?
3、什么是Spark aggregate?




Spark的fold操作用于对RDD中的元素进行迭代操作,并且利用了一个变量保存迭代过程中的中间结果。其函数原型为:
  1. fold(self, zeroValue, op)
复制代码



我们先用一段代码来体验下该action的功能。
  1. def test_fold(sc):
  2.     def add(c, v):
  3.         print 'c = ' + str(c) + '; v = ' + str(v)
  4.         return c+v
  5.     data = sc.parallelize(np.arange(10), 2)
  6.     datasum = data.fold(0, add)
  7.     print datasum
  8. if __name__ == '__main__':
  9.     sc = SparkContext(appName='Test')
  10.     test_fold(sc)
  11. # 第一个partition的输出
  12. c = 0; v = 0
  13. c = 1; v = 0
  14. c = 2; v = 1
  15. c = 3; v = 3
  16. c = 4; v = 6
  17. # 第二个partition的输出
  18. c = 5; v = 0
  19. c = 6; v = 5
  20. c = 7; v = 11
  21. c = 8; v = 18
  22. c = 9; v = 26
  23. # combine过程的输出
  24. c = 0; v = 10
  25. c = 10; v = 35
  26. # 最终结果输出
  27. 45
复制代码



代码的功能就是对0-9这10个数字做累加操作,最终结果得到45,正确。我们看到zeroValue用于初始化每个partition的v,v是一个变量,用于在每个element的计算过程中保持不变。并且fold动作的最后需要对每个partitions的输出再做一次add,并且此时c初始化为zeroValue,v的值是每个partition的输出。如果将zeroValue改成1,则能清楚看到这个变化,其最后结果为48。

fold动作需要保证每个partition能够独立进行运算。它同aggregate的区别在于后者对于不同partitions提交的最终结果定义了一个专门的comOp函数来进行处理,而前者则都是采用同一个函数进行处理




Spark aggregate

Spark的官方文档其实说得并不是明了,很多内容如果不去研究源码,或者不去实验查看过程,你压根就不知道它的真正功能是啥,比如今天要说的aggregate。

aggregate的官方描述是:
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral "zero value." The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U.

函数原型:aggregate(self, zeroValue, seqOp, combOp)

现在先用一段代码测试它的输出:
  1. def test_aggregate(sc):
  2.     def seq(c, v):
  3.         print 'In seq c ' + str(c)
  4.         print 'In seq v ' + str(v)
  5.         return c[0]+v[1], c[1]+0.1
  6.     def comb(c1, c2):
  7.         print 'In comb c1 ' + str(c1)
  8.         print 'In comb c2 ' + str(c2)
  9.         return c1[0]-c2[0], c1[1]+c2[1]
  10.     data = sc.parallelize([(1.0, np.arange(10)), (2.0, np.arange(10)), (2.0, np.arange(10)), (1.0, np.arange(10))],2)
  11.     gradientSum, lossSum = data.aggregate((np.ones(10),0), seq, comb)
  12.     print gradientSum, lossSum
  13. if __name__ == '__main__':
  14.     sc = SparkContext(appName='Test')
  15.     test_aggregate(sc)
  16. # 第一个partition的输出
  17. In seq c (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  18. In seq v (1.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  19. In seq c (array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.]), 0.1)
  20. In seq v (2.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  21. In comb c1 (array([  1.,   3.,   5.,   7.,   9.,  11.,  13.,  15.,  17.,  19.]), 0.2)
  22. In comb c2 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  23. # 第二个partition的输出
  24. In seq c (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  25. In seq v (2.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  26. In seq c (array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.]), 0.1)
  27. In seq v (1.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  28. In comb c1 (array([  1.,   3.,   5.,   7.,   9.,  11.,  13.,  15.,  17.,  19.]), 0.2)
  29. In comb c2 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  30. # combine的输出
  31. In comb c1 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  32. In comb c2 (array([  0.,   2.,   4.,   6.,   8.,  10.,  12.,  14.,  16.,  18.]), 0.2)
  33. In comb c1 (array([  1.,  -1.,  -3.,  -5.,  -7.,  -9., -11., -13., -15., -17.]), 0.2)
  34. In comb c2 (array([  0.,   2.,   4.,   6.,   8.,  10.,  12.,  14.,  16.,  18.]), 0.2)
  35. # 最后结果输出
  36. [  1.  -3.  -7. -11. -15. -19. -23. -27. -31. -35.] 0.4
复制代码




从上面可以看出,aggregate的作用大概是对每个partition应用seqOp操作和combOp操作,然后对这个由各个partitions的结果构成的结果空间再做一次combOp。zeroValue是初值,seqOp输入U和T,U的初值就是zeroValue,T是RDD的element,seqOp对每个element操作的结果作为下一个element的U。combOp的输入是U1,U2,其中U1的初值是zeroValue,U2是各个partitions的输出。

让我不解的是,为什么在每个partition计算完seqOp之后,都要做一次combOp(U1, U2)操作,且U1是seqOp最终的计算结果,U2则是zeroValue?

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条