a87758133 发表于 2019-4-18 13:04:10

大数据实战之App管理平台日志分析(二)

本帖最后由 a87758133 于 2019-4-18 13:13 编辑

问题导读:


1、如何通过GeoLite2-City获取手机真实IP地址?
2、如何对地理信息缓存?
3、如何将log消息发送给Kafka?
4、如何自定义Flume拦截器?


上一篇:
大数据实战之App管理平台日志分析(一)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=27032&_dsign=2a49c0cc



一、引入GeoLite2-City,获取手机真实ip地址
-----------------------------------------------------------
    1.下载GeoLite数据库文件
      GeoLite2-City.mmdb

    2.引入pom.xml
      <dependency>
            <groupId>com.maxmind.db</groupId>
            <artifactId>maxmind-db</artifactId>
            <version>1.0.0</version>
      </dependency>
    3.测试提取国家和省份信息
@Test
      public void test1() throws IOException {
         InputStream in = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
         Reader r = new Reader(in);
         JsonNode node = r.get(InetAddress.getByName("140.211.11.105"));
         //国家
         String country = node.get("country").get("names").get("zh-CN").textValue();
         System.out.println(country);
         //省份
         String area = node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
         //城市
         String city = node.get("city").get("names").get("zh-CN").textValue();

         System.out.println(country + "." + area + "." + city);
      }
   4.封装GeoUtil工具类在common模块
package com.test.app.util;

      import com.fasterxml.jackson.databind.JsonNode;
      import com.maxmind.db.Reader;

      import java.io.InputStream;
      import java.net.InetAddress;

      /**
       * 地理工具类,实现通过ip查找地址区域
       */
      public class GeoUtil {
         private static InputStream in ;
         private static Reader reader ;
         static{
            try {
               in = ClassLoader.getSystemResourceAsStream("GeoLite2-City.mmdb");
               reader = new Reader(in);
            } catch (Exception e) {
               e.printStackTrace();
            }
         }

         /**
          *获得国家数据
          */
         public static String getCountry(String ip){
            try{
               JsonNode node = reader.get(InetAddress.getByName(ip));
               return node.get("country").get("names").get("zh-CN").textValue();
            }
            catch (Exception e){
               e.printStackTrace();
            }
            return "" ;
         }
         /**
          *获得省份数据
          */
         public static String getProvince(String ip){
            try{
               JsonNode node = reader.get(InetAddress.getByName(ip));
               return node.get("subdivisions").get(0).get("names").get("zh-CN").textValue();
            }
            catch (Exception e){
               e.printStackTrace();
            }
            return "" ;
         }
         /**
          *获得地理位置数据
          */
         public static String getCity(String ip){
            try{
               JsonNode node = reader.get(InetAddress.getByName(ip));
               return node.get("city").get("names").get("zh-CN").textValue();
            }
            catch (Exception e){
               e.printStackTrace();
            }
            return "" ;
         }
      }

二、对地理信息缓存处理
---------------------------------------------------------
   1.创建GeoInfo类
      public class GeoInfo {

         private String country ;

         private String province ;

         //get/set

      }

   2.Controller中增加map,存放出现过的ip信息。
public class CollectLogController{

         private Map<String,GeoInfo> cache = new HashMap<String, GeoInfo>();
         /**
          * 处理ip client地址问题
          */
         private void processIp(AppLogEntity e, String clientIp) {
            GeoInfo info = cache.get(clientIp);
            if(info == null){
               info = GeoUtil.getGeoInfo(clientIp);
               cache.put(clientIp,info) ;
            }
            for(AppStartupLog log : e.getAppStartupLogs()){
               log.setCountry(info.getCountry());
               log.setProvince(info.getProvince());
               log.setIpAddress(clientIp);
            }
         }
      }
三、将log消息发送给kafka
------------------------------------------------
   1.web项目中引入kafka依赖
      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
      </dependency>

   2.common模块中创建常量类
      package com.test.app.common;

      /**
       * 常量类
       */
      public class Constants {
         //主题
            public static final String TOPIC_APP_STARTUP = "topic-app-startup" ;
            public static final String TOPIC_APP_ERRROR = "topic-app-error" ;
            public static final String TOPIC_APP_EVENT = "topic-app-event" ;
            public static final String TOPIC_APP_USAGE = "topic-app-usage" ;
            public static final String TOPIC_APP_PAGE = "topic-app-page" ;
      }

   3.在controller发送消息给主题
