分享

hadoop(大数据)统计相同手机号的总数

需求说明:
找相同手机号出现的总数

案例数据
1.png
file:///C:\Users\ADMINI~1\AppData\Local\Temp\ksohtml\wpsAF48.tmp.jpg

程序
Mapper
package com.fish.had.cellphonenumbercount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CellPhoneNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

        final LongWritable ONE = new LongWritable(1L);

        @Override
        protected void map(LongWritable k1, Text v1,
                        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                        throws IOException, InterruptedException {
                String v = v1.toString();
                if(null != v && !"".equals(v)){
                        String[] ss = v.split("\t");//2
                        if(ss.length > 2){
                                if(ss[2].length() == 11 && ss[2].indexOf(".") < 0){
                                        context.write(new Text(ss[2]), ONE);
                                }
                        }
                }
        }
}
说明:
1、根据案例数据发现,第三列为需要的数据
if(null != v && !"".equals(v)){
                        String[] ss = v.split("\t");//2
                        if(ss.length > 2){
2、判断数据,取11位。通过 . 过滤11位的ip
if(ss[2].length() == 11 && ss[2].indexOf(".") < 0){

Combiner
package com.fish.had.cellphonenumbercount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CellPhoneNumberCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                        throws IOException, InterruptedException {
                long sum = 0L;
                for (LongWritable v2 : v2s){
                        sum += v2.get();
                }
                context.write(k2, new LongWritable(sum));
        }
}


Reducer
package com.fish.had.cellphonenumbercount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CellPhoneNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s,
                        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                        throws IOException, InterruptedException {
                long sum = 0L;
                for (LongWritable v2 : v2s){
                        sum += v2.get();
                }
                context.write(k2, new LongWritable(sum));
        }
}

Main
package com.fish.had.cellphonenumbercount;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class CellPhoneNumberMain extends Configured implements Tool{

        public static void main(String[] args) throws Exception {
                if(args.length != 2){
                        System.err.print("param is two ,not other\n");
                        System.exit(1);
                }
                ToolRunner.run(new CellPhoneNumberMain(), args);
        }

        @Override
        public int run(String[] args) throws Exception {
                String INPUTPATH = args[0];
                String OUTPUTPATH = args[1];
                Path outPutPath = new Path(OUTPUTPATH);

                //判断输出目录是否exist,若存在,即删除
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(new URI(INPUTPATH), conf);
                if(fs.exists(outPutPath)) fs.delete(outPutPath, true);

                //生成job
                @SuppressWarnings("deprecation")
                Job job = new Job(conf, CellPhoneNumberMain.class.getSimpleName());
                //指定jarclass
                job.setJarByClass(CellPhoneNumberMain.class);
                //1.指定输入/输出目录
                FileInputFormat.setInputPaths(job, new Path(INPUTPATH));
                FileOutputFormat.setOutputPath(job, outPutPath);
                //2.指定对输入/输出数据进行格式化的类
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                //3.指定自定义的Mapper/Reducer
                job.setMapperClass(CellPhoneNumberMapper.class);
                job.setReducerClass(CellPhoneNumberReducer.class);
                //3.1指定map输出的<k,v>类型
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
                //3.2指定reducer输出<k,v>类型
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
                //4.分区
                job.setPartitionerClass(HashPartitioner.class);
                job.setNumReduceTasks(1);//0
                //5.归约
                job.setCombinerClass(CellPhoneNumberCombiner.class);
                //6.把作业交给jobTracker运行
                job.waitForCompletion(true);

                return 0;
        }

}

生成jar
2.png

运行
3.png

查看结果
4.png



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条