分享

使用hadoop mapreduce实现ip地理位置统计并直接入库mysql

本帖最后由 yuwenge 于 2015-6-9 17:30 编辑

问题导读

1.在mapreduce中如何实现将结果写入mysql数据库中?
2.通过自定义哪个类实现将reduce结果写到数据库?
3.使用mapreduce实现ip地理位置统计ip归属地和运营商?






使用hadoop实现IP个数统计,并将结果写入数据库


hadoop源代码中的WordCount事例中实现了单词统计,但是输出到HDFS文件,线上程序想使用其计算结果还要再次写个程序,所以自己就研究一下关于MapReduce的输出问题,下面就通过一个简单的例子说明下如何将MapReduce的计算结果输出到数据库中。

需求描述:
    分析网络服务器上的Apache日志,统计每个IP访问资源的次数,并将结果写入到mysql数据库中。

数据格式:

    Apache日志数据如下图所示:

1.png

    一行数据就是一条http请求记录,该事例只做简单的IP个数统计。

需求分析:

    通过MapReduce对日志文件采用分布式计算,map主要对日志做简单的拆分计数,reduce对map的结果求和。
    map程序对一行日志数据做简单的拆分,获取客户端IP,输出结果为 key为客户端IP,value为IP出现次数。结果样例如下图所示:
2.png
    reduce程序对Key值下的values做求和计算,输出结果为 key为客户端IP,value为IP出现次数。结果样例如下图所示:
3.png

    上面的MapReduce程序和WordCount程序类似,只是对IP做了简单的求和计算,下面就需要写reduce的输出格式,使计算结果写入到mysql数据库中。
    MapReduce支持用户自定义的输出格式,定义的类只需要继承FileOutputFormat即可。实现如下图所示:

4.png

自定义输出需要实现getRecordWriter方法,这里通过内部类的方式,实现了自定义的RecordWriter,在MysqlRecordWriter类中实现相关的输出即可完成将reduce结果数据写入到数据库中,具体实现如下图所示:

5.png
    在MapReduce程序中,在关于job的设置,只需要将输出格式指定为该输出格式即可完成将reduce的结果写入到数据库中。
job.setOutputFormatClass(MysqlOutputFormat.class);


代码实现:
    日志一行记录分析类TextLine,该类实现了从日志记录中提取IP信息以及IP次数(一行数据就是1次),代码如下:
[mw_shl_code=java,true]/**
* 日志行分析
* @author lulei
*/  
package com;  
  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
  
public class TextLine {  
    private String ip;  
    private IntWritable one = new IntWritable(1);  
    //标识数据是否为可用  
    private boolean right = true;  
      
    public TextLine(String textLine){  
        //检验一行日志数据是否符合要求,如不符合,将其标识为不可用  
        if (textLine == null || "".equals(textLine)) {  
            this.right = false;  
            return;  
        }  
        String []strs = textLine.split(" ");  
        if (strs.length < 2) {  
            this.right = false;  
            return;  
        }  
        this.ip = strs[0];  
    }  
      
    public boolean isRight() {  
        return right;  
    }  
  
    /**
     * 返回map的输出key值
     * @return
     */  
    public Text getIPCountMapOutKey() {  
        return new Text(this.ip);  
    }  
      
    /**
     * 返回map的输出value值
     * @return
     */  
    public IntWritable getIPCountMapOutValue() {  
        return this.one;  
    }  
}  [/mw_shl_code]

IPCountMR类实现了MapReduce功能,实现了日志数据中的IP出现次数统计,代码如下:

[mw_shl_code=java,true]/**
* 各IP出现次数统计
*/  
package com;  
  
import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
public class IPCountMR extends Configured implements Tool{  
    /**
     * ip个数统计map
     * @author lulei
     */  
    public static class IPCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {  
  