/**
         */
      @Controller()
      @RequestMapping("/coll")
      public class CollectLogController {

         /**
            * 地理信息缓存
            */
         private Map<String,GeoInfo> cache = new HashMap<String, GeoInfo>();

         /**
            * 启动日志收集
            */
         @RequestMapping(value = "/index", method = RequestMethod.POST)
         @ResponseBody
         public AppLogEntity collect(@RequestBody AppLogEntity e, HttpServletRequest req) {

            System.out.println("=============================");
            //server时间
            long myTime = System.currentTimeMillis() ;
            //客户端时间
            long clientTime = Long.parseLong(req.getHeader("clientTime"));
            //时间校对
            long diff = myTime - clientTime ;

            //1.修正日志时间
            verifyTime(e,diff);
            //2.对e进行处理,将具体日志分类的属性值填充完毕
            copyBaseProperties(e);
            //3.修正日志的ip位置等信息
            String clientIp = req.getRemoteAddr();
            processIp(e , clientIp);
            //4.发送日志到kafka集群
            sendMessageToKafka(e);
            return e;
         }

         /**
            * 发送消息到kafka集群
            * @param e
            */
         private void sendMessageToKafka(AppLogEntity e) {
            //创建配置对象
            Properties props = new Properties();
            props.put("metadata.broker.list", "s100:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");

            //创建生产者
            Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
            sendSingleLog(producer,Constants.TOPIC_APP_STARTUP,e.getAppStartupLogs());
            sendSingleLog(producer,Constants.TOPIC_APP_ERRROR,e.getAppErrorLogs());
            sendSingleLog(producer,Constants.TOPIC_APP_EVENT,e.getAppEventLogs());
            sendSingleLog(producer,Constants.TOPIC_APP_PAGE,e.getAppPageLogs());
            sendSingleLog(producer,Constants.TOPIC_APP_USAGE,e.getAppUsageLogs());
            producer.close();
         }

         /**
            * 发送单个的log消息给kafka
            */
         private void sendSingleLog(Producer<Integer, String> producer,String topic , AppBaseLog[] logs){
            for (AppBaseLog log : logs) {
               String logMsg = JSONObject.toJSONString(log);
               //创建消息
               KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, logMsg);
               producer.send(data);
            }
         }

         /**
            * 处理ip client地址问题
            */
         private void processIp(AppLogEntity e, String clientIp) {
            GeoInfo info = cache.get(clientIp);
            if(info == null){
               info = new GeoInfo();
               info.setCountry(GeoUtil.getCountry(clientIp));
               info.setProvince(GeoUtil.getProvince(clientIp));
               cache.put(clientIp,info) ;
            }
            for(AppStartupLog log : e.getAppStartupLogs()){
               log.setCountry(info.getCountry());
               log.setProvince(info.getProvince());
               log.setIpAddress(clientIp);
            }
         }


         /**
            * 校对各个具体日志的创建时间(使用服务器时间差diff)
            */
         private void verifyTime(AppLogEntity e, long diff)
         {
            //启动修正
            //startuplog
            for(AppBaseLog log : e.getAppStartupLogs()){
               log.setCreatedAtMs(log.getCreatedAtMs() + diff );
            }
            for(AppBaseLog log : e.getAppUsageLogs()){
               log.setCreatedAtMs(log.getCreatedAtMs() + diff );
            }
            for(AppBaseLog log : e.getAppPageLogs()){
               log.setCreatedAtMs(log.getCreatedAtMs() + diff );
            }
            for(AppBaseLog log : e.getAppEventLogs()){
               log.setCreatedAtMs(log.getCreatedAtMs() + diff );
            }
            for(AppBaseLog log : e.getAppErrorLogs()){
               log.setCreatedAtMs(log.getCreatedAtMs() + diff );
            }
         }

         /**
            * 将Log的属性分类复制到各个具体的log中
            */
         private void copyBaseProperties(AppLogEntity e){
            PropertiesUtil.copyProperties(e,e.getAppStartupLogs());
            PropertiesUtil.copyProperties(e,e.getAppErrorLogs());
            PropertiesUtil.copyProperties(e,e.getAppEventLogs());
            PropertiesUtil.copyProperties(e,e.getAppPageLogs());
            PropertiesUtil.copyProperties(e,e.getAppUsageLogs());
         }
      }
    4.启动zk集群和kafka集群

    5.创建5个主题。
      $>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-startup$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-error
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-event
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-usage
$>kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 3 --topic topic-app-page

    6.查看并验证主题是否创建和发送成功
      a.查看主题
