分享

Flume客户端flume-ng-log4jappender 负载平衡(LoadBalancingLog4jAppender)

问题导读
1、负载均衡,日志方面需要哪些改进?
2、Flume如何实现负载均衡?


主机DNS配置:
  1. 192.168.177.167 machine-1  
  2. 192.168.177.168 machine-2  
  3. 192.168.177.158 machine-0  
  4. 192.168.177.174 hadoop-master hbase-master  
复制代码

     hadoop-maser 和machine-2当主机,其它机器当做collector机,存储在HDFS中。
     hadoop-master和machine-2机上的flume配置:
  1. agent.sources=s1  
  2. agent.channels=c1  
  3. agent.sinks=k1 k2  
  4.   
  5. agent.sinkgroups = g1   
  6. agent.sinkgroups.g1.sinks = k1 k2   
  7. agent.sinkgroups.g1.processor.type = load_balance   
  8. agent.sinkgroups.g1.processor.selector = round_robin   
  9. agent.sinkgroups.g1.processor.backoff = true   
  10.   
  11.   
  12. agent.sources.s1.type=avro  
  13. agent.sources.s1.channels=c1  
  14. agent.sources.s1.bind=0.0.0.0  
  15. agent.sources.s1.port=51515  
  16. agent.sources.s1.interceptors=i1  
  17. agent.sources.s1.interceptors.i1.type=timestamp  
  18.   
  19.   
  20. agent.channels.c1.type=jdbc  
  21.   
  22.   
  23. agent.sinks.k1.channel = c1   
  24. agent.sinks.k1.type = avro   
  25. agent.sinks.k1.hostname = machine-0  
  26. agent.sinks.k1.port = 51515  
  27. agent.sinks.k2.channel = c1   
  28. agent.sinks.k2.type = avro  
  29. agent.sinks.k2.hostname = machine-1  
  30. agent.sinks.k2.port = 51515   
复制代码

      machine-1 和machine-0的flume配置:
  1. agent.sources=s1  
  2. agent.channels=c1  
  3. agent.sinks=k1  
  4.   
  5.   
  6. agent.sources.s1.type=avro  
  7. agent.sources.s1.channels=c1  
  8. agent.sources.s1.bind=0.0.0.0  
  9. agent.sources.s1.port=51515  
  10.   
  11.   
  12. agent.channels.c1.type=jdbc  
  13.   
  14.   
  15. agent.sinks.k1.type=hdfs  
  16. agent.sinks.k1.channel=c1  
  17. agent.sinks.k1.hdfs.path=/flume/%Y/%m  
  18. agent.sinks.k1.hdfs.filePrefix=flume  
  19. agent.sinks.k1.hdfs.fileSuffix=.log  
  20. agent.sinks.k1.hdfs.rollInterval=3600  
  21. agent.sinks.k1.hdfs.rollCount=0  
  22. agent.sinks.k1.hdfs.rollSize=0  
  23. agent.sinks.k1.hdfs.fileType=DataStream  
  24. agent.sinks.k1.hdfs.writeFormat=Text  
  25. agent.sinks.k1.hdfs.useLocalTimeStamp=false  
复制代码

      log4j的配置:
  1. # File Appender rootLog  
  2. log4j.rootLogger=DEBUG,stdout,rootLog  
  3.   
  4.   
  5. #console configure for DEV environment  
  6. log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
  7. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
  8. log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n  
  9.   
  10.   
  11. log4j.appender.rootLog=org.apache.log4j.RollingFileAppender  
  12. log4j.appender.rootLog.File= rootLog.log  
  13. log4j.appender.rootLog.MaxFileSize=5000KB  
  14. log4j.appender.rootLog.MaxBackupIndex=20  
  15. log4j.appender.rootLog.layout=org.apache.log4j.PatternLayout  
  16. log4j.appender.rootLog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n  
  17.   
  18.   
  19. # File Appender boentel  
  20. #log4j.logger.com.boentel=DEBUG,boentel  
  21. #log4j.additivity.com.boentel=true  
  22. #log4j.appender.boentel=org.apache.log4j.RollingFileAppender  
  23. #log4j.appender.boentel.File= boentel.log  
  24. #log4j.appender.boentel.MaxFileSize=2000KB  
  25. #log4j.appender.boentel.MaxBackupIndex=20  
  26. #log4j.appender.boentel.layout=org.apache.log4j.PatternLayout  
  27. #log4j.appender.boentel.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n  
  28.   
  29.   
  30. log4j.logger.com.loadbalance= DEBUG,loadbalance  
  31. log4j.additivity.com.loadbalance= true  
  32.   
  33.   
  34. log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender  
  35. log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515  
  36. #log4j.appender.loadbalance.UnsafeMode = true  
  37. log4j.appender.out2.MaxBackoff = 30000  
  38. #FQDN RANDOM ,default is ROUND_ROBIN  
  39. log4j.appender.loadbalance.Selector = RANDOM  
  40. log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout  
  41. log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n  
