分享

【Flume】flume中transactionCapacity和batchSize概念的具体分析和解惑

问题导读

1.transactionCapacity的作用啊?
2.batchSize又是干啥的啊?
3.什么情况下,事务会提交,事务提交做了什么呢?





不知道各位用过flume的读者对这两个概念是否熟悉了解
一开始本人的确有点迷惑,觉得这是不是重复了啊?
没感觉到transactionCapacity的作用啊?
batchSize又是干啥的啊?
……
……
带着这些问题,我们深入源码来看一下:
batchSize
batchSize这个概念首先它出现在哪里呢?
kafkaSink的process方法

1.png

HDFS Sink

2.png

Exec Source
3.png


通过上面这三张图,相信大家应该知道batchSize从哪来的了
batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。
即一次性你可以处理batchSize个event,这个一次性就是指在一个事务中。
当你处理的event数量超出了batchSize,那么事务就会提交了。

注意,这里有一个隐晦的地方,就是batchSize一定不能大于transactionCapacity

下面再来说说transactionCapacity

4.png

首先,从这个图中我们就可以看出transactionCapacity这个概念的来源了,它来自于通道中,不同于batchSize(Source,Sink)
那么,在通道中是如何使用该事务容量的呢??
内存通道中有个内部类MemoryTransaction



[mw_shl_code=java,true] private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque<Event>(transCapacity);
      takeList = new LinkedBlockingDeque<Event>(transCapacity);

      channelCounter = counter;
    }[/mw_shl_code]

这里就用到了事务容量,它就是putList和takeList的容量大小


putList就是用来存放put操作带来的event          channel的put



[mw_shl_code=java,true] if (!putList.offer(event)) {
        throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }[/mw_shl_code]

每一次put前,都会预判put是否成功,从异常的提示信息就可以看出来,put不成功即事务容量满了


takeList存放的event是用来被take操作消耗的,返回拿到的一个event            channel的take

[mw_shl_code=java,true] if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
      }[/mw_shl_code]

take前也会预判,如果takeList已经满了,说明take操作太慢了,出现了event堆积的现象,这时候你应该调整事务容量


什么情况下,事务会提交呢,事务提交做了什么呢??

commit即事务提交
两种情况:
1、put的event提交


[mw_shl_code=java,true]while(!putList.isEmpty()) {
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }[/mw_shl_code]


event全部放到queue中,queue才是真正的flume中event的队列,它的容量是capacity,看上一张图即可。

2、take的event提交

5.png

因为在take操作的时候就已经将event从queue中取出了,而queue中取出的event正是靠put的提交来的


最后,再看看事务是如何回滚的??

6.png

事务回滚针对take操作,你把event拿出去,结果处理失败了,那当然得丢回来,等待下一次处理了!!
因为进入了rollback操作,说明commit操作出现异常,也就是commit操作失败了,那putList和takeList两个队列当然也没有被清空


[mw_shl_code=java,true] while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }[/mw_shl_code]
循环将event重新添加到queue中。

已有(3)人评论

跳转到指定楼层
cdb521007 发表于 2015-9-21 09:56:03
学习了,感谢版主帮我们分析的这些内容。辛苦~~
回复

使用道具 举报

daozhu 发表于 2016-7-5 00:13:36
好文章,正好用到,多谢分享
回复

使用道具 举报

墨小黑 发表于 2017-5-6 09:28:23
有个问题哈,batchSize是在source和sink里面配置,source中的batchSize不能大于transactionCapacity能理解。但是sink的batchSize如果也不能大于transactionCapacity,那是不是会导致channel中的event堆积??这样会不会导致flume挂掉??
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条