$> kafka-topics.sh --list --zookeeper s100:2181
      b.启动控制台消费者,进行测试,查看日志输出
$> kafka-console-consumer.sh --bootstrap-server s200:9092 --topic topic-app-startup --from-beginning

四、通过flume收集kafka消息,然后上传到hdfs进行储存
----------------------------------------------------------------
    1.日志分成5个方面,hdfs中存放在不同目录下。
      /data/applogs/startup/201901/12/1213/xxx-xxxxxxx
      /data/applogs/error/201901/12/1213/xxx-xxxxxxx
      ...

    2.如果想实现上面的自动命名hdfs目录名
      a.将kafka消息转换成对象
      b.抽取createTimeMs属性值作为flume的Header
      c.按照固定的格式化串对Header进行格式化
      d.创建自定义拦截器,对每条kafka消息都进行类似format处理
      f.按照格式化之后的串创建HDFS目录

    3.创建新的模块app-logs-flume,添加maven依赖
      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>

            <groupId>com.test</groupId>
            <artifactId>app-logs-flume</artifactId>
            <version>1.0-SNAPSHOT</version>

            <dependencies>
                <dependency>
                  <groupId>org.apache.flume</groupId>
                  <artifactId>flume-ng-core</artifactId>
                  <version>1.7.0</version>
                </dependency>
                <dependency>
                  <groupId>junit</groupId>
                  <artifactId>junit</artifactId>
                  <version>4.11</version>
                </dependency>
                <dependency>
                  <groupId>com.alibaba</groupId>
                  <artifactId>fastjson</artifactId>
                  <version>1.2.24</version>
                </dependency>
                <dependency>
                  <groupId>com.test</groupId>
                  <artifactId>app-analyze-common</artifactId>
                  <version>1.0-SNAPSHOT</version>
                </dependency>
            </dependencies>
      </project>

   4.自定义flume拦截器
package com.test.app.flume.interceptor;

      import com.alibaba.fastjson.JSONObject;
      import com.test.app.common.AppBaseLog;
      import org.apache.flume.Context;
      import org.apache.flume.Event;
      import org.apache.flume.interceptor.Interceptor;

      import java.util.List;
      import java.util.Map;

      import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;

      /**
         * 自定义flume的拦截器,提取body中的createTimeMS字段作为header
         */
      public class LogCollInterceptor implements Interceptor {

            private final boolean preserveExisting;

            private LogCollInterceptor(boolean preserveExisting) {
                this.preserveExisting = preserveExisting;
            }

            public void initialize() {
            }

            /**
             * Modifies events in-place.
             * 将flume的时间戳全部盖掉。换成startTimeMs
             */
            public Event intercept(Event event) {
                Map<String, String> headers = event.getHeaders();
                //得到kafka传递过来的消息,反转成AppBaseLog对象
                byte[] json = event.getBody();
                String jsonStr = new String(json);
                AppBaseLog log = JSONObject.parseObject(jsonStr , AppBaseLog.class);
                //获取日志创建时间
                long time = log.getCreatedAtMs();

                //处理log类型的头
                //1.盖掉flume的头信息的时间戳
                headers.put(TIMESTAMP, Long.toString(time));

                //2.处理头的logType[实现1个flume订阅5个kafka主题]
                String logType = "" ;
                if(jsonStr.contains("pageId")){
                  logType = "page" ;
                }
                //eventLog
                else if (jsonStr.contains("eventId")) {
                  logType = "event";
                }
                //usageLog
                else if (jsonStr.contains("singleUseDurationSecs")) {
                  logType = "usage";
                }
                //error
                else if (jsonStr.contains("errorBrief")) {
                  logType = "error";
                }
                //startup
                else if (jsonStr.contains("network")) {
                  logType = "startup";
                }
                headers.put("logType", logType);
                return event;
            }

            /**
             * Delegates to {@link #intercept(Event)} in a loop.
             *
             * @param events
             * @return
             */
            public List<Event> intercept(List<Event> events) {
                for (Event event : events) {
                  intercept(event);
                }
                return events;
            }

            public void close() {
            }

            /**
             */
            public static class Builder implements Interceptor.Builder {

                private boolean preserveExisting = PRESERVE_DFLT;

                public Interceptor build() {
                  return new LogCollInterceptor(preserveExisting);
                }

                public void configure(Context context) {
                  preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
                }

            }

            public static class Constants {
                public static String TIMESTAMP = "timestamp";
                public static String PRESERVE = "preserveExisting";
                public static boolean PRESERVE_DFLT = false;
            }

      }
   5.导出flumejar包[使用Build Artfacts打包,加入所有的依赖进行打包]
      复制到flume的/lib下,并分发到所有节点

   6.配置flume配置文件
      a1.sources=r1
      a1.channels=c1
      a1.sinks=k1

      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = com.test.app.flume.interceptor.LogCollInterceptor$Builder
      a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
      a1.sources.r1.batchSize = 5000
      a1.sources.r1.batchDurationMillis = 2000
      a1.sources.r1.kafka.bootstrap.servers = s200:9092
      a1.sources.r1.kafka.zookeeperConnect = s200:2181,s300:2181,s400:2181
      a1.sources.r1.kafka.topics.regex = ^topic-app-.*$
      #a1.sources.r1.kafka.consumer.group.id = g3

      a1.channels.c1.type=memory
      a1.channels.c1.capacity=100000
      a1.channels.c1.transactionCapacity=10000

      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = /data/applogs/%{logType}/%Y%m/%d/%H%M
      a1.sinks.k1.hdfs.filePrefix = events-
      a1.sinks.k1.hdfs.round = false
      a1.sinks.k1.hdfs.roundValue = 30
      a1.sinks.k1.hdfs.roundUnit = second

      a1.sources.r1.channels = c1
      a1.sinks.k1.channel= c1

    7.启动hdfs,并且创建好目录/data/applogs/

   8.启动flume
      $>flume-ng agent -f applog.conf -n a1

    9.启动web服务器和日志生成程序,查看hdfs上是否成功生成日志

