分享

Spark aggregate介绍

问题导读
1、如何了解Spark的功能?
2、什么是Spark aggregate?




Spark的官方文档其实说得并不是明了,很多内容如果不去研究源码,或者不去实验查看过程,你压根就不知道它的真正功能是啥,比如今天要说的aggregate。

aggregate的官方描述是:
Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral "zero value." The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U.
本文译:聚合每个分区的元素,然后所有的分区结果,使用给定的组合功能和一个中立的“零值。“功能op(t1,t2)允许修改t1和它作为结果返回值以避免对象分配;然而,它不应该修改t2。第一个函数可以返回不同的结果类型(seqOp),U,比这个抽样的类型。因此,我们需要一个操作合并T成一个和一个操作合并两个)

函数原型:aggregate(self, zeroValue, seqOp, combOp)

现在先用一段代码测试它的输出:
  1. def test_aggregate(sc):
  2.     def seq(c, v):
  3.         print 'In seq c ' + str(c)
  4.         print 'In seq v ' + str(v)
  5.         return c[0]+v[1], c[1]+0.1
  6.     def comb(c1, c2):
  7.         print 'In comb c1 ' + str(c1)
  8.         print 'In comb c2 ' + str(c2)
  9.         return c1[0]-c2[0], c1[1]+c2[1]
  10.     data = sc.parallelize([(1.0, np.arange(10)), (2.0, np.arange(10)), (2.0, np.arange(10)), (1.0, np.arange(10))],2)
  11.     gradientSum, lossSum = data.aggregate((np.ones(10),0), seq, comb)
  12.     print gradientSum, lossSum
  13. if __name__ == '__main__':
  14.     sc = SparkContext(appName='Test')
  15.     test_aggregate(sc)
  16. # 第一个partition的输出
  17. In seq c (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  18. In seq v (1.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  19. In seq c (array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.]), 0.1)
  20. In seq v (2.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  21. In comb c1 (array([  1.,   3.,   5.,   7.,   9.,  11.,  13.,  15.,  17.,  19.]), 0.2)
  22. In comb c2 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  23. # 第二个partition的输出
  24. In seq c (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  25. In seq v (2.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  26. In seq c (array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.]), 0.1)
  27. In seq v (1.0, array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]))
  28. In comb c1 (array([  1.,   3.,   5.,   7.,   9.,  11.,  13.,  15.,  17.,  19.]), 0.2)
  29. In comb c2 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  30. # combine的输出
  31. In comb c1 (array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]), 0)
  32. In comb c2 (array([  0.,   2.,   4.,   6.,   8.,  10.,  12.,  14.,  16.,  18.]), 0.2)
  33. In comb c1 (array([  1.,  -1.,  -3.,  -5.,  -7.,  -9., -11., -13., -15., -17.]), 0.2)
  34. In comb c2 (array([  0.,   2.,   4.,   6.,   8.,  10.,  12.,  14.,  16.,  18.]), 0.2)
  35. # 最后结果输出
  36. [  1.  -3.  -7. -11. -15. -19. -23. -27. -31. -35.] 0.4
复制代码


从上面可以看出,aggregate的作用大概是对每个partition应用seqOp操作和combOp操作,然后对这个由各个partitions的结果构成的结果空间再做一次combOp。zeroValue是初值,seqOp输入U和T,U的初值就是zeroValue,T是RDD的element,seqOp对每个element操作的结果作为下一个element的U。combOp的输入是U1,U2,其中U1的初值是zeroValue,U2是各个partitions的输出。

让我不解的是,为什么在每个partition计算完seqOp之后,都要做一次combOp(U1, U2)操作,且U1是seqOp最终的计算结果,U2则是zeroValue?

已有(2)人评论

跳转到指定楼层
江火似流星 发表于 2018-12-5 10:58:16
谢谢楼主的分享
回复

使用道具 举报

海尔兄弟o 发表于 2018-12-5 16:32:08
很优秀的楼主
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条