分享

Flume-ng生产环境实践(三)实现文件sink,按照固定格式目录输出

本帖最后由 坎蒂丝_Swan 于 2015-1-12 14:20 编辑
问题导读
1.文件sink使用时,如何配置?
2.文件sink又有哪些依赖包?










应用场景:需要实时收集多台服务器的nginx日志到一台机器。收集完成结果存放需要按天生成文件夹,按每5分钟生成文件,比如2012年12月29日12点26分的日志,需要放到/data/log/20121229/log-1225-对应的文件中。自己实现了类似flume-og和flume-ng的hdfs-sink的文件sink。

使用的时候配置如下:
  1. agent.sources = source
  2. agent.channels = channel
  3. agent.sinks = sink
  4. agent.sources.source.type = avro
  5. agent.sources.source.bind = 192.168.0.100
  6. agent.sources.source.port = 44444
  7. agent.sources.source.channels = channel
  8. agent.sinks.sink.type = org.apache.flume.sink.FileSink
  9. agent.sinks.sink.file.path = /data/log/%{dayStr}
  10. agent.sinks.sink.file.filePrefix = log-%{hourStr}%{minStr}-
  11. agent.sinks.sink.file.txnEventMax = 10000
  12. agent.sinks.sink.file.maxOpenFiles = 5
  13. agent.sinks.sink.channel = channel
  14. agent.channels.channel.type = memory
  15. agent.channels.channel.capacity = 100000
  16. agent.channels.channel.transactionCapacity = 100000
  17. agent.channels.channel.keep-alive = 60
复制代码

依赖的jar如下:
jakarta-oro-2.0.1.jar
flume-ng-core-1.3.0-SNAPSHOT.jar
flume-ng-sdk-1.3.0-SNAPSHOT.jar
flume-ng-configuration-1.3.0-SNAPSHOT.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
guava-10.0.1.jar

