分享

Flume-ng生产环境实践(四)实现log格式化interceptor

本帖最后由 坎蒂丝_Swan 于 2015-1-12 14:23 编辑
问题导读
1.filesink中数据如何传输?
2.event中header的键值对是怎样起作用的?











续上篇,由于filesink中需要使用/data/log/%{dayStr}/log-%{hourStr}%{minStr}-这样文件格式的,为了使file-sink能使用%{dayStr}这样的标签,需要在数据传输过程中,给event的header中添加对应的键值对。在flume-ng中提供了很方便的方式:Interceptor以下为实现的interceptor,首先使用正则表达式匹配nginx日志,如何匹配成功,则获取匹配到的数据,并且对url中的参数进行处理,最后所有日志信息都被存储在Map中。根据配置文件中需要输出的键找到对应的值,按照顺序输出为csv格式的行。

原始日志格式:
  1. 112.245.239.72 - - [29/Dec/2012:15:00:00 +0800] "GET /p.gif?a=1&b=2 HTTP/1.1" 200 0 "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4
  2. .0; 4399Box.1357; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; AskTbPTV2/5.9.1.14019; 4399Box.1357)"
复制代码

最终结果:
  1. 1,2
复制代码

配置信息为:
  1. agent.sources = source
  2. agent.channels = channel
  3. agent.sinks = sink
  4. agent.sources.source.type = exec
  5. #agent.sources.source.command = tail -n +0 -F /data/tmp/accesspvpb_2012-11-18.log
  6. agent.sources.source.command = cat /opt/nginx/logs/vvaccess_log_pipe
  7. agent.sources.source.interceptors = logformat
  8. agent.sources.source.interceptors.logformat.type = org.apache.flume.interceptor.LogFormatInterceptor$Builder
  9. agent.sources.source.interceptors.logformat.confpath = /usr/programs/flume/conf/logformat_vv.properties
  10. agent.sources.source.interceptors.logformat.dynamicprop = true
  11. agent.sources.source.interceptors.logformat.hostname = vv111
  12. agent.sources.source.interceptors.logformat.prop.monitor.rollInterval = 100000
  13. # The channel can be defined as follows.
  14. agent.sources.source.channels = channel
  15. agent.sinks.sink.type = avro
  16. agent.sinks.sink.hostname = 192.168.0.100
  17. agent.sinks.sink.port = 44444
  18. agent.sinks.sink.channel = channel
  19. # Each channel's type is defined.
  20. agent.channels.channel.type = file
  21. agent.channels.channel.checkpointDir = /data/tmpc/checkpoint
  22. agent.channels.channel.dataDirs = /data/tmpc/data
  23. agent.channels.channel.transactionCapacity = 15000
复制代码

/usr/programs/flume/conf/logformat_vv.properties文件内容为:
  1. keys=a,b
  2. regexp=([0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3})\\s-\\s-\\s\\[([^]]+)\\]\\s"GET\\s/p.gif\\?(.+)\\s.*"\\s[0-9]+\\s[0-9]+\\s"(.+)"
复制代码

