分享

求助关于spark mapToPair和reduceByKey遇到的问题,求助

duliming 发表于 2016-3-7 10:25:24 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 9 36654
本帖最后由 duliming 于 2016-3-7 11:56 编辑

我想通过spark的mapToPair和reduceByKey对一年中每月数据的某个指标求最大值,最小值。然而计算的结果并正确。
具体如下:

Time
41821
41821.00694
41821.01389
41821.02083
41821.02778
41821.03472
41821.04167


对这个Time列求最大值,首先通过mapToPair对数据按照月份进行分类。已经确保这些数据是在相同的月份的。然后通过reduceByKey进行计算后结果出来最大值是41821.02778。而不是41821.04167。


尝试过自定义分区,保证这些数据在同一个分区里,但是计算结果仍然不正确。


怀疑是不是多个task的原因,多个task计算的结果不能合并。实际上得出的结果只是一个task的计算结果。


希望大神和懂的以及遇到过这个问题的前辈能指导一下。或者说有更好的方法教教我。

另外,因为我是想提高效率,所以把求最大值、求最小值、平均值等多个操作想合并到一起去做。但没想到最大值这里就遇到了困难。是不是不可以这么做。

后来我又使用combineByKey计算了一下,计算的结果是正确的。就是不明白为什么reduceByKey会错呢?求大神指点。

已有(9)人评论

跳转到指定楼层
atsky123 发表于 2016-3-7 12:42:45
map和reduce都是局部排序。整体排序应该就没有问题了
回复

使用道具 举报

duliming 发表于 2016-3-7 13:21:19
atsky123 发表于 2016-3-7 12:42
map和reduce都是局部排序。整体排序应该就没有问题了

能说的在详细点吗?怎么整体排序。我这里没用到排序呀。
回复

使用道具 举报

邓立辉 发表于 2016-3-7 17:28:26
atsky123 发表于 2016-3-7 12:42
map和reduce都是局部排序。整体排序应该就没有问题了

不是吧,先局部后整体。
回复

使用道具 举报

duliming 发表于 2016-3-8 09:11:18
邓立辉 发表于 2016-3-7 17:28
不是吧,先局部后整体。

我糊涂了。
回复

使用道具 举报

chimes298 发表于 2016-3-8 18:32:40
我做过类似的例子,用ReduceByKey传一个求max的函数可以得到正确结果。
你把代码粘上来看看。
回复

使用道具 举报

duliming 发表于 2016-3-9 09:28:49
chimes298 发表于 2016-3-8 18:32
我做过类似的例子,用ReduceByKey传一个求max的函数可以得到正确结果。
你把代码粘上来看看。

[mw_shl_code=java,true]SparkConf conf = new SparkConf();
                conf.setAppName("dataAnalysis");
                conf.setMaster("local[2]");
                JavaSparkContext sc = new JavaSparkContext(conf);
                SQLContext sqlContext = new SQLContext(sc);
               
                String testFileSrc = "/dataAnalysis/testData/testTopTen.csv";

JavaRDD<FanDataVO> Data = sc.textFile(testFileSrc).map(new Function<String, FanDataVO>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public FanDataVO call(String row) throws Exception {
                                String[] rowArr = row.split(",");
                                if(rowArr[0].equals("Time")) return new FanDataVO();
                                FanDataVO vo = new FanDataVO();
                                vo.setTime(rowArr[0]);
                                vo.setWt_WsAvg(rowArr[7]);
                                vo.setWt_PowerAvg(rowArr[20]);
                                vo.setRs(rowArr[8]);
                                vo.setMast_T(rowArr[19]);
                                vo.setDate_Begin(rowArr[0]);
                                vo.setDate_End(rowArr[0]);
                                return vo;
                        }
                });

JavaPairRDD<String, FanDataVO> monthPair = Data.mapToPair(new PairFunction<FanDataVO, String, FanDataVO>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, FanDataVO> call(FanDataVO vo)
                                        throws Exception {
                                String month = DateUtil.getMonthByDays(vo.getTime());
                                return new Tuple2<String, FanDataVO>(month, vo);
                        }
                });
monthPair = monthPair.reduceByKey(new Function2<FanDataVO, FanDataVO, FanDataVO>() {
                        
                        @Override
                        public FanDataVO call(FanDataVO oldVO, FanDataVO newVO) throws Exception {
                                if (StringUtils.isNotBlank(newVO.getTime())) {
                                        if (StringUtils.isBlank(oldVO.getDate_Begin())) {
                                                oldVO.setDate_Begin(oldVO.getTime());
                                        } else {
                                                // 每月的最小时间
                                                if (Double.parseDouble(oldVO.getDate_Begin()) > Double
                                                                .parseDouble(newVO.getTime())) {
                                                        oldVO.setDate_Begin(newVO.getTime());
                                                }
                                        }
                                        if (StringUtils.isBlank(oldVO.getDate_End())) {
                                                oldVO.setDate_End(oldVO.getTime());
                                        } else {
                                                // 每月的最大时间
                                                if (Double.parseDouble(oldVO.getDate_End()) < Double
                                                                .parseDouble(newVO.getTime())) {
                                                        oldVO.setDate_End(newVO.getTime());
                                                }
                                        }
                                }
                                return oldVO;
                        }
                });[/mw_shl_code]
回复

使用道具 举报

chimes298 发表于 2016-3-9 16:44:36
duliming 发表于 2016-3-9 09:28
[mw_shl_code=java,true]SparkConf conf = new SparkConf();
                conf.setAppName("dataAna ...


if (StringUtils.isBlank(oldVO.getDate_Begin())) {
    oldVO.setDate_Begin(oldVO.getTime());
}

Time和Date_Begin都是set了rowArr[0],当oldVO.getDate_Begin()为空时,oldVO.getTime()也是空吧


回复

使用道具 举报

chimes298 发表于 2016-3-9 16:53:19
另外,用reduceByKey,平均值很难求吧
回复

使用道具 举报

duliming 发表于 2016-3-9 17:11:18
chimes298 发表于 2016-3-9 16:44
if (StringUtils.isBlank(oldVO.getDate_Begin())) {
    oldVO.setDate_Begin(oldVO.getTime());
}  ...

我的数据在确保都不为空的情况下。统计最大值也不正确。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条