        @Override  
        public void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
            TextLine textLine = new TextLine(value.toString());  
            if (textLine.isRight()) {  
                context.write(textLine.getIPCountMapOutKey(), textLine.getIPCountMapOutValue());  
            }  
        }  
         
    }  
      
    /**
     * ip个数统计reduce
     * @author lulei
     */  
    public static class IPCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {  
  
        @Override  
        public void reduce(Text key, Iterable<IntWritable> values, Context context)  
                throws IOException, InterruptedException {  
            int sum = 0;  
            for (IntWritable value : values) {  
                sum += value.get();  
            }  
            context.write(key, new IntWritable(sum));  
        }  
         
    }  
      
    @SuppressWarnings("deprecation")  
    @Override  
    public int run(String[] arg0) throws Exception {  
        Configuration conf = new Configuration();  
        Job job = new Job(conf);  
        job.setJobName("ipcount");  
        job.setInputFormatClass(TextInputFormat.class);  
         
        //将输出设置为MysqlOutputFormat  
        job.setOutputFormatClass(MysqlOutputFormat.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
         
        job.setMapperClass(IPCountMap.class);  
        job.setCombinerClass(IPCountReduce.class);  
        job.setReducerClass(IPCountReduce.class);  
         
        FileInputFormat.addInputPath(job, new Path(arg0[0]));  
        //个人认为下面应该可以不设置的,但是不设置就会报错,不知道是什么地方出了问题  
        MysqlOutputFormat.setOutputPath(job,  new Path(arg0[1]));  
         
        job.waitForCompletion(true);  
         
        return job.isSuccessful() ? 0 : 1;  
    }  
      
    /**
     * @param args
     */  
    public static void main(String[] args) {  
        // TODO Auto-generated method stub  
        try {  
            int res = ToolRunner.run(new Configuration(), new IPCountMR(), args);  
            System.exit(res);  
        } catch (Exception e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }  
}  [/mw_shl_code]

MysqlOutputFormat类实现了reduce自定义输出,将reduce计算结果输出到数据库中,代码如下:
[mw_shl_code=java,true]package com;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
@SuppressWarnings("hiding")  
public class MysqlOutputFormat<Text, IntWritable> extends FileOutputFormat<Text, IntWritable> {  
    //Mysql RecordWriter私有类  
    private static class MysqlRecordWriter<Text, IntWritable> extends RecordWriter<Text, IntWritable> {  
        private LogDB logdb;  
         
        /**
         * 使用外部传进来的LogDB对象
         * @param logdb
         */  
        MysqlRecordWriter(LogDB logdb){  
            this.logdb = logdb;  
        }  
        @Override  
        public void close(TaskAttemptContext arg0) throws IOException,  
                InterruptedException {  
            // TODO Auto-generated method stub  
              
        }  
  
         
        /**
         * 将key-value写入到数据库中
         */  
        @Override  
        public void write(Text key, IntWritable value) throws IOException,  
                InterruptedException {  
            // TODO Auto-generated method stub  
            logdb.insert(key.toString(), value.toString());  
        }  
         
    }  
  
    @Override  
    public RecordWriter<Text, IntWritable> getRecordWriter(  
            TaskAttemptContext arg0) throws IOException, InterruptedException {  
        // TODO Auto-generated method stub  
        //返回MysqlRecordWriter对象  
        return new MysqlRecordWriter<Text, IntWritable>(new LogDB());  
    }  
  
}  [/mw_shl_code]

   上面的LogDB类是自己封装的数据库操作类,实现了数据的插入,具体实现代码如下:
[mw_shl_code=bash,true]package com;  
  
import java.sql.SQLException;  
  
import com.lulei.db.manager.DBServer;  
  
public class LogDB {  
    //新建连接池  
    private DBServer dBServer = new DBServer("proxool.log");  
      
    /**
     * 将数据插入至数据库
     * @param ip
     * @param num
     */  
    public void insert(String ip, String num) {  
        try {  
            dBServer.insert("insert into logmp(ip, num) values ('"  
                    + ip +"','" +  
                    num +"')");  
        } catch (SQLException e) {  
        } finally {  
            dBServer.close();  
        }  
    }  
    public static void main(String[] args) {  
        // TODO Auto-generated method stub  
        new LogDB().insert("127.0.0.2", "1");  
    }  
  
}  [/mw_shl_code]


  上面程序中的DBServer类是基于连接池proxool-0.9.1.jar封装的数据库操作类,这里就不做详细的介绍,这里也可以不通过连接池,直接将数据写入到数据库中,这不是本事例的重点,就不做详细的介绍。

上传运行:
      具体的操作命令参照博客 http://blog.csdn.net/xiaojimanman/article/details/40184581 中的上传运行。


执行结果:
    程序执行结束后,通过命令查看相应的数据表记录,计算结果已经正确写入到数据库中,如下图所示:
6.png
      上面用的日志文件的客户端都是从内网访问的,所以记录中都是内网地址。
注:资源 http://download.csdn.net/detail/xiaojimanman/6920219 中有相关的数据库连接池代码
百度网盘下载
链接:http://pan.baidu.com/s/1mg9qxcs 密码:euei

关于上文中的MysqlOutputFormat输出格式问题解决
    刚才在看书的时候,才发现hadoop已经提供了数据库的输出DBOutputFormat, 这里继续写自定义输出,上文中的MysqlOutputFormat是FileOutputFormat的子类,在FileOutputFormat中,对输出做了检测,所以不设置就会出错。源代码如下:

7.png



所以这里就需要重写该方法或者直接继承OutputFormat,这里就给出第二种解决方法的源代码:

[mw_shl_code=bash,true]package com;  
  
import java.io.IOException;  
  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.OutputCommitter;  
import org.apache.hadoop.mapreduce.OutputFormat;  
import org.apache.hadoop.mapreduce.RecordWriter;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  
@SuppressWarnings("hiding")  
public class MysqlOutputFormat<Text, IntWritable> extends OutputFormat<Text, IntWritable> {  
    //Mysql RecordWriter私有类  
    private static class MysqlRecordWriter<Text, IntWritable> extends RecordWriter<Text, IntWritable> {  
        private LogDB logdb;  
         
        /**
         * 使用外部传进来的LogDB对象
         * @param logdb
         */  
        MysqlRecordWriter(LogDB logdb){  
            this.logdb = logdb;  
        }  
        @Override  
        public void close(TaskAttemptContext arg0) throws IOException,  
                InterruptedException {  
            // TODO Auto-generated method stub  
              
        }  
  
         
        /**
         * 将key-value写入到数据库中
         */  
        @Override  
        public void write(Text key, IntWritable value) throws IOException,  
                InterruptedException {  
            // TODO Auto-generated method stub  
            logdb.insert(key.toString(), value.toString());  
        }  
         
    }  
  
    @Override  
    public RecordWriter<Text, IntWritable> getRecordWriter(  
            TaskAttemptContext arg0) throws IOException, InterruptedException {  
        // TODO Auto-generated method stub  
        //返回MysqlRecordWriter对象  
        return new MysqlRecordWriter<Text, IntWritable>(new LogDB());  
    }  
  
    @Override  
    public void checkOutputSpecs(JobContext arg0) throws IOException,  
            InterruptedException {  
         
    }  
  
    @Override  
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)  
            throws IOException, InterruptedException {  
        // TODO Auto-generated method stub  
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),  
                context);  
    }  
      
}  [/mw_shl_code]