interceptor的代码:
  1. package org.apache.flume.interceptor;
  2. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.CONF_PATH;
  3. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP;
  4. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.DYNAMICPROP_DFLT;
  5. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME;
  6. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.HOSTNAME_DFLT;
  7. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL;
  8. import static org.apache.flume.interceptor.LogFormatInterceptor.Constants.PROPMONITORINTERVAL_DFLT;
  9. import java.io.File;
  10. import java.io.FileInputStream;
  11. import java.io.FileNotFoundException;
  12. import java.io.IOException;
  13. import java.text.ParseException;
  14. import java.text.SimpleDateFormat;
  15. import java.util.Date;
  16. import java.util.HashMap;
  17. import java.util.LinkedList;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.Properties;
  21. import org.apache.flume.Context;
  22. import org.apache.flume.Event;
  23. import org.apache.flume.event.EventBuilder;
  24. import org.apache.oro.text.regex.MalformedPatternException;
  25. import org.apache.oro.text.regex.MatchResult;
  26. import org.apache.oro.text.regex.Pattern;
  27. import org.apache.oro.text.regex.PatternCompiler;
  28. import org.apache.oro.text.regex.PatternMatcher;
  29. import org.apache.oro.text.regex.Perl5Compiler;
  30. import org.apache.oro.text.regex.Perl5Matcher;
  31. import org.slf4j.Logger;
  32. import org.slf4j.LoggerFactory;
  33. public class LogFormatInterceptor implements Interceptor {
  34.         private static final Logger logger = LoggerFactory
  35.                      . getLogger(LogFormatInterceptor.class);
  36.         private String conf_path = null;
  37.         private boolean dynamicProp = false;
  38.         private String hostname = null;
  39.         private long propLastModify = 0;
  40.         private long propMonitorInterval ;
  41.         private String regexp = null;
  42.         private List<String> keys = null;
  43.         private Pattern pattern = null;
  44.         private PatternCompiler compiler = null;
  45.         private PatternMatcher matcher = null;
  46.         private SimpleDateFormat sdf = null;
  47.         private SimpleDateFormat sd = null;
  48.         private SimpleDateFormat sh = null;
  49.         private SimpleDateFormat sm = null;
  50.         private SimpleDateFormat sdfAll = null;
  51.         private long eventCount = 0l;
  52.         public LogFormatInterceptor(String conf_path, boolean dynamicProp,
  53.                      String hostname, long propMonitorInterval) {
  54.                this.conf_path = conf_path;
  55.                this.dynamicProp = dynamicProp;
  56.                this.hostname = hostname;
  57.                this.propMonitorInterval = propMonitorInterval;
  58.        }
  59.         @Override
  60.         public void close () {
  61.        }
  62.         @Override
  63.         public void initialize () {
  64.                try {
  65.                       // 读取配置文件,初始化正在表达式和输出的key列表
  66.                      File file = new File(conf_path );
  67.                       propLastModify = file.lastModified();
  68.                      Properties props = new Properties();
  69.                      FileInputStream fis;
  70.                      fis = new FileInputStream(file);
  71.                      props.load(fis);
  72.                       regexp = props.getProperty( "regexp");
  73.                      String strKey = props.getProperty( "keys");
  74.                       if (strKey != null) {
  75.                            String[] strkeys = strKey.split( ",");
  76.                             keys = new LinkedList<String>();
  77.                             for (String key : strkeys) {
  78.                                    keys.add(key);
  79.                            }
  80.                      }
  81.                       if (keys == null) {
  82.                             logger.error("====================keys is null====================");
  83.                      } else {
  84.                             logger.info("keys=" + keys );
  85.                      }
  86.                       if (regexp == null) {
  87.                             logger.error("====================regexp is null====================");
  88.                      } else {
  89.                             logger.info("regexp=" + regexp );
  90.                      }
  91.                       // 初始化正在表达式以及时间格式化类
  92.                       compiler = new Perl5Compiler();
  93.                       pattern = compiler.compile( regexp);
  94.                       matcher = new Perl5Matcher();
  95.                       sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z" ,
  96.                                   java.util.Locale. US);
  97.                       sd = new SimpleDateFormat("yyyyMMdd" );
  98.                       sh = new SimpleDateFormat("HH" );
  99.                       sm = new SimpleDateFormat("mm" );
  100.                       sdfAll = new SimpleDateFormat("yyyyMMddHHmmss" );
  101.               } catch (MalformedPatternException e) {
  102.                       logger.error("Could not complile pattern!" , e);
  103.               } catch (FileNotFoundException e) {
  104.                       logger.error("conf file is not found!" , e);
  105.               } catch (IOException e) {
  106.                       logger.error("conf file can not be read!" , e);
  107.               }
  108.        }
  109.         @Override
  110.         public Event intercept(Event event) {
  111.               ++ eventCount;
  112.                try {
  113.                       if (dynamicProp && eventCount > propMonitorInterval) {
  114.                            File file = new File(conf_path );
  115.                             if (file.lastModified() > propLastModify ) {
  116.                                    propLastModify = file.lastModified();
  117.                                   Properties props = new Properties();
  118.                                   FileInputStream fis;
  119.                                   fis = new FileInputStream(file);
  120.                                   props.load(fis);
  121.                                   String strKey = props.getProperty( "keys");
  122.                                    if (strKey != null) {
  123.                                          String[] strkeys = strKey.split("," );
  124.                                          List<String> keystmp = new LinkedList<String>();
  125.                                           for (String key : strkeys) {
  126.                                                 keystmp.add(key);
  127.                                          }
  128.                                           if (keystmp.size() > keys .size()) {
  129.                                                  keys = keystmp;
  130.                                                  logger.info("dynamicProp status updated = " + keys);
  131.                                          } else {
  132.                                                  logger.error("dynamicProp status new keys size less than old,so status update fail = "
  133.                                                               + keys);
  134.                                          }
  135.                                   } else {
  136.                                           logger.error("dynamicProp status get keys fail ,so status update fail = "
  137.                                                        + keys);
  138.                                   }
  139.                            }
  140.                      }
  141.                      Map<String, String> headers = event.getHeaders();
  142.                      headers.put( "host", hostname );
  143.                      String body = new String(event.getBody());
  144.                       if (pattern != null) {
  145.                            StringBuffer stringBuffer = new StringBuffer();
  146.                            Date date = null;
  147.                            Map<String, String> index = new HashMap<String, String>();
  148.                             if (matcher .contains(body, pattern)) {
  149.                                   index.put( "host", hostname );
  150.                                   MatchResult result = matcher.getMatch();
  151.                                   index.put( "ip", result.group(1));
  152.                                    try {
  153.                                          date = sdf.parse(result.group(2));
  154.                                          index.put( "loc_time", sdfAll.format(date));
  155.                                   } catch (ParseException e1) {
  156.                                   }
  157.                                   String url = result.group(3).replaceAll( ",", "|");
  158.                                   String[] params = url.split( "&");
  159.                                    for (String param : params) {
  160.                                          String[] p = param.split("=" );
  161.                                           if (p.length == 2) {
  162.                                                 index.put(p[0], p[1]);
  163.                                          }
  164.                                   }
  165.                                   index.put( "browser", result.group(4).replaceAll("," , "|" ));
  166.                                    for (String key : keys ) {
  167.                                           if (index.containsKey(key)) {
  168.                                                 stringBuffer.append(index.get(key) + ",");
  169.                                          } else {
  170.                                                 stringBuffer.append( "~,");
  171.                                          }
  172.                                   }
  173.                                    if (stringBuffer.length() > 0) {
  174.                                          stringBuffer.deleteCharAt(stringBuffer.length() - 1);
  175.                                   } else {
  176.                                          stringBuffer.append( "error=" + body);
  177.                                   }
  178.                                    if (date != null) {
  179.                                          headers.put( "dayStr", sd .format(date));
  180.                                          headers.put( "hourStr", sh .format(date));
  181.                                          Integer m = Integer.parseInt(sm.format(date));
  182.                                          String min = "";
  183.                                           if (m >= 0 && m < 10) {
  184.                                                 min = "0" + (m / 5) * 5;
  185.                                          } else {
  186.                                                 min = (m / 5) * 5 + "" ;
  187.                                          }
  188.                                          headers.put( "minStr", min);
  189.                                   } else {
  190.                                          headers.put( "dayStr", "errorLog" );
  191.                                   }
  192.                                   Event e = EventBuilder.withBody(stringBuffer.toString()
  193.                                                 .getBytes(), headers);
  194.                                    return e;
  195.                            }
  196.                      }
  197.               } catch (Exception e) {
  198.                       logger.error("LogFormat error!" , e);
  199.               }
  200.                return null ;
  201.        }
  202.         @Override
  203.         public List<Event> intercept(List<Event> events) {
  204.               List<Event> list = new LinkedList<Event>();
  205.                for (Event event : events) {
  206.                      Event e = intercept(event);
  207.                       if (e != null ) {
  208.                            list.add(e);
  209.                      }
  210.               }
  211.                return list;
  212.        }
  213.         /**
  214.         * Builder which builds new instances of the HostInterceptor.
  215.         */
  216.         public static class Builder implements Interceptor.Builder {
  217.                private String confPath ;
  218.                private boolean dynamicProp ;
  219.                private String hostname ;
  220.                private long propMonitorInterval ;
  221.                @Override
  222.                public Interceptor build() {
  223.                       return new LogFormatInterceptor(confPath, dynamicProp, hostname,
  224.                                    propMonitorInterval);
  225.               }
  226.                @Override
  227.                public void configure(Context context) {
  228.                       confPath = context.getString( CONF_PATH);
  229.                       dynamicProp = context.getBoolean(DYNAMICPROP, DYNAMICPROP_DFLT);
  230.                       hostname = context.getString( HOSTNAME, HOSTNAME_DFLT );
  231.                       propMonitorInterval = context.getLong(PROPMONITORINTERVAL,
  232.                                    PROPMONITORINTERVAL_DFLT);
  233.               }
  234.        }
  235.         public static class Constants {
  236.                public static String CONF_PATH = "confpath";
  237.                public static String DYNAMICPROP = "dynamicprop";
  238.                public static boolean DYNAMICPROP_DFLT = false;
  239.                public static String HOSTNAME = "hostname";
  240.                public static String HOSTNAME_DFLT = "hostname";
  241.                public static String PROPMONITORINTERVAL = "prop.monitor.rollInterval" ;
  242.                public static long PROPMONITORINTERVAL_DFLT = 500000l;
  243.        }
  244. }
复制代码




Flume-ng生产环境实践(一)Flume-ng生产环境编译
Flume-ng生产环境实践(二)flume-ng 测试过程中event丢失部分body数据
Flume-ng生产环境实践(三)实现文件sink,按照固定格式目录输出
Flume-ng生产环境实践(四)实现log格式化interceptor

欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(3)人评论

跳转到指定楼层
stark_summer 发表于 2015-1-12 13:56:28
回复

使用道具 举报

355815741 发表于 2015-1-12 22:26:29
学习了,谢谢分享~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条