分享

大数据mapreduce疑惑

Parameter接口:
[mw_shl_code=java,true]package com.xiaohong.TongJi;

public interface Parameter {
        /**
         * LenFields is the length of fields, or columns; it need to be modified for varied
         */
        public final int LenFields = 4;
        /**
         * iFields[0] for min()
         * iFields[1] for max()
         * iFields[2] for sum()
         * iFields[3] for sum2()
         */
        public final int LenStat = 4;

        public final int iMin = 0x80000000;
        public final int iMax = 0x7FFFFFFF;
        public final float fMin = (float) -3.4E38;
        public final float fMax = (float) 3.4E38;
}
[/mw_shl_code]

map端:
[mw_shl_code=java,true]package com.xiaohong.TongJi;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {

        /**
         * The iFields[][] is the Array for Keys to present the min(), max(), sum()
         * of each field in the input. iFields[0] for min() iFields[1] for max()
         * iFields[2] for sum() iFields[3] for sum2()
         */

        private static IntWritable iFields[][] = new IntWritable[LenStat][LenFields];
        private static float min[] = new float[LenFields];
        private static float max[] = new float[LenFields];
        private static IntWritable iwCnt = new IntWritable(LenStat * LenFields);
        private final static FloatWritable one = new FloatWritable(1);

        public TongJiMapper() {
                for (int i = 0; i < LenStat; i++) {
                        for (int j = 0; j < LenFields; j++) {
                                iFields[j] = new IntWritable(i * LenFields + j);
                        }
                }

                for (int j = 0; j < LenFields; j++) {
                        min[j] = fMax; // the maximum integer
                        max[j] = fMin; // the minimum integer
                }

        }

        @Override
        protected void map(LongWritable key, Text values,
                        Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                /*
                 * 原始数据:
                 *         9.0000000e+000        1.0000000e+000        2.0000000e+000        4.0000000e+000
                        3.0000000e+000        8.0000000e+000        4.0000000e+000        8.0000000e+000
                        6.0000000e+000        5.0000000e+000        9.0000000e+000        1.0000000e+000
                        5.0000000e+000        6.0000000e+000        9.0000000e+000        2.0000000e+000
                        9.0000000e+000        8.0000000e+000        4.0000000e+000        2.0000000e+000
                        7.0000000e+000        9.0000000e+000        9.0000000e+000        2.0000000e+000
                        5.0000000e+000        7.0000000e+000        1.0000000e+000        6.0000000e+000
                 */
                StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
                float iTmp;
                for (int j = 0; j < LenFields; j++) {
                        /** handle each field. */
                        iTmp = Float.parseFloat(st.nextToken());
                        /**
                         * for min(), this judgement just output
                         * about 37 <key,value> pairs in 100,000
                         * records.
                         */
                        
                        if (min[j] > iTmp) {
                                min[j] = iTmp;
                                context.write(iFields[0][j], new FloatWritable(min[j]));
                        }
                        if (max[j] < iTmp) { /** for max() */
                                max[j] = iTmp;
                                context.write(iFields[1][j], new FloatWritable(max[j]));
                        }
                        context.write(iFields[2][j], new FloatWritable(iTmp));
                        /** for sum() */
                        context.write(iFields[3][j], new FloatWritable(iTmp * iTmp));/** for sum2() */
                }
                context.write(iwCnt, one); /** for cnt() */

        }

}
[/mw_shl_code]

