分享

Flume-NG(1.5版本)中SpillableMemoryChannel源码级分析

问题导读
1、什么是SpillableMemoryChannel?
2、channel类按照国际惯例必须实现哪4个方法?
3、takeCommit()方法如何进行选择?




SpillableMemoryChannel是1.5版本新增的一个channel。这个channel优先将evnet放在内存中,一旦内存达到设定的容量就使用file channel写入磁盘。然后读的时候会按照顺序读取:会通过一个DrainOrderQueue来保证不管是内存中的还是溢出(本文的“溢出”指的是内存channel已满,需要使用file channel存储数据)文件中的顺序。这个Channel是memory channel和file channel的一个折中,虽然在内存中的数据仍然可能因为进程的突然中断而丢失,但是相对于memory channel而言一旦sink处理速度跟不上不至于丢失数据(后者一旦满了爆发异常会丢失后续的数据),提高了数据的可靠性;相对于file channel而言自然是大大提高了速度,但是可靠性较file channel有所降低。

  我们来看一下SpillableMemoryChannel的继承结构:SpillableMemoryChannel extends FileChannel,原来SpillableMemoryChannel是file的子类,天热具有file channel的特性。但是它的BasicTransactionSemantics是自己实现的。接下来我们来分析分析这个channel,这个channel可以看成是两个channel。相关内容传送门:Flume-NG源码阅读之FileChannel 和 flume-ng源码阅读memory-channel(原创) 。

  一、首先来看configure(Context context)方法,这个方法是对这个channel进行配置。一些主要参数介绍:
  (1)Semaphore totalStored,这两个channel【内存channel(并不是flume内置的memory channel,这里是新实现的一个,本文中的“内存channel”若无说明就是新实现的这个)和溢出而使用的file channel】中event数量的总和的信号量,初始为0;
  (2)ArrayDeque<Event> memQueue,这就是这里的内存channel,使用可以改变大小的数组双端队列ArrayDeque,存储event数据;
  (3)int memoryCapacity(对应参数名"memoryCapacity"),内存channel中存储的event的最大数量;
  (4)Semaphore memQueRemaining,内存channel剩余的可存储event的数量的信号量,初始大小为memoryCapacity;
  (5)int overflowTimeout(对应参数名"overflowTimeout"),溢出超时时间,指的是内存channel满了之后,切换到file channel的等待时间,默认是3s;
  (6)double overflowDeactivationThreshold(对应参数名"overflowDeactivationThreshold"),指的是停止溢出的阈值------内存channel剩余内存(这里指可再存储的event数量),默认5%;
  (7)volatile int byteCapacityBufferPercentage(对应参数名"byteCapacityBufferPercentage"),用来限制内存channel使用物理内存量,默认20;
  (8)volatile double avgEventSize()(对应参数名"avgEventSize"),指定每个event的大小,用来计算内存channel可以使用的slot总数量,会把event量化为slot,而不是字节,默认500;
  (9)volatile int byteCapacity(对应参数名"byteCapacity"),slot数量,默认是JVM可使用的最大物理内存(可通过配置"byteCapacity"参数来控制物理内存使用)的80%* (1 - byteCapacityBufferPercentage * .01 )) / avgEventSize得来;
  (10)Semaphore bytesRemaining,内存channel中剩余可使用的slot数量信号量,初始大小是byteCapacity;
  (11)volatile int lastByteCapacity,动态加载配置文件时才会有用,记录上一次的ByteCapacity,用于修改bytesRemaining信号量的大小;
  (12)int overflowCapacity(对应参数名"overflowCapacity"),用于设置file channel的容量,默认是1亿;
  此外,boolean overflowDisabled用来是否禁用溢出,只要overflowCapacity不小于1就不会禁用;boolean overflowActivated表示是否可以使用溢出,默认是false;还会对对file channel的"keep-alive"设置为0;最后会通过super.configure(context)来对file channel进行配置。对于file channel的配置信息可以和SpillableMemoryChannel的配置信息在一起配置。

  二、start()方法,首先会super.start()启动file channel,获取file中溢出的数据量overFlowCount,重置totalStored和DrainOrderQueue对象drainOrder,内存channel的start是不会有数据的。

  三、需要讲一下DrainOrderQueue drainOrder = new DrainOrderQueue()。我们知道SpillableMemoryChannel其实是由两个channel组成,分别是内存channel和file channel,因此数据也会分布在内存和磁盘文件之中,那我们take时,是什么机制呢?换句话说就是什么时候读内存中的数据,什么时候读磁盘上文件的数据?take的顺序怎么样呢?我们希望take的顺序和put的顺序一样,先put的应该先take,所以我们应该给所有的put(包括内存和文件)进行“编号”使得可以有序的take,还要注意的就是需要标示这个take是应该从内存还是file中去读。为此设计了DrainOrderQueue类,来使得有序的put和take。
  这个类设计的狠精巧,是保证take和put正常合理操作的关键。在讲之前先大概说一下原理:这个类的关键属性是ArrayDeque<MutableInteger> queue,这也是一个ArrayDeque,ArrayDeque特性是数组可变且大小不受限制,可在头尾操作,此类很可能在用作堆栈时快于 Stack,在用作队列时快于 LinkedList,但是不是线程安全的不支持多线程并发操作;put操作总是对queue中的最后(尾)一个元素操作,take操作总是对queue中第一个(头)操作;put时,如果是内存channel,在queue增加的就是正数,如果是溢出操作增加的就是负数,内存和溢出分别对应queue中不同的元素(可以分类去读);take时,如果从内存中取数据,就会使得queue第一个元素的值不断缩小(正数)至0,然后删除这个元素,如果是从溢出文件中取数据则会使得queue中第一个元素不断增大(负数)至0,然后删除这个元素;这样就会形成流,使得put不断追加数据到流中,take不断从流中取数据,这个流就是有序的,且流中元素其实就是内存中的evnet个数和溢出文件中event的个数。
  好了,DrainOrderQueue详细代码如下:
  1.    public static class DrainOrderQueue {
  2.      public ArrayDeque<MutableInteger> queue = new ArrayDeque<MutableInteger>(1000);
  3.     public int totalPuts = 0;  // for debugging only
  4.      private long overflowCounter = 0; // # of items in overflow channel
  5.      public  String dump() {
  6.       StringBuilder sb = new StringBuilder();
  7.        sb.append("  [ ");
  8.       for (MutableInteger i : queue) {
  9.          sb.append(i.intValue());
  10.         sb.append(" ");
  11.       }
  12.       sb.append("]");
  13.       return  sb.toString();
  14.      }
  15.      public void putPrimary(Integer eventCount) {
  16.        totalPuts += eventCount;
  17.        if (  (queue.peekLast() == null) || queue.getLast().intValue() < 0) {    //获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null
  18.         queue.addLast(new MutableInteger(eventCount));
  19.       } else {
  20.         queue.getLast().add(eventCount);//获取,但不移除此双端队列的第一个元素。
  21.        }
  22.      public void putFirstPrimary(Integer eventCount) {
  23.        if ( (queue.peekFirst() == null) || queue.getFirst().intValue() < 0) {    //获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。
  24.         queue.addFirst(new MutableInteger(eventCount));
  25.        } else {
  26.          queue.getFirst().add(eventCount);//获取,但不移除此双端队列的第一个元素。
  27.        }
  28.     }
  29.      public void putOverflow(Integer eventCount) {
  30.       totalPuts += eventCount;
  31.        if ( (queue.peekLast() == null) ||  queue.getLast().intValue() > 0) {
  32.          queue.addLast(new MutableInteger(-eventCount));
  33.       } else {
  34.          queue.getLast().add(-eventCount);
  35.        }
  36.        overflowCounter += eventCount;
  37.      }
  38.      public void putFirstOverflow(Integer eventCount) {
  39.        if ( (queue.peekFirst() == null) ||  queue.getFirst().intValue() > 0) {
  40.         queue.addFirst(new MutableInteger(-eventCount));
  41.        }  else {
  42.          queue.getFirst().add(-eventCount);
  43.        }
  44.        overflowCounter += eventCount;
  45.      }
  46.      public int front() {
  47.       return queue.getFirst().intValue();
  48.     }
  49.     public boolean isEmpty() {
  50.       return queue.isEmpty();
  51.    }
  52.     public void takePrimary(int takeCount) {
  53.       MutableInteger headValue = queue.getFirst();
  54.        // this condition is optimization to avoid redundant conversions of
  55.        // int -> Integer -> string in hot path
  56.       if (headValue.intValue() < takeCount)  {
  57.          throw new IllegalStateException("Cannot take " + takeCount +
  58.                 " from " + headValue.intValue() + " in DrainOrder Queue");
  59.        }
  60.       headValue.add(-takeCount);
  61.        if (headValue.intValue() == 0) {
  62.       queue.removeFirst();
  63.        }
  64.     }
  65.     public void takeOverflow(int takeCount) {
  66.       MutableInteger headValue = queue.getFirst();
  67.       if(headValue.intValue() > -takeCount) {
  68.        throw new IllegalStateException("Cannot take " + takeCount + " from "
  69.                 + headValue.intValue() + " in DrainOrder Queue head " );
  70.        }
  71.      headValue.add(takeCount);
  72.       if (headValue.intValue() == 0) {
  73.          queue.removeFirst();    //获取并移除此双端队列第一个元素。
  74.       }
  75.        overflowCounter -= takeCount;
  76.     }
  77.   }
复制代码


  我们一个方法一个方法的来剖析这个类:
  (1)dump(),这个方法比较简单就是获得queue中所有元素的数据量;

  (2)putPrimary(Integer eventCount),这个方法用在put操作的commit时,在commitPutsToPrimary()方法中被调用,表示向内存提交数据。这个方法会尝试获取queue中最后一个元素,如果为空(说明没数据)或者元素数值小于0(说明这个元素是面向溢出文件的),就新建一个元素赋值这个事务的event数量加入queue;否则表示当前是的元素表征的是内存中的event数量,直接累加即可。

  (3)putFirstPrimary(Integer eventCount),在doRollback()回滚的时候被调用,表示将takeList中的数据放回内存memQueue的头。这个方法会尝试获取queue中第一个元素,如果为空(说明没数据)或者元素数值小于0(说明这个元素是面向溢出文件的),就新建一个元素赋值takeList的event数量加入queue;否则表示当前是的元素表征的是内存中的event数量,直接累加即可。

  (4)putOverflow(Integer eventCount),这个方法发生在put操作的commit时,在commitPutsToOverflow_core方法和start()方法中,后者是设置初始量,前者表示内存channel已满要溢出到file channel。这个方法会尝试获取queue中最后一个元素,如果为空(说明没数据)或者元素数值大于0(表示这个元素是面向内存的),就新建一个元素赋值这个事务的event数量加入queue,这里赋值为负数;否则表示当前是的元素表征的是溢出文件中的event数量,直接累加负数即可。

  (5)putFirstOverflow(Integer eventCount),在doRollback()回滚的时候被调用,表示将takeList中event的数量放回溢出文件。这个方法会尝试获取queue中第一个元素,如果为空(说明没数据)或者元素数值大于0(表示这个元素是面向内存的),就新建一个元素赋值这个事务的 event数量加入queue,这里赋值为负数;否则表示当前是的元素表征的是溢出到文件中的event数量,直接累加负数即可。

  (6)front(),返回queue中第一个元素的值

  (7)takePrimary(int takeCount),这个方法在doTake()中被调用,表示take发生之后,要将内存中的event数量减takeCount(这个值一般都是1,即每次取一个)。这个方法会获取第一个元素的值(表示内存channel中有多少event),如果这个值比takeCount小,说明内存中没有足够的数量,这种情况不应该发生,报错;否则将这个元素的值减去takeCount,表示已取出takeCount个。最后如果这个元素的值为0,则从queue中删除这个元素。注意这里虽然是可以取takeCount个,但是源码调用这个参数都是一次取1个而已。

  (8)takeOverflow(int takeCount),这个方法在doTake()中被调用,表示take发生之后,要将溢出文件中的event数量加上takeCount(这个值一般都是1,即每次取一个)。这个 方法会获取第一个元素的值(表示溢出文件中有多少event),如果这个值比takeCount的负值大,说明文件中没有足够的数量,这种情况不应该发生,报错;否则将这个元素的值加上takeCount,表示已取出takeCount个。最后如果这个元素的值为0,则从queue中删除这个元素。注意这里虽然是可以取 takeCount个,但是源码调用这个参数都是一次取1个而已。

  四、这个channel的BasicTransactionSemantics:SpillableMemoryTransaction,这是每个channel的必须实现的可靠性保证。这个类也有一些属性:
  (1)BasicTransactionSemantics overflowTakeTx = null,这个是file channel的事务FileBackedTransaction,表示take操作从溢出文件中获取event;
  (2)BasicTransactionSemantics overflowPutTx = null,这个是file channel的事务FileBackedTransaction,表示put操作溢出到磁盘文件;
  (3)boolean useOverflow = false,是否使用溢出;
  (4)boolean putCalled = false,put操作,初次put的时候会置为true;
  (5)boolean takeCalled = false,take操作,初次take的时候会置为true;
  (6)int largestTakeTxSize = 5000,不是常量,可以再分配;
  (7)int largestPutTxSize = 5000,不是常量,可以再分配;
  (8)Integer overflowPutCount = 0,这次事务溢出的event的数量;
  (9)int putListByteCount = 0,这次事务putList所有event占用字节总和;
  (10)int takeListByteCount = 0,这次事务takeList所有event占用字节总和;
  (11)int takeCount = 0,这次事务take操作的个数;
  (12)ArrayDeque<Event> takeList,从memQueue拿出来的event暂存之所;
  (13)ArrayDeque<Event> putList,放入memQueue之前event的暂存之所;
  按照国际惯例必须实现的4个方法:
  A、doPut(Event event),代码如下:
  1. protected void doPut(Event event) throws InterruptedException {
  2.        channelCounter.incrementEventPutAttemptCount();
  3.       putCalled = true;    //说明是在put操作
  4.        int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);//获取这个event可以占用几个slot
  5.        if (!putList.offer(event)) {    //加入putList
  6.         throw new ChannelFullException("Put queue in " + getName() +
  7.                  " channel's Transaction having capacity " + putList.size() +
  8.                 " full, consider reducing batch size of sources");
  9.        }
  10.       putListByteCount += eventByteSize;
  11.     }
