Spark的fold操作用于对RDD中的元素进行迭代操作,并且利用了一个变量保存迭代过程中的中间结果。其函数原型为:
fold(self, zeroValue, op)
我们先用一段代码来体验下该action的功能。
def test_fold(sc):
def add(c, v):
print 'c = ' + str(c) + '; v = ' + str(v)
return c+v
data = sc.parallelize(np.arange(10), 2)
datasum = data.fold(0, add)
print datasum
if __name__ == '__main__':
sc = SparkContext(appName='Test')
test_fold(sc)
# 第一个partition的输出
c = 0; v = 0
c = 1; v = 0
c = 2; v = 1
c = 3; v = 3
c = 4; v = 6
# 第二个partition的输出
c = 5; v = 0
c = 6; v = 5
c = 7; v = 11
c = 8; v = 18
c = 9; v = 26
# combine过程的输出
c = 0; v = 10
c = 10; v = 35
# 最终结果输出
45
代码的功能就是对0-9这10个数字做累加操作,最终结果得到45,正确。我们看到zeroValue用于初始化每个partition的v,v是一个变量,用于在每个element的计算过程中保持不变。并且fold动作的最后需要对每个partitions的输出再做一次add,并且此时c初始化为zeroValue,v的值是每个partition的输出。如果将zeroValue改成1,则能清楚看到这个变化,其最后结果为48。
fold动作需要保证每个partition能够独立进行运算。它同aggregate的区别在于后者对于不同partitions提交的最终结果定义了一个专门的comOp函数来进行处理,而前者则都是采用同一个函数进行处理。