#####################################################

使用hadoop实现ip地理位置统计~ip归属地和运营商

对于上文 中的计算结果 key-value (ip,出现次数),统计下各个地区运营商下的IP个数,通过这个计算结果,可以分析出用户的地理位置分布情况,为决策提供数据支持。

需求描述:
    根据IP归属地,对IP进行分组求和,将结果输出到文件中。

数据格式:
    此次的数据格式相对比较简单,就是上文的结果数据,一行数据格式为:
ip地址空格分隔符出现次数 例: 192.168.1.1 25

需求分析:
     在实现mapreduce程序之前,需要考虑的一个问题就是IP地址和归属地之间的转换问题。我这里采用的是百度的阿拉丁接口,接口获取方法,在百度首页输入"IP",就会出现阿拉丁界面。如下图所示:

1.png

     通过对该部分的网络请求分析,获取地址 http://opendata.baidu.com/api.ph ... idu&_=1414563341538 可以获取IP的归属地,该接口返回的数据格式如下图所示:

2.png

    可以通过HttpClient模拟浏览器访问该地址,分析返回结果,获取该IP地址对应的归属地。如果自己有IP库,这一步就会简单很多。

    IP个归属地中间的对应关系解决了,就需要设计mapreduce的实现问题。
    map的输入就是一行原始记录,首先需要对记录进行拆分,取得IP地址,在通过上面提到的接口,查询该IP的归属地;map的输出结果是key为IP归属地,value为出现次数,一行记录就是1 。输出结果如下图所示:

