立即注册 登录
About云-梭伦科技 返回首页

s060403072的个人空间 https://www.aboutyun.com/?57 [收藏] [复制] [分享] [RSS]

日志

flume传递速率限制【二次开发】

已有 2142 次阅读2015-9-21 20:23 | 开发






API 地址


 按理说,应该在sink端限制数据的发送速度,但flume-ng提供了非常便利的interceptor模式,因此本文,就只是在source端简单的实现了对数据发送速度的限制。

package com.xxx.flume.core.interceptor;  
  
import java.util.List;  
import org.slf4j.Logger;  
import org.apache.flume.Event;  
import org.slf4j.LoggerFactory;  
import org.apache.flume.Context;  
import org.apache.flume.interceptor.Interceptor;  
  
  
public class LimitInterceptor implements Interceptor {  
    private static final Logger logger = LoggerFactory.getLogger(LimitInterceptor.class);  
  
    private static long KB = 1024L;  
  
    private long lastEventSentTick = System.nanoTime();  
  
    private long pastSentLength = 0L;  
    private long max;  
    private long timeCostPerCheck = 1000000000L;  
  
    private long headerSize = 0L;  
  
    private boolean flag = true;  
  
    private int num = 0;  
  
    public LimitInterceptor(long limitRate, long headerSize) {  
        this.max = (limitRate * KB);  
        this.headerSize = headerSize;  
    }  
  
    public void initialize() {  
    }  
  
    public Event intercept(Event event) {  
        this.num += 1;  
        if (this.pastSentLength > this.max) {  
            long nowTick = System.nanoTime();  
            long multiple = this.pastSentLength / this.max;  
            long missedTime = multiple * this.timeCostPerCheck - (nowTick - this.lastEventSentTick);  
            if (missedTime > 0L) {  
                try {  
                    System.out.printf("Limit source send rate, headerLength:%d,pastSentLength:%d,lastEventSentTick:%d,sleepTime:%d, num:%d\n",  
                        headerSize, pastSentLength, lastEventSentTick, missedTime / 1000000, num);  
                    Thread.sleep(missedTime / 1000000L, (int) (missedTime % 1000000L));  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            this.num = 0;  
            this.pastSentLength = 0L;  
            this.lastEventSentTick = (nowTick + (missedTime > 0L ? missedTime : 0L));  
        }  
        this.pastSentLength += this.headerSize + event.getBody().length;  
  
        return event;  
    }  
  
    public List<Event> intercept(List<Event> events) {  
        for (Event event : events) {  
            intercept(event);  
        }  
        return events;  
    }  
  
    public void close() {  
    }  
  
    public static class Builder implements Interceptor.Builder {  
        private long limitRate;  
        private long headerSize;  
  
        public Interceptor build() {  
            return new LimitInterceptor(this.limitRate, this.headerSize);  
        }  
  
        public void configure(Context context) {  
            this.limitRate = context.getLong(Constants.LIMIT_RATE, Long.valueOf(Constants.DEFAULT_RATE)).longValue();  
            this.headerSize = context.getLong(Constants.HEADER_SIZE, Long.valueOf(Constants.DEFAULT_SIZE)).longValue();  
        }  
  
        public static class Constants {  
            public static long DEFAULT_RATE = 500L;  
            public static long DEFAULT_SIZE = 16L;  
            public static String LIMIT_RATE = "limitRate";  
            public static String HEADER_SIZE = "headerSize";  
        }  
    }  
}  

打包后扔到flume-ng的lib目录下,然后在flume-conf配置文件,增加流速拦截器:


agent.sources.seqGenSrc.interceptors = limitrate  
agent.sources.seqGenSrc.interceptors.limitrate.type = com.xxx.flume.core.interceptor.LimitInterceptor$Builder  
agent.sources.seqGenSrc.interceptors.limitrate.limitRate = 500  
agent.sources.seqGenSrc.interceptors.limitrate.headerSize = 8  



这里限制发送速度500kb/s(可以为不同的source设置不同的发送速度),设置每个event头大小约为8字节。
发送端输出信息:

Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649436601991827,sleepTime:929, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649437601991827,sleepTime:929, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649438601991827,sleepTime:929, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649439601991827,sleepTime:927, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649440601991827,sleepTime:928, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649441601991827,sleepTime:928, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649442601991827,sleepTime:928, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649443601991827,sleepTime:925, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649444601991827,sleepTime:929, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512025,lastEventSentTick:79649445601991827,sleepTime:889, num:3421  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649446601991827,sleepTime:924, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649447601991827,sleepTime:927, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649448601991827,sleepTime:928, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649449601991827,sleepTime:925, num:3437  
Limit source send rate, headerLength:8,pastSentLength:512113,lastEventSentTick:79649450601991827,sleepTime:928, num:3437  












路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条