复制代码


       测试代码:
  1. import java.util.Date;  
  2. import java.util.concurrent.Executors;  
  3. import java.util.concurrent.ScheduledExecutorService;  
  4. import java.util.concurrent.TimeUnit;  
  5.   
  6.   
  7. import org.apache.log4j.Logger;  
  8.   
  9.   
  10.   
  11.   
  12.   
  13.   
  14. public class Worker implements Runnable{  
  15.   
  16.   
  17.       
  18.     private static final Logger LOG = Logger.getLogger(Worker.class);   
  19.     private String command;  
  20.       
  21.     /**
  22.      * @param args
  23.      */  
  24.     public static void main(String[] args) {  
  25.         new Worker("0").init();  
  26.     }  
  27.   
  28.   
  29.     public void init(){  
  30.         int numWorkers = 1;  
  31.         int threadPoolSize = 3 ;  
  32.   
  33.   
  34.         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(threadPoolSize);  
  35.          
  36.         //schedule to run after sometime  
  37.         System.out.println("Current Time = "+new Date());  
  38.         Worker worker = null;  
  39.         for(int i=0; i< numWorkers; i++){  
  40.             try {  
  41.                 Thread.sleep(1000);  
  42.             } catch (InterruptedException e) {  
  43.                 e.printStackTrace();  
  44.             }  
  45.             worker = new Worker("do heavy processing");  
  46. //              scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);  
  47.             //scheduleAtFixedRate  
  48. //              scheduledThreadPool.scheduleAtFixedRate(worker, 0, 1, TimeUnit.SECONDS);  
  49.             scheduledThreadPool.scheduleWithFixedDelay(worker, 5, 10,  
  50.                     TimeUnit.SECONDS);  
  51.          
  52.         }  
  53.            
  54.         //add some delay to let some threads spawn by scheduler  
  55.         try {  
  56.             Thread.sleep(30000);  
  57.         } catch (InterruptedException e) {  
  58.             e.printStackTrace();  
  59.         }  
  60.            
  61.         scheduledThreadPool.shutdown();  
  62.         while(!scheduledThreadPool.isTerminated()){  
  63.             //wait for all tasks to finish  
  64.         }  
  65.         LOG.info("Finished all threads");  
  66.     }  
  67.     public Worker(String command){  
  68.         this.command = command;  
  69.     }  
  70.     @Override  
  71.     public void run() {  
  72.         LOG.info(Thread.currentThread().getName()+" Start. Command = "+command);  
  73.         processCommand();  
  74.         LOG.info(Thread.currentThread().getName()+" End.");  
  75.     }  
  76.    
  77.     private void processCommand() {  
  78.         try {  
  79.             for(int i = 1000; i < 1200; i++){  
  80.                 LOG.info("sequence:" + i);  
  81.             }  
  82.             Thread.sleep(5000);  
  83.         } catch (InterruptedException e) {  
  84.             e.printStackTrace();  
  85.         }  
  86.     }  
  87.       
  88.     @Override  
  89.     public String toString(){  
  90.         return this.command;  
  91.     }  
  92. }  
复制代码


      小结:
     最终能实现负载均衡的作用,但是,性能上还有些欠缺。
     当一台机死掉时,客户端将尝试不断链接,影响到数据传送到其它机子上。当死掉的机器恢复后,客户端备份的数据会重新发送到flume agent。数据正确性是达到了,但是,万一这个app当掉了,对应的日志信息不就丢了吗?这是一个问题,有待进一步的改进。

已有(1)人评论

跳转到指定楼层
shadowhtx 发表于 2014-7-23 19:35:50
咋不用filechannel磁盘缓存日志?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条