3.png


    reduce就需要对同一个key下的记录求和即可,输出结果是key为IP归属地,value为出现次数,如下图所示:

4.png


    这一篇主要的目的就是在mapreduce程序中使用第三方的接口。需求分析就到此为止,下面就看具体的代码实现。

代码实现:
    ip归属地查询代码

[mw_shl_code=java,true]/**
* @Description: ip归属地查询
*/  
package com.lulei.crawl.ip;  
  
import java.io.IOException;  
import java.util.Date;  
import java.util.HashMap;  
  
import org.apache.commons.httpclient.HttpException;  
  
import com.lulei.crawl.CrawlBase;  
import com.lulei.util.DoRegex;  
  
/**
* @author lulei
* 这里继承了自己的封装类,在类CrawlBase中实现了网络数据的获取,并将网页源代码存储在pageSourceCode中
*/  
public class IPInfo extends CrawlBase{  
    private String ip;  
    private String location;  
    //第三方接口地址  
    private static String ipUrl = "http://opendata.baidu.com/api.php?query=%ip%&co=&resource_id=6006&t=%t1%&ie=utf8&oe=gbk&format=json&tn=baidu&_=%t2%";  
    private static long timeDifference = 1000L;  
    private static HashMap<String, String> params;  
    private static String locationRegex = "\"location\":\"(.*?)\"";  
      
    //伪装浏览器  
    static {  
        params = new HashMap<String, String>();  
        params.put("Referer", "http://www.baidu.com");  
        params.put("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36");  
    }  
      
    public IPInfo(String ip) throws HttpException, IOException {  
        long t1 = new Date().getTime();  
        long t2 = t1 + timeDifference;  
        this.ip = ip;  
        //组装请求地址  
        String url = ipUrl.replaceAll("%ip%", ip)  
                                     .replaceAll("%t1%", t1 + "")  
                                     .replaceAll("%t2%", t2 + "");  
        //获取网页源代码,具体的实现,这里就不详细的介绍,自己可以写简单的HttpClient实现此功能   
        readPageByGet(url, "utf-8", params);  
        //解析源代码,获取归属地  
        setLocation();  
    }  
      
    /**
     * @author lulei
     * 解析源代码,获取归属地
     */  
    private void setLocation() {  
        this.location = DoRegex.getFirstString(getPageSourceCode(), locationRegex, 1);  
    }  
      
    public String getIp() {  
        return ip;  
    }  
    public String getLocation() {  
        return location;  
    }  
  
    /**
     * @param args
     * @throws IOException  
     * @throws HttpException  
     */  
    public static void main(String[] args) throws HttpException, IOException {  
        // TODO Auto-generated method stub  
        String ip = "122.49.34.58";  
        IPInfo ipinfo = new IPInfo(ip);  
        System.out.println("ip:" +ip );  
        System.out.println("归属地:" + ipinfo.getLocation());  
    }  
}  [/mw_shl_code]

对一行记录的分析类
[mw_shl_code=java,true] /**   
*@Description: 一行记录分析
*/   
package com.mapreduce.log;   
  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
  