代码如下:
FileSink.java
  1. package org.apache.flume.sink;
  2. import java.io.IOException;
  3. import java.util.Calendar;
  4. import java.util.List;
  5. import java.util.concurrent.Executors;
  6. import java.util.concurrent.ScheduledExecutorService;
  7. import org.apache.flume.Channel;
  8. import org.apache.flume.Context;
  9. import org.apache.flume.Event;
  10. import org.apache.flume.EventDeliveryException;
  11. import org.apache.flume.Transaction;
  12. import org.apache.flume.conf.Configurable;
  13. import org.apache.flume.formatter.output.BucketPath;
  14. import org.apache.flume.instrumentation.SinkCounter;
  15. import org.apache.flume.serialization.EventSerializer;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import com.google.common.base.Preconditions;
  19. import com.google.common.collect.Lists;
  20. import com.google.common.util.concurrent.ThreadFactoryBuilder;
  21. public class FileSink extends AbstractSink implements Configurable {
  22.         private static final Logger logger = LoggerFactory
  23.                      . getLogger(FileSink .class );
  24.         private String path ;
  25.         private static final String defaultFileName = "FlumeData";
  26.         private static final int defaultMaxOpenFiles = 50;
  27.         /**
  28.         * Default length of time we wait for blocking BucketWriter calls before
  29.         * timing out the operation. Intended to prevent server hangs.
  30.         */
  31.         private long txnEventMax ;
  32.         private FileWriterLinkedHashMap sfWriters ;
  33.         private String serializerType ;
  34.         private Context serializerContext ;
  35.         private boolean needRounding = false;
  36.         private int roundUnit = Calendar.SECOND;
  37.         private int roundValue = 1;
  38.         private SinkCounter sinkCounter ;
  39.         private int maxOpenFiles ;
  40.         private ScheduledExecutorService timedRollerPool ;
  41.         private long rollInterval ;
  42.         @Override
  43.         public void configure(Context context) {
  44.               String directory = Preconditions. checkNotNull(
  45.                            context.getString( "file.path"), "file.path is required");
  46.               String fileName = context.getString( "file.filePrefix", defaultFileName);
  47.                this.path = directory + "/" + fileName;
  48.                maxOpenFiles = context.getInteger("file.maxOpenFiles" ,
  49.                             defaultMaxOpenFiles);
  50.                serializerType = context.getString("sink.serializer" , "TEXT" );
  51.                serializerContext = new Context(
  52.                            context.getSubProperties(EventSerializer. CTX_PREFIX));
  53.                txnEventMax = context.getLong("file.txnEventMax" , 1l);
  54.                if (sinkCounter == null) {
  55.                       sinkCounter = new SinkCounter(getName());
  56.               }
  57.                rollInterval = context.getLong("file.rollInterval" , 30l);
  58.               String rollerName = "hdfs-" + getName() + "-roll-timer-%d" ;
  59.                timedRollerPool = Executors.newScheduledThreadPool( maxOpenFiles,
  60.                             new ThreadFactoryBuilder().setNameFormat(rollerName).build());
  61.        }
  62.         @Override
  63.         public Status process() throws EventDeliveryException {
  64.               Channel channel = getChannel();
  65.               Transaction transaction = channel.getTransaction();
  66.               List<BucketFileWriter> writers = Lists. newArrayList();
  67.               transaction.begin();
  68.                try {
  69.                      Event event = null;
  70.                       int txnEventCount = 0;
  71.                       for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
  72.                            event = channel.take();
  73.                             if (event == null) {
  74.                                    break;
  75.                            }
  76.                             // reconstruct the path name by substituting place holders
  77.                            String realPath = BucketPath
  78.                                          . escapeString(path, event.getHeaders(), needRounding,
  79.                                                         roundUnit, roundValue );
  80.                            BucketFileWriter bucketFileWriter = sfWriters.get(realPath);
  81.                             // we haven't seen this file yet, so open it and cache the
  82.                             // handle
  83.                             if (bucketFileWriter == null) {
  84.                                   bucketFileWriter = new BucketFileWriter();
  85.                                   bucketFileWriter.open(realPath, serializerType,
  86.                                                  serializerContext, rollInterval , timedRollerPool,
  87.                                                  sfWriters);
  88.                                    sfWriters.put(realPath, bucketFileWriter);
  89.                            }
  90.                             // track the buckets getting written in this transaction
  91.                             if (!writers.contains(bucketFileWriter)) {
  92.                                   writers.add(bucketFileWriter);
  93.                            }
  94.                             // Write the data to File
  95.                            bucketFileWriter.append(event);
  96.                      }
  97.                       if (txnEventCount == 0) {
  98.                             sinkCounter.incrementBatchEmptyCount();
  99.                      } else if (txnEventCount == txnEventMax) {
  100.                             sinkCounter.incrementBatchCompleteCount();
  101.                      } else {
  102.                             sinkCounter.incrementBatchUnderflowCount();
  103.                      }
  104.                       // flush all pending buckets before committing the transaction
  105.                       for (BucketFileWriter bucketFileWriter : writers) {
  106.                             if (!bucketFileWriter.isBatchComplete()) {
  107.                                   flush(bucketFileWriter);
  108.                            }
  109.                      }
  110.                      transaction.commit();
  111.                       if (txnEventCount > 0) {
  112.                              sinkCounter.addToEventDrainSuccessCount(txnEventCount);
  113.                      }
  114.                       if (event == null) {
  115.                             return Status.BACKOFF ;
  116.                      }
  117.                       return Status.READY ;
  118.               } catch (IOException eIO) {
  119.                      transaction.rollback();
  120.                       logger.warn("File IO error" , eIO);
  121.                       return Status.BACKOFF ;
  122.               } catch (Throwable th) {
  123.                      transaction.rollback();
  124.                       logger.error("process failed" , th);
  125.                       if (th instanceof Error) {
  126.                             throw (Error) th;
  127.                      } else {
  128.                             throw new EventDeliveryException(th);
  129.                      }
  130.               } finally {
  131.                      transaction.close();
  132.               }
  133.        }
  134.         private void flush(BucketFileWriter bucketFileWriter) throws IOException {
  135.               bucketFileWriter.flush();
  136.        }
  137.         @Override
  138.         public synchronized void start() {
  139.                super.start();
  140.                this.sfWriters = new FileWriterLinkedHashMap(maxOpenFiles);
  141.                sinkCounter.start();
  142.        }
  143. }
复制代码