五、配置hive数据仓库 -- 周期性的加载hdfs上的数据到hive仓库中,用于后期查询
-----------------------------------------------------------------------------
    1.说明
      因为使用json格式存放数据,需要第三方serde库。
      下载json-serde-1.3.8-jar-with-dependencies.jar

   2.复制以上的jar包hive的lib下,分发

   3.配置hive-site.xml文件,添加jar包的声明,永久注册。
      
      <property>
            <name>hive.aux.jars.path</name>
            <value>file:///soft/hive/lib/json-serde-1.3.8-jar-with-dependencies.jar</value>
      </property>
   4.设置不压缩存储
      
      <property>
         <name>hive.exec.compress.output</name>
         <value>false</value>
      </property>
   5.创建数据库
    $hive> create database applogs_db ;
   6.创建测试表
       hive> use applogs_db;
hive> create table test(id int , name string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS TEXTFILE;
   7.执行插入
      $hive> insert into test(id,name) values(1,'tom') ;

   8.修改配置文件需要重新进入hive命令行

   9.创建applogs表语句
      CREATE external TABLE ext_startup_logs(
      createdAtMs bigint ,
      name string)
PARTITIONED BY (
      ym string,
      day string,
      hm string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
      ...

六、BUG解决
-------------------------------------------------------------
    1.公共模块加载Geo数据库的Null Stream问题
      类加载问题。
      ClassLoader.getSystemSystemAsStream("Geo.mmdb") ;
      web
      tomcat

      使用线程获得当前的类加载器.[因为是一个web程序,所以不能使用传统的classloader方式]
      public class GeoUtil {
            ...
            static{
                try {
                  ClassLoader loader = Thread.currentThread().getContextClassLoader();
                  in = loader.getResource("GeoLite2-City.mmdb").openStream();
                  reader = new Reader(in);
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      }

    2.idea下模块之间存在依赖关系的时候,添加依赖和打包问题
       a.只需要在项目结构中添加依赖的模块即可,不需要在pom.xml中添加依赖工件。

       b.在web模块工件部分,将依赖的第三方模块put into web-info/classes下。



最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg
来源:CSDN作者:葛红富原文:《大数据项目实战之 --- 某App管理平台的手机app日志分析系统(二)》https://blog.csdn.net/xcvbxv01/article/details/84256844


美丽天空 发表于 2019-4-20 00:23:19

感谢分享
页: [1]
查看完整版本: 大数据实战之App管理平台日志分析(二)