import com.lulei.crawl.ip.IPInfo;  
   
   
public class LogLine {  
    private String ip;  
    private String location;  
    private boolean right = true;  
    private IntWritable one = new IntWritable(1);  
      
    public LogLine(String textLine) {  
        //检验一行日志数据是否符合要求,如不符合,将其标识为不可用  
        if (textLine == null || "".equals(textLine)) {  
            this.right = false;  
            return;  
        }  
        String []strs = textLine.split(" ");  
        if (strs.length < 2) {  
            this.right = false;  
            return;  
        }  
        //ip地址在第一个位置  
        this.ip = strs[0];  
        setLocation();  
    }  
      
    private void setLocation() {  
        try {  
            IPInfo ipInfo = new IPInfo(this.ip);  
            this.location = ipInfo.getLocation();  
        } catch (Exception e) {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
            //如果出现网络错误,将此IP的归属地设置成“未知”  
            this.location = "未知";  
        }   
    }  
  
    /**
     * @return
     * @Author:lulei   
     * @Description: map输出key
     */  
    public Text getMapKey() {  
        return new Text(this.location);  
    }  
      
    /**
     * @return
     * @Author:lulei   
     * @Description: map输出value
     */  
    public IntWritable getMapValue() {  
        return this.one;  
    }  
  
    public boolean isRight() {  
        return right;  
    }  
      
}  [/mw_shl_code]


  mapreduce程序实现类
[mw_shl_code=java,true] /**   
*@Description: IP归属地统计mapreduce实现   
*/   
package com.mapreduce.log;   
  
import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
public class IPLocationMapReduce extends Configured implements Tool{  
  
    /**
     *@Description: IP归属地统计map  
     *@Author:lulei   
     *@Version:1.1.0
     */  
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {  
  
        @Override  
        protected void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
            LogLine logLine = new LogLine(value.toString());  
            if (logLine.isRight()) {  
                context.write(logLine.getMapKey(), logLine.getMapValue());  
            }  
        }  
         
    }  
      
    /**
     *@Description: IP归属地统计reduce
     *@Author:lulei   
     *@Version:1.1.0
     */  
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {  
  
        @Override  
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)  
                throws IOException, InterruptedException {  
            int sum = 0;  
            //对values进行求和操作  
            for (IntWritable value : values) {  
                sum += value.get();  
            }  
            context.write(key, new IntWritable(sum));  
        }  
         
    }  
  
    @Override  
    public int run(String[] arg0) throws Exception {  
        Configuration conf = new Configuration();  
        @SuppressWarnings("deprecation")  
        Job job = new Job(conf);  
        job.setJobName("ipcount");  
        job.setInputFormatClass(TextInputFormat.class);  
         
        //将输出设置为TextOutputFormat  
        job.setOutputFormatClass(TextOutputFormat.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
         
        //Mapper Combiner Reducer  
        job.setMapperClass(Map.class);  
        job.setCombinerClass(Reduce.class);  
        job.setReducerClass(Reduce.class);  
         
        //输入 输出路径  
        FileInputFormat.addInputPath(job, new Path(arg0[0]));  
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));  
         
        job.waitForCompletion(true);  
         
        return job.isSuccessful() ? 0 : 1;  
    }  
  
    public static void main(String[] args) {  
        // TODO Auto-generated method stub   
        //这里没有对输入的参数做验证  
        try {  
            int res = ToolRunner.run(new Configuration(), new IPLocationMapReduce(), args);  
            System.exit(res);  
        } catch (Exception e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }  
}  [/mw_shl_code]

上传运行:

    打包、上传、运行这些步骤这里就不再详细介绍,具体可以参照博客 http://blog.csdn.net/xiaojimanman/article/details/40184581 最后一部分。
    对于自己写的数据的输出结果如下图所示:

5.png

到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~














出处:http://blog.csdn.net/xiaojimanman/article/details/40372189

已有(1)人评论

跳转到指定楼层
arBen 发表于 2015-6-10 08:49:24
感谢大神分享。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条