reducer端:
[mw_shl_code=java,true]package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
                implements Parameter {

        @Override
        protected void reduce(IntWritable K, Iterable<FloatWritable> values,
                        Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
                        throws IOException, InterruptedException {

                float min = iMax;
                float max = iMin;
                float sum = 0;
                long iCnt = 0;
                int iCategory = K.get() / LenFields; // restore the category from Keys.
                switch (iCategory) {
                case 0:/** min() */
                        for (FloatWritable value : values) {
                                if (min > value.get()) {
                                        min = value.get();
                                }
                        }
                        context.write(K, new FloatWritable(min));
                        break;
                case 1:/** max() */
                        for (FloatWritable value : values) {
                                if (max < value.get()) {
                                        max = value.get();
                                }
                        }
                        context.write(K, new FloatWritable(max));
                        break;
                case 2:/** sum() */
                        for (FloatWritable value : values) {
                                sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 3: /** sum2() */
                        for (FloatWritable value : values) {
                                sum += value.get();
                        }
                        context.write(K, new FloatWritable(sum));
                        break;
                case 4:
                        for (FloatWritable value : values) {
                                iCnt += value.get();
                        }
                        context.write(K, new FloatWritable(iCnt));
                        break;
                } // switch

        }

}
[/mw_shl_code]

yarn客户端:
[mw_shl_code=java,true]package com.xiaohong.TongJi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobSubMitter {

        public static void main(String[] args) throws Exception {

                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf);

                job.setJarByClass(JobSubMitter.class);

                job.setMapperClass(TongJiMapper.class);
                job.setReducerClass(TongJiReducer.class);

                job.setMapOutputKeyClass(IntWritable.class);
                job.setMapOutputValueClass(FloatWritable.class);

                job.setOutputKeyClass(IntWritable.class);
                job.setOutputValueClass(FloatWritable.class);

                job.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));

                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));

                boolean res = job.waitForCompletion(true);
                System.exit(res ? 0 : 1);

        }

}
[/mw_shl_code]

问题:我想问下当reducer端执行到这句代码的时候"int iCategory = K.get()",怎么将K转换成0~4了呢?,K不是0~15吗,难道调用.get()方法有什么玄机吗?

已有(4)人评论

跳转到指定楼层
nextuser 发表于 2018-6-2 07:48:03
为什么map的代码都注释掉。虽然只看懂了一部分,但是下面似乎看懂了。
  int iCategory = K.get() / LenFields; // restore the category from Keys.

假如K.get() 是16,那么LenFields是4,那么iCategory的结果就是 int iCategory=4
回复

使用道具 举报

若余相思28 发表于 2018-6-2 09:53:22
nextuser 发表于 2018-6-2 07:48
为什么map的代码都注释掉。虽然只看懂了一部分,但是下面似乎看懂了。
  int iCategory = K.get() / LenFi ...

我就是不懂你说这里,能解释下么,.get()方法不就是将value变为整形吗,怎么变成别的数字呢
回复

使用道具 举报

arsenduan 发表于 2018-6-2 12:39:54
若余相思28 发表于 2018-6-2 09:53
我就是不懂你说这里,能解释下么,.get()方法不就是将value变为整形吗,怎么变成别的数字呢

楼主理解了第一步,第二步其实就是我们高中的取余数,O(∩_∩)O哈哈~
int iCategory = K.get() / LenFields;
看上面,LenFields=4的,这个没有问题吧
看这里
[mw_shl_code=java,true]package com.xiaohong.TongJi;

public interface Parameter {
        /**
         * LenFields is the length of fields, or columns; it need to be modified for varied
         */
        public final int LenFields = 4;
        /**
         * iFields[0] for min()
         * iFields[1] for max()
         * iFields[2] for sum()
         * iFields[3] for sum2()
         */
        public final int LenStat = 4;

        public final int iMin = 0x80000000;
        public final int iMax = 0x7FFFFFFF;
        public final float fMin = (float) -3.4E38;
        public final float fMax = (float) 3.4E38;
}[/mw_shl_code]

那么任何数对4取余都超不过4的。

比如:
1/4取余是1
2/4取余是2
3/4取余是3
4/4取余是0
5/4取余是1
如此循环。都不会超过4





回复

使用道具 举报

若余相思28 发表于 2018-6-2 13:09:46
arsenduan 发表于 2018-6-2 12:39
楼主理解了第一步,第二步其实就是我们高中的取余数,O(∩_∩)O哈哈~
int iCategory = K.get() / LenFie ...

我懂了
int iCategory = K.get() / LenFields;   这个并不是取余数
而是取整数,我们懂了,我一直把它看成取余数,我懂了,谢谢您哈!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条