分享

Spark给分组传递函数进行计算

zhuangmz 发表于 2016-11-18 22:28:01 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 9159
本帖最后由 zhuangmz 于 2016-11-18 22:34 编辑

大家好,请问Spark 2里,是否类似pandas的groupby函数,达到split-apply-combine的效果。简单的说,是 DataFrame.groupby(columns...).foreach(rows => ...)或者 DataFrame.groupby(columns...).foreach(rows => ...).collect(...)
举例:有一个大的DataFrame
| type | feature1 | feature2 | ... |
| A     | 1           | 1         | ..... |
| B     | 2           | 2         | ..... |
...

我想第一步根据 type 进行分组,得到两个DataFrame。

| type | feature1 | feature2 | ... |
| A     | 1           | 1         | ..... |
...

| type | feature1 | feature2 | ... |
| B     | 2           | 2         | ..... |
...

分别用自定义的函数进行计算,比如说Kmeans分类函数。

dfA:
| type | centroid_1 | centroid_2 | ... |
| A     | 1           | 1         | ..... |

dfB:
| type | centroid_1 | centroid_2 | ... |
| B     | 2           | 2         | ..... |


最后一步是合并结果。
如果是foreach,在函数里面存到数据库,那就不需要合并了。
如果是collect,则可以返回 map<key, DataFrame>,或者就合并成一个DataFrame。

谢谢!





已有(5)人评论

跳转到指定楼层
qcbb001 发表于 2016-11-19 08:59:51
没见过这样的。
个人建议:
可以从dataframe里面根据类型都提取出来,形成多个按照类型分开的dataframe.
然后在每个处理
回复

使用道具 举报

zhuangmz 发表于 2016-11-19 09:19:35
本帖最后由 zhuangmz 于 2016-11-19 09:23 编辑
qcbb001 发表于 2016-11-19 08:59
没见过这样的。
个人建议:
可以从dataframe里面根据类型都提取出来,形成多个按照类型分开的dataframe.
...
形成多个按照类型分开的dataframe

如何形成多个dataframe呢?for循环类型?每次从dataframe.filter(r => r.getType() == type)得到小dataframe?我的dataframe是从Hive表(parquet文件)读取出来的有10几亿条记录。每个类型里有大概200万条记录。
这样大的表,如果把大Dataframe.cache()。内存肯定是不够存的
如果for循环,会不会造成重复扫描Hive表?
谢谢




回复

使用道具 举报

nextuser 发表于 2016-11-19 22:00:27
本帖最后由 nextuser 于 2016-11-19 22:08 编辑

个人建议:
可先分割数据,然后在创建dataframe.而不是都放到dataframe中在分割

回复

使用道具 举报

zhuangmz 发表于 2016-11-20 11:03:05
nextuser 发表于 2016-11-19 22:00
个人建议:
可先分割数据,然后在创建dataframe.而不是都放到dataframe中在分割

谢谢。我的数据是放在Hive表里。
你的意思是读的时候,用 select * from xxx where type='A' 读第一个dataframe,对他进行处理。然后foreach处理。

我一共有n个type。
这样子的话,Hive表是会被全表扫描n遍吗?
如果我先用Spark `select * from xxx`,指针赋给bigDF。
然后bigDF.cache()
然后做foreach,每次是bigDF.filter(type='A'),这样子的话,Hive表也会被全表扫描n遍,但是有了cache,会快很多吧?
比较不懂的是bigDF特别大的时候,内存存不下,cache会怎么处理的?
谢谢
回复

使用道具 举报

nextuser 发表于 2016-11-20 13:47:04
zhuangmz 发表于 2016-11-20 11:03
谢谢。我的数据是放在Hive表里。
你的意思是读的时候,用 select * from xxx where type='A' 读第一个da ...

cache()同步数据的内存
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条