本帖最后由 xioaxu790 于 2014-10-21 17:46 编辑
问题导读
1、如何理解Spark的fold操作?
2、如何该action的功能?
3、什么是Spark aggregate?
Spark的fold操作用于对RDD中的元素进行迭代操作,并且利用了一个变量保存迭代过程中的中间结果。其函数原型为:
fold(self, zeroValue, op) 复制代码
我们先用一段代码来体验下该action的功能。
def test_fold(sc):
def add(c, v):
print 'c = ' + str(c) + '; v = ' + str(v)
return c+v
data = sc.parallelize(np.arange(10), 2)
datasum = data.fold(0, add)
print datasum
if __name__ == '__main__':
sc = SparkContext(appName='Test')
test_fold(sc)
# 第一个partition的输出
c = 0; v = 0
c = 1; v = 0
c = 2; v = 1
c = 3; v = 3
c = 4; v = 6
# 第二个partition的输出
c = 5; v = 0
c = 6; v = 5
c = 7; v = 11
c = 8; v = 18
c = 9; v = 26
# combine过程的输出
c = 0; v = 10
c = 10; v = 35
# 最终结果输出
45 复制代码
代码的功能就是对0-9这10个数字做累加操作,最终结果得到45,正确。我们看到zeroValue用于初始化每个partition的v,v是一个变量,用于在每个element的计算过程中保持不变。并且fold动作的最后需要对每个partitions的输出再做一次add,并且此时c初始化为zeroValue,v的值是每个partition的输出。如果将zeroValue改成1,则能清楚看到这个变化,其最后结果为48。
fold动作需要保证每个partition能够独立进行运算。它同aggregate的区别在于后者对于不同partitions提交的最终结果定义了一个专门的comOp函数来进行处理,而前者则都是采用同一个函数进行处理
Spark aggregate
Spark的官方文档其实说得并不是明了,很多内容如果不去研究源码,或者不去实验查看过程,你压根就不知道它的真正功能是啥,比如今天要说的aggregate。
aggregate的官方描述是:
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral "zero value." The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U.
函数原型:aggregate(self, zeroValue, seqOp, combOp)
现在先用一段代码测试它的输出:
def test_aggregate(sc):
def seq(c, v):
print 'In seq c ' + str(c)
print 'In seq v ' + str(v)
return c[0]+v[1], c[1]+0.1
def comb(c1, c2):
print 'In comb c1 ' + str(c1)
print 'In comb c2 ' + str(c2)
return c1[0]-c2[0], c1[1]+c2[1]
data = sc.parallelize([(1.0, np.arange(10)), (2.0, np.arange(10)), (2.0, np.arange(10)), (1.0, np.arange(10))],2)
gradientSum, lossSum = data.aggregate((np.ones(10),0), seq, comb)
print gradientSum, lossSum
if __name__ == '__main__':
sc = SparkContext(appName='Test')
test_aggregate(sc)
# 第一个partition的输出
In seq c (array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), 0)
In seq v (1.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
In seq c (array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), 0.1)
In seq v (2.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
In comb c1 (array([ 1., 3., 5., 7., 9., 11., 13., 15., 17., 19.]), 0.2)
In comb c2 (array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), 0)
# 第二个partition的输出
In seq c (array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), 0)
In seq v (2.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
In seq c (array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]), 0.1)
In seq v (1.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
In comb c1 (array([ 1., 3., 5., 7., 9., 11., 13., 15., 17., 19.]), 0.2)
In comb c2 (array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), 0)
# combine的输出
In comb c1 (array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]), 0)
In comb c2 (array([ 0., 2., 4., 6., 8., 10., 12., 14., 16., 18.]), 0.2)
In comb c1 (array([ 1., -1., -3., -5., -7., -9., -11., -13., -15., -17.]), 0.2)
In comb c2 (array([ 0., 2., 4., 6., 8., 10., 12., 14., 16., 18.]), 0.2)
# 最后结果输出
[ 1. -3. -7. -11. -15. -19. -23. -27. -31. -35.] 0.4 复制代码
从上面可以看出,aggregate的作用大概是对每个partition应用seqOp操作和combOp操作,然后对这个由各个partitions的结果构成的结果空间再做一次combOp。zeroValue是初值,seqOp输入U和T,U的初值就是zeroValue,T是RDD的element,seqOp对每个element操作的结果作为下一个element的U。combOp的输入是U1,U2,其中U1的初值是zeroValue,U2是各个partitions的输出。
让我不解的是,为什么在每个partition计算完seqOp之后,都要做一次combOp(U1, U2)操作,且U1是seqOp最终的计算结果,U2则是zeroValue?