复制代码


  这个方法比较简单,就是put开始;设置putCalled为true表示put操作;计算占用slot个数;将event放入putList等待commit操作;putListByteCount加上这个evnet占用的slot数。

  B、doTake(),代码如下:
  1. protected Event doTake() throws InterruptedException {
  2.        channelCounter.incrementEventTakeAttemptCount();
  3.        if (!totalStored.tryAcquire(overflowTimeout, TimeUnit.SECONDS)) {
  4.          LOGGER.debug("Take is backing off as channel is empty.");
  5.          return null;
  6.        }
  7.       boolean takeSuceeded = false;
  8.        try {
  9.          Event event;
  10.          synchronized(queueLock) {
  11.            int drainOrderTop = drainOrder.front();
  12.            if (!takeCalled) {
  13.             takeCalled = true;
  14.              if (drainOrderTop < 0) {
  15.                useOverflow = true;
  16.                overflowTakeTx = getOverflowTx();        //获取file channle的事务
  17.              overflowTakeTx.begin();
  18.             }
  19.            }
  20.           if (useOverflow) {
  21.              if (drainOrderTop > 0) {
  22.                LOGGER.debug("Take is switching to primary");
  23.               return null;       // takes should now occur from primary channel
  24.              }
  25.              event = overflowTakeTx.take();
  26.             ++takeCount;
  27.              drainOrder.takeOverflow(1);
  28.            } else {
  29.              if (drainOrderTop < 0) {
  30.                LOGGER.debug("Take is switching to overflow");
  31.               return null;      // takes should now occur from overflow channel
  32.              }
  33.              event = memQueue.poll();    //获取并移除此双端队列所表示的队列的头(换句话说,此双端队列的第一个元素);如果此双端队列为空,则返回 null。
  34.             ++takeCount;
  35.             drainOrder.takePrimary(1);
  36.             Preconditions.checkNotNull(event, "Queue.poll returned NULL despite"
  37.                      + " semaphore signalling existence of entry");
  38.           }
  39.        }
  40.         int eventByteSize = (int)Math.ceil(estimateEventSize(event)/ avgEventSize);
  41.          if (!useOverflow) {
  42.           // takeList is thd pvt, so no need to do this in synchronized block
  43.           takeList.offer(event);
  44.          }
  45.          takeListByteCount += eventByteSize;
  46.          takeSuceeded = true;
  47.          return event;
  48.        } finally {
  49.         if(!takeSuceeded) {
  50.           totalStored.release();
  51.        }
  52.       }
  53.      }