BucketFileWriter.java
  1. package org.apache.flume.sink;
  2. import java.io.BufferedOutputStream;
  3. import java.io.File;
  4. import java.io.FileOutputStream;
  5. import java.io.IOException;
  6. import java.io.OutputStream;
  7. import java.util.concurrent.Callable;
  8. import java.util.concurrent.ScheduledExecutorService;
  9. import java.util.concurrent.TimeUnit;
  10. import java.util.concurrent.atomic.AtomicLong;
  11. import org.apache.flume.Context;
  12. import org.apache.flume.Event;
  13. import org.apache.flume.serialization.EventSerializer;
  14. import org.apache.flume.serialization.EventSerializerFactory;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. public class BucketFileWriter {
  18.      private static final Logger logger = LoggerFactory
  19.                .getLogger(BucketFileWriter.class);
  20.      private static final String IN_USE_EXT = ".tmp";
  21.      /**
  22.      * This lock ensures that only one thread can open a file at a time.
  23.      */
  24.      private final AtomicLong fileExtensionCounter;
  25.      private OutputStream outputStream;
  26.      private EventSerializer serializer;
  27.      private String filePath;
  28.      /**
  29.      * Close the file handle and rename the temp file to the permanent filename.
  30.      * Safe to call multiple times. Logs HDFSWriter.close() exceptions.
  31.      *
  32.      * @throws IOException
  33.      *             On failure to rename if temp file exists.
  34.      */
  35.      public BucketFileWriter() {
  36.           fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
  37.      }
  38.      public void open(final String filePath, String serializerType,
  39.                Context serializerContext, final long rollInterval,
  40.                final ScheduledExecutorService timedRollerPool,
  41.                final FileWriterLinkedHashMap sfWriters) throws IOException {
  42.           this.filePath = filePath;
  43.           File file = new File(filePath + fileExtensionCounter + IN_USE_EXT);
  44.           file.getParentFile().mkdirs();
  45.           outputStream = new BufferedOutputStream(new FileOutputStream(file));
  46.           logger.info("filename = " + file.getAbsolutePath());
  47.           serializer = EventSerializerFactory.getInstance(serializerType,
  48.                     serializerContext, outputStream);
  49.           serializer.afterCreate();
  50.           if (rollInterval > 0) {
  51.                Callable<Void> action = new Callable<Void>() {
  52.                     @Override
  53.                     public Void call() throws Exception {
  54.                          logger.debug(
  55.                                    "Rolling file ({}): Roll scheduled after {} sec elapsed.",
  56.                                    filePath + fileExtensionCounter + IN_USE_EXT,
  57.                                    rollInterval);
  58.                          if (sfWriters.containsKey(filePath)) {
  59.                               sfWriters.remove(filePath);
  60.                          }
  61.                          close();
  62.                          return null;
  63.                     }
  64.                };
  65.                timedRollerPool.schedule(action, rollInterval, TimeUnit.SECONDS);
  66.           }
  67.      }
  68.      public void append(Event event) throws IOException {
  69.           serializer.write(event);
  70.      }
  71.      public boolean isBatchComplete() {
  72.           return true;
  73.      }
  74.      public void flush() throws IOException {
  75.           serializer.flush();
  76.           outputStream.flush();
  77.      }
  78.      /**
  79.      * Rename bucketPath file from .tmp to permanent location.
  80.      */
  81.      private void renameBucket() {
  82.           File srcPath = new File(filePath + fileExtensionCounter + IN_USE_EXT);
  83.           File dstPath = new File(filePath + fileExtensionCounter);
  84.           if (srcPath.exists()) {
  85.                srcPath.renameTo(dstPath);
  86.                logger.info("Renaming " + srcPath + " to " + dstPath);
  87.           }
  88.      }
  89.      public synchronized void close() throws IOException, InterruptedException {
  90.           if (outputStream != null) {
  91.                outputStream.flush();
  92.                outputStream.close();
  93.           }
  94.           renameBucket();
  95.      }
  96. }
复制代码

FileWriterLinkedHashMap.java
  1. package org.apache.flume.sink;
  2. import java.io.IOException;
  3. import java.util.LinkedHashMap;
  4. import java.util.Map.Entry;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. public class FileWriterLinkedHashMap extends
  8.           LinkedHashMap<String, BucketFileWriter> {
  9.      private static final Logger logger = LoggerFactory
  10.                .getLogger(FileWriterLinkedHashMap.class);
  11.      private static final long serialVersionUID = -7860596835613215998L;
  12.      private final int maxOpenFiles;
  13.      public FileWriterLinkedHashMap(int maxOpenFiles) {
  14.           super(16, 0.75f, true); // stock initial capacity/load, access
  15.           this.maxOpenFiles = maxOpenFiles;
  16.      }
  17.      @Override
  18.      protected boolean removeEldestEntry(Entry<String, BucketFileWriter> eldest) {
  19.           if (size() > maxOpenFiles) {
  20.                // If we have more that max open files, then close the last one
  21.                // and
  22.                // return true
  23.                try {
  24.                     eldest.getValue().close();
  25.                } catch (IOException e) {
  26.                     logger.warn(eldest.getKey().toString(), e);
  27.                } catch (InterruptedException e) {
  28.                     logger.warn(eldest.getKey().toString(), e);
  29.                     Thread.currentThread().interrupt();
  30.                }
  31.                return true;
  32.           } else {
  33.                return false;
  34.           }
  35.      }
  36. }
复制代码



欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(6)人评论

跳转到指定楼层
stark_summer 发表于 2015-1-12 13:54:09
回复

使用道具 举报

355815741 发表于 2015-1-12 22:24:39
学习了,谢谢分享~
回复

使用道具 举报

Joker 发表于 2015-5-6 18:01:07
你好,我想请问一下,你这个可以启动起来吗?
我启动配置文件时候,一直报错

org.apache.flume.FlumeException: Unable to load sink type: fileSink, class: fileSink
        at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
        at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: fileSink
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:190)
        at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
        ... 11 more

不知道为啥,是不是那些依赖包都要都到flume集群的 lib下?

望回复
回复

使用道具 举报

Joker 发表于 2015-5-6 20:36:26
本帖最后由 Joker 于 2015-5-6 20:39 编辑

好吧,怪我。
找到原文看了下,他的代码是高亮显示,这个类是自己写的!!!
我当时是直接先运行,没看代码!

回复

使用道具 举报

gudeyang 发表于 2015-5-11 13:21:40
flume是如何收集nginx的日志的,如果用tail -F access_log那不是产生IO了, 如和让nginx的access_log以数据流的形式传递给本地的flume呢
回复

使用道具 举报

cdb521007 发表于 2015-7-26 14:21:15
很实用,学学了。。。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条