复制代码


  由于ArrayDeque是非线程安全的(memQueue就是ArrayDeque),所以take操作从memQueue获取数据时,要独占memQueue。任何对memQueue都要进行同步,这里是同步queueLock。

  doTake方法会先检查totalStored中有无许可,即channel中有无数据;然后同步;再获取drainOrder的头元素,如果takeCalled为false(初始为false),则设置其为true,再判断获取到的drainOrder头元素的值是否为负数,负数说明数据在溢出文件中,设置useOverflow为true表示要从溢出文件中读取数据并且获取file channel的FileBackedTransaction赋值给overflowTakeTx,begin()可以获取数据。如果useOverflow为true则转到调用overflowTakeTx.take获取event,然后takeCount自增1,调用drainOrder.takeOverflow(1)修改队列中溢出event数量的值。如果useOverflow为false说明数据在内存中,直接调用memQueue.poll()获得event,然后takeCount自增1,调用drainOrder.takePrimary(1)修改队列中内存中evnet数量的值。然后计算这个event占用的slot数。如果是从内存channel中读取的event则将其放入takeList中;takeListByteCount加上这个evnet占用的slot数。最后返回event。

  C、doCommit()方法,如果putCalled为true就会调用putCommit()方法来处理put的操作,如果takeCalled为true就调用takeCommit()方法来处理take操作。

  1、putCommit()方法,会首先依据overflowActivated的真假来设置超时时间。内存channel的溢出情况由两个信号量控制memQueRemaining和bytesRemaining,前者控制着event的数量,后者控制着物理内存的使用情况,如果这两者中的任何一个不满足都会触发溢出,溢出会设置overflowActivated = true;useOverflow = true,如果useOverflow为true,就调用commitPutsToOverflow()方法来处理溢出,这个方法会创建一个file channel的FileBackedTransaction赋值给overflowPutTx,begin可以put数据,然后依次将putList中的event通过overflowPutTx.put(event)放入file channel中,调用commitPutsToOverflow_core方法来处理overflowPutTx提交事务,再调用drainOrder.putOverflow(putList.size())修改queue中溢出文件中event的数量,如果在overflowPutTx提交过程中失败,最多再尝试一次,中间等待overflowTimeout秒。返回到commitPutsToOverflow方法,将totalStored释放putList.size的许可,溢出数量overflowPutCount增加putList.size。到这溢出的情况完成。如果putCommit()中useOverflow为false则说明event在内存channel中,会调用commitPutsToPrimary()来处理,这个方法会将putList中的所有event放入memQueue中,然后调用drainOrder.putPrimary(putList.size())修改queue中内存中event的数量,修改maxMemQueueSize的值,将totalStored释放putList.size的许可。

  2、takeCommit()方法,如果overflowTakeTx不为null,说明是从溢出文件取得的event,就调用commit方法提交事务。然后获得内存channel剩余空间的百分比,包括两部分之和,一部分是内存channel还可以再存储evnet的数量,另一部分就是takeCount,他们俩之和与memoryCapacity(不能为0)之比就是百分比memoryPercentFree。如果overflowActivated为true且memoryPercentFree不小于overflowDeactivationThreshold,说明内存中剩余空间已经达到了停止溢出的阈值,就设置overflowActivated为false停止溢出,这样其实会导致内存满了之后等待溢出的时间加长。如果take操作是从内存channel中取数据,memQueRemaining会释放takeCount个许可,表示腾出takeCount个空间;bytesRemaining会释放takeListByteCount个许可,表示腾出takeListByteCount个slot。
  D、doRollback(),代码如下:
  1. protected void doRollback() {
  2.       LOGGER.debug("Rollback() of " +
  3.                (takeCalled ? " Take Tx" : (putCalled ? " Put Tx" : "Empty Tx")));
  4.       if (putCalled) {
  5.         if (overflowPutTx!=null) {
  6.            overflowPutTx.rollback();
  7.         }
  8.          if (!useOverflow) {
  9.           bytesRemaining.release(putListByteCount);
  10.           putList.clear();
  11.          }
  12.          putListByteCount = 0;
  13.        } else if (takeCalled) {
  14.          synchronized(queueLock) {
  15.            if (overflowTakeTx!=null) {
  16.              overflowTakeTx.rollback();
  17.            }
  18.            if (useOverflow) {
  19.              drainOrder.putFirstOverflow(takeCount);
  20.           } else {
  21.              int remainingCapacity = memoryCapacity - memQueue.size();
  22.              Preconditions.checkState(remainingCapacity >= takeCount,
  23.                      "Not enough space in memory queue to rollback takes. This" +
  24.                             " should never happen, please report");
  25.              while (!takeList.isEmpty()) {
  26.                memQueue.addFirst(takeList.removeLast());
  27.             }
  28.              drainOrder.putFirstPrimary(takeCount);
  29.           }
  30.          }
  31.         totalStored.release(takeCount);
  32.        } else {
  33.         overflowTakeTx.rollback();
  34.       }
  35.       channelCounter.setChannelSize(memQueue.size() + drainOrder.overflowCounter);
  36.      }
复制代码


  如果putCalled为true,则表明正在进行的是put操作。如果overflowPutTx不为null,说明是在溢出,执行overflowPutTx的roolback方法进行回滚。如果没有溢出,则bytesRemaining释放putListByteCount许可,表示腾出putListByteCount个slot;清空putList;最后将putListByteCount置为0。如果takeCalled为true,说明正在进行的操作是take,如果overflowTakeTx不为null,说明是在溢出,执行overflowTakeTx的roolback方法进行回滚;如果在溢出,则调用drainOrder.putFirstOverflow(takeCount)修改queue中溢出文件中的event的数量;如果在使用内存channel,则计算出内存channel中还可以最多存储event的数量,如果这个数量小于takeCount,则报错,否则将takeList中的所有event加入memQueue的头部,执行drainOrder.putFirstPrimary(takeCount)来修改queue中内存channel存放的event的数量;然后totalStored释放takeCount个许可,表示内存channel中增加了takeCount个event。

  五、stop方法,会调用父类file channel中的stop方法。

  六、createTransaction()方法,直接返回一个SpillableMemoryTransaction对象。这说明take和put可以并发执行,但是当涉及到memQueue时,还是需要同步。

  至此,这个新的channel介绍完了。总体来说SpillableMemoryChannel是精心设计的一个channel,兼顾Flume内置的file channel和memory channel的优点,又增加了一个选择,大伙可根据需要选择合适的channel。





本文转载自:http://www.cnblogs.com/lxf20061900/p/3813492.html

已有(2)人评论

跳转到指定楼层
aishangzailu 发表于 2014-8-18 20:31:16
好资料,学习了
回复

使用道具 举报

ohano_javaee 发表于 2014-10-10 17:33:00

不过官方文档说不建议用在生产环境,不知道后续版本会不会成为稳定版

本帖最后由 ohano_javaee 于 2014-10-10 17:34 编辑

挺好正在想用这个channel
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条