
Flume setup and debugging memo

Posted by pig2 on 2014-5-29 13:43:54
Installing CDH3
https://ccp.cloudera.com/display/CDHDOC/CDH3+Installation
A running log of the steps, kept as a memo.

  wget http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo -O /etc/yum.repos.d/cloudera.repo

  yum search hadoop
  yum -y install hadoop-0.20

  yum -y install hadoop-0.20-namenode
  yum -y install hadoop-0.20-datanode
  #yum -y install hadoop-0.20-secondarynamenode
  yum -y install hadoop-0.20-jobtracker
  yum -y install hadoop-0.20-tasktracker


Installing CDH3 Components
https://ccp.cloudera.com/display ... llingCDH3Components
Component        Package (yum install)
---------------------------------
Flume        flume
Sqoop        sqoop
Hue        hue
Pig        hadoop-pig
Hive        hadoop-hive
HBase        hadoop-hbase
ZooKeeper        hadoop-zookeeper
Oozie server        oozie
Oozie client        oozie-client
Whirr        whirr
Snappy        hadoop-0.20-native
Mahout        mahout
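Flume is split into three packages:
flume: the core
flume.node: the init script that starts a node as a service
flume.master: the init script that starts the master as a service
yum install flume*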


  [root@flume-hadoop-node-1 ~]# flume
  usage: flume command [args...]
  commands include:
  dump Takes a specified source and dumps to console
  source Takes a specified source and dumps to console
  node Start a Flume node/agent (with watchdog)
  master Start a Flume Master server (with watchdog)
  version Dump flume build version information
  node_nowatch Start a flume node/agent (no watchdog)
  master_nowatch Start a Flume Master server (no watchdog)
  class <class> Run specified fully qualified class using Flume environment (no watchdog)
  ex: flume com.cloudera.flume.agent.FlumeNode
  classpath Dump the classpath used by the java executables
  shell Start the flume shell
  killmaster Kill a running master
  dumplog Takes a specified WAL/DFO log file and dumps to console
  sink Start a one-shot flume node with console source and specified sink

  cd /etc/flume/conf
  mv flume-site.xml.template flume-site.xml
  vi flume-site.xml

  # change the master host to your own host

  /etc/init.d/flume-master start
  /etc/init.d/flume-node start

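Flume documentation
http://archive.cloudera.com/cdh/3/flume/UserGuide/index.html
Overall, Flume has a stream-oriented design. A "source" produces events and a "sink" consumes them. Both push and pull are supported, and it can be extended to handle all kinds of data sources and data processing, so it is very flexible.
First stop the services and run Flume in the foreground, so the output is easy to see and you get a direct feel for how it behaves: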

  /etc/init.d/flume-master stop && /etc/init.d/flume-node stop


Start Flume:
  flume dump console



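Once it is running, anything you type is echoed back by Flume. The console argument sets the Flume source to console input, and the default sink is also the console.
Using a file as the source:
flume dump 'text("/etc/services")'
Tailing the end of a file:
flume dump 'tail("testfile")'
testfile does not have to exist yet; that is fine. In another console, create the file and add some content: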


  [root@flume-hadoop-node-1 tmp]# echo "test flume">testfile
  [root@flume-hadoop-node-1 tmp]# echo "test flume 123">testfile
  [root@flume-hadoop-node-1 tmp]# echo "test flume 123">>testfile
  [root@flume-hadoop-node-1 tmp]# echo "test flume 1234">>testfile
  [root@flume-hadoop-node-1 tmp]# echo "test flume 12345\r\n123456">>testfile
On the Flume side, the feedback shows up in real time:
  1. 2012-01-06 20:42:55,818 [main] INFO agent.LogicalNodeManager: Loading node name with FlumeConfigData: {srcVer:'Thu Jan 01 08:00:00 CST 1970' snkVer:'Thu Jan 01 08:00:00 CST 1970' ts='Thu Jan 01 08:00:00 CST 1970' flowId:'null' source:'tail( "testfile" )' sink:'console' }
  2. 2012-01-06 20:42:55,836 [main] INFO agent.LogicalNode: Node config successfully set to FlumeConfigData: {srcVer:'Thu Jan 01 08:00:00 CST 1970' snkVer:'Thu Jan 01 08:00:00 CST 1970' ts='Thu Jan 01 08:00:00 CST 1970' flowId:'null' source:'tail( "testfile" )' sink:'console' }
  3. 2012-01-06 20:42:55,920 [logicalNode dump-10] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
  4. 2012-01-06 20:42:55,973 [main] INFO agent.FlumeNode: Hadoop Security enabled: false
  5. flume-hadoop-node-1 [INFO Fri Jan 06 20:43:21 CST 2012] { tailSrcFile : (long)8387236824819002469 (string) 'testfile' (double)4.914663849160389E252 } test flume
  6. flume-hadoop-node-1 [INFO Fri Jan 06 20:43:36 CST 2012] { tailSrcFile : (long)8387236824819002469 (string) 'testfile' (double)4.914663849160389E252 } 123
  7. flume-hadoop-node-1 [INFO Fri Jan 06 20:43:48 CST 2012] { tailSrcFile : (long)8387236824819002469 (string) 'testfile' (double)4.914663849160389E252 } test flume 123
  8. flume-hadoop-node-1 [INFO Fri Jan 06 20:43:56 CST 2012] { tailSrcFile : (long)8387236824819002469 (string) 'testfile' (double)4.914663849160389E252 } test flume 1234
  9. flume-hadoop-node-1 [INFO Fri Jan 06 20:44:11 CST 2012] { tailSrcFile : (long)8387236824819002469 (string) 'testfile' (double)4.914663849160389E252 } test flume 12345\\r\\n123456

Multiple files work too:

  flume dump 'multitail("test1", "test2")'


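By default, tail processes the file line by line and generates one event per line. The default delimiter is "\n", and the delimiter itself is not excluded. A custom delimiter (a regular expression) is also supported, with three modes:
"prev": the delimiter belongs to the previous event
"next": the delimiter belongs to the next event
"exclude": the delimiter is dropped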

  tail("file", delim="\n\n+", delimMode="exclude")
  tail("file", delim="</a>", delimMode="prev")
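To make the three delimiter modes concrete, here is a minimal Python sketch that splits a string into events the way the modes are described above. It is not Flume code: the function name `split_events` and its parameters are made up for illustration, and how real Flume treats edge cases (for example a file that starts with a delimiter) is not covered.

```python
import re

def split_events(text, delim, delim_mode="exclude"):
    """Split text into events on a regex delimiter (hypothetical helper, not a Flume API)."""
    events = []
    start = 0  # index where the current event begins
    for match in re.finditer(delim, text):
        if delim_mode == "prev":
            # delimiter is kept at the end of the previous event
            events.append(text[start:match.end()])
            start = match.end()
        elif delim_mode == "next":
            # delimiter is kept at the start of the next event
            events.append(text[start:match.start()])
            start = match.start()
        elif delim_mode == "exclude":
            # delimiter is thrown away
            events.append(text[start:match.start()])
            start = match.end()
        else:
            raise ValueError("unknown delim_mode: " + delim_mode)
    if start < len(text):
        events.append(text[start:])  # whatever is left after the last delimiter
    return events

print(split_events("x\n\n\ny\n\nz", r"\n\n+", "exclude"))
print(split_events("a</a>b</a>", "</a>", "prev"))
print(split_events("a</a>b</a>", "</a>", "next"))
```

The first two calls mirror the two tail(...) examples above; the third shows how "next" moves each delimiter to the front of the following event.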


Start a UDP service listening on port 5140:
  flume dump 'syslogUdp(5140)'
flume web console
http://10.129.8.125:35871/flumemaster.jsp
Cloudera Manager Free Edition
https://ccp.cloudera.com/display ... ition+Documentation
  wget http://archive.cloudera.com/cloudera-manager/installer/latest/cloudera-manager-installer.bin
  chmod a+x cloudera-manager-installer.bin
  ./cloudera-manager-installer.bin
Before installing, disable SELinux:
  vi /etc/selinux/config
  --
  SELINUX=disabled
  --

  setenforce 0

./cloudera-manager-installer.bin
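The installation failed. The log showed that the packages could not be downloaded, so the only option was to download and install them by hand.
Install the JDK manually: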

  wget http://archive.cloudera.com/cloudera-manager/redhat/5/x86_64/cloudera-manager/3/RPMS/jdk-6u21-linux-amd64.rpm
  rpm -Uhv jdk-6u21-linux-amd64.rpm
http://archive.cloudera.com/cloudera-manager/redhat/5/x86_64/cloudera-manager/3/RPMS/cloudera-manager-daemons-3.7.2.143-1.noarch.rpm
--------------------------------------------------------------------------------
Two machines: 125 and 126
Configuration on 125:
  vi /etc/flume/conf/flume-site.xml

  <property>
  <name>flume.collector.event.host</name>
  <value>collector</value>
  <description>This is the host name of the default "remote" collector.
  </description>
  </property>
  <property>
  <name>flume.collector.port</name>
  <value>35853</value>
  <description>This default tcp port that the collector listens to in order to receive events it is collecting.
  </description>
  </property>
Start the Flume nodes:
  flume node_nowatch -n collector
HDFS server setup (fresh configuration)
hdfs://10.129.8.126/
  cp /usr/lib/hadoop/example-confs/conf.pseudo/* /etc/hadoop/conf/

  mkdir /var/lib/hadoop-0.20/cache/hadoop/dfs/name -p
  chmod 777 -R /var/lib/hadoop-0.20/

  sudo -u hdfs hadoop namenode -format    (note: answer the prompt with an uppercase Y)

  [root@cloudera-node-1 logs]# hadoop fs -ls hdfs://127.0.0.1/
  ls: Wrong FS: hdfs://127.0.0.1/, expected: hdfs://cloudera-node-1
  Usage: java FsShell [-ls <path>]
  [root@cloudera-node-1 logs]# hadoop fs -ls hdfs://cloudera-node-1
  ls: Pathname from hdfs://cloudera-node-1 is not a valid DFS filename.
  Usage: java FsShell [-ls <path>]
  [root@cloudera-node-1 logs]# hadoop fs -ls hdfs://cloudera-node-1/
  [root@cloudera-node-1 logs]# hadoop fs -mkdir hdfs://cloudera-node-1/test
  [root@cloudera-node-1 logs]# hadoop fs -ls hdfs://cloudera-node-1/
  Found 1 items
  drwxr-xr-x - root supergroup 0 2012-02-03 00:54 /test
  [root@cloudera-node-1 logs]#
Change the Hadoop configuration to use the external IP:
  vi /etc/hadoop/conf/core-site.xml

  <property>
  <name>fs.default.name</name>
  <value>hdfs://10.129.8.126:8020</value>
  </property>

  /etc/init.d/hadoop-0.20-namenode restart

  [root@cloudera-node-1 logs]# hadoop fs -ls hdfs://10.129.8.126/
  Found 1 items
  drwxr-xr-x - root supergroup 0 2012-02-03 00:54 /test
Set access permissions:
  hadoop dfs -chmod 777 hdfs://10.129.8.126/flume/
  hadoop dfs -chmod 777 hdfs://10.129.8.126/flume/*

On node 126, start Flume:

  flume node_nowatch

Open the Flume master page and enter the configuration:
http://10.129.8.125:35871/flumemaster.jsp

  cloudera-node-1 : text("/etc/services") | agentSink("10.129.8.125",35853);
  collector : collectorSource(35853) | collectorSink("hdfs://10.129.8.126/flume/","srcdata");



Flume’s Tiered Event Sources
collectorSource[(port)]
Collector source. Listens for data from agentSinks forwarding to the given port. If port is not specified, the node's default collector TCP port, 35853, is used.
!!
hadoop dfs -ls hdfs://10.129.8.126/flume/
Error reported on 125:

  1. org.apache.hadoop.ipc.RemoteException: java.io.IOException: File /flume/srcdata20120203-013616957+0800.2438481505068540.00000021.tmp could only be replicated to 0 nodes, instead of 1
  2. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:1520)
  3. at org.apache.hadoop.hdfs.server.namenode.NameNode.addBlock(NameNode.java:665)
  4. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  5. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  6. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  7. at java.lang.reflect.Method.invoke(Method.java:597)
  8. at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
  9. at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1434)
  10. at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1430)
  11. at java.security.AccessController.doPrivileged(Native Method)
  12. at javax.security.auth.Subject.doAs(Subject.java:396)
  13. at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
  14. at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1428)

  15. at org.apache.hadoop.ipc.Client.call(Client.java:1107)
  16. at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:226)
  17. at $Proxy6.addBlock(Unknown Source)
  18. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  19. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  20. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  21. at java.lang.reflect.Method.invoke(Method.java:597)
  22. at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
  23. at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
  24. at $Proxy6.addBlock(Unknown Source)
  25. at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.locateFollowingBlock(DFSClient.java:3178)
  26. at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:3047)
  27. at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$1900(DFSClient.java:2305)
  28. at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2500)

vi /etc/hadoop/conf/hdfs-site.xml
Setting the replica count to 0 did not help either.
  <delete>
  Set in /etc/hadoop/conf/hdfs-site.xml:
  <property>
  <name>dfs.thrift.address</name>
  <value>10.129.8.126:10090</value>
  </property>
  </delete>


vi /etc/hadoop/conf/masters
Replace localhost with the IP: 10.129.8.126
Still no luck. Try a manual upload from 125:

  vi a.txt
  hadoop dfs -put a.txt hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.1

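Same error.
Running the same operation on 126 fails with the same error too.
It looks like the datanode is dead even though the service reports it as running. Try restarting it: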

  [root@cloudera-node-1 ~]# /etc/init.d/hadoop-0.20-datanode status
  datanode (pid 4866) is running...
  [root@cloudera-node-1 ~]# /etc/init.d/hadoop-0.20-datanode restart
  Stopping Hadoop datanode daemon (hadoop-datanode): stopping datanode
  datanode is stopped [ OK ]
  Starting Hadoop datanode daemon (hadoop-datanode): starting datanode, logging to /usr/lib/hadoop-0.20/logs/hadoop-hadoop-datanode-cloudera-node-1.out
  datanode (pid 8570) is running... [ OK ]
  [root@cloudera-node-1 ~]# vi /usr/lib/hadoop/logs/hadoop-hadoop-datanode-cloudera-node-1.log
  [root@cloudera-node-1 ~]# hadoop dfs -put a.txt hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.12
  put: Target hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.12 already exists
  [root@cloudera-node-1 ~]# hadoop dfs -put a.txt hdfs://10.129.8.126/flume/srcdata20120203-014405668+0800.2438950215947540.00000019.tmp.123
  [root@cloudera-node-1 ~]#


That works now.
If you get a safe mode error like this:

  1. 2012-02-03 01:42:17,467 [logicalNode collector-19] INFO rolling.RollSink: closing RollSink 'escapedCustomDfs("hdfs://10.129.8.126/flume/","srcdata%{rolltag}" )'
  2. 2012-02-03 01:42:17,467 [logicalNode collector-19] INFO rolling.RollSink: opening RollSink 'escapedCustomDfs("hdfs://10.129.8.126/flume/","srcdata%{rolltag}" )'
  3. 2012-02-03 01:42:17,468 [logicalNode collector-19] INFO debug.InsistentOpenDecorator: Opened MaskDecorator on try 0
  4. 2012-02-03 01:42:17,469 [pool-7-thread-1] INFO hdfs.EscapedCustomDfsSink: Opening hdfs://10.129.8.126/flume/srcdata20120203-014217467+0800.2438842015436540.00000019
  5. 2012-02-03 01:42:17,476 [logicalNode collector-19] INFO debug.InsistentAppendDecorator: append attempt 3 failed, backoff (8000ms): org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create file/flume/srcdata20120203-014217467+0800.2438842015436540.00000019.tmp. Name node is in safe mode.
  6. The number of live datanodes 0 needs an additional 1 live datanodes to reach the minimum number 1. Safe mode will be turned off automatically.
  7. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1182)
  8. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:1150)
  9. at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:597)
  10. at org.apache.hadoop.hdfs.server.namenode.NameNode.create(NameNode.java:576)
  11. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  12. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  13. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  14. at java.lang.reflect.Method.invoke(Method.java:597)
  15. at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:557)
  16. at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1434)
  17. at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1430)
  18. at java.security.AccessController.doPrivileged(Native Method)
  19. at javax.security.auth.Subject.doAs(Subject.java:396)
  20. at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1157)
  21. at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1428)

run
hadoop dfsadmin -safemode leave
OK, once more from the top.
On 125:
flume node_nowatch -n collector
On 126:
flume node_nowatch
OK, done:

  1. [root@flume-hadoop-node-1 log]# hadoop fs -ls hdfs://10.129.8.126/flume/
  2. Found 2 items
  3. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  4. -rw-r--r-- 3 root supergroup 0 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023.tmp
  5. [root@flume-hadoop-node-1 log]# hadoop fs -ls hdfs://10.129.8.126/flume/
  6. Found 2 items
  7. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  8. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  9. [root@flume-hadoop-node-1 log]# hadoop fs -ls hdfs://10.129.8.126/flume/
  10. Found 2 items
  11. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  12. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  13. [root@flume-hadoop-node-1 log]# hadoop fs -tail hdfs://10.129.8.126/flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  14. \t\t3881/udp\t\t\t# Data Acquisition and Control","timestamp":1328205987177,"pri":"INFO","nanos":100778763829457,"host":"cloudera-node-1","fields":{"AckTag":"20120203-020626329+0800.100777916519457.00000019","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000陋赂\u0010娄","rolltag":"20120203-021531413+0800.2440835961446540.00000021"}}
  15. {"body":"msdts1\t\t3882/tcp\t\t\t# DTS Service Port","timestamp":1328205987177,"pri":"INFO","nanos":100778763863457,"host":"cloudera-node-1","fields":{"AckTag":"20120203-020626329+0800.100777916519457.00000019","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000?隆w?","rolltag":"20120203-021531413+0800.2440835961446540.00000021"}}
  16. {"body":"msdts1\t\t3882/udp\t\t\t# DTS Service Port","timestamp":1328205987177,"pri":"INFO","nanos":100778763897457,"host":"cloudera-node-1","fields":{"AckTag":"20120203-020626329+0800.100777916519457.00000019","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u00005=?\u0002","rolltag":"20120203-021531413+0800.2440835961446540.00000021"}}

Adding a new Flume node
On 126:
flume node_nowatch -n agentAB
Add the configuration on the flume-master page:
agentAB : text("/var/log/dmesg") | agentSink("10.129.8.125",35853);
OK, no problems. Next, try the default configuration (no port given):
flume node_nowatch -n agentABC
agentABC : text("/tmp/medcl") | agentSink("10.129.8.125");
At this point,
node status shows:
agentABC        agentABC        flume-hadoop-node-1        OPENING        Fri Feb 03 02:31:11 CST 2012        3        Fri Feb 03 02:32:49 CST 2012
and the console reports an error:

  1. 2012-02-03 02:31:14,823 [logicalNode agentABC-22] INFO connector.DirectDriver: Connector logicalNode agentABC-22 exited with error: /tmp/medcl (No such file or directory)
  2. java.io.FileNotFoundException: /tmp/medcl (No such file or directory)
  3. at java.io.RandomAccessFile.open(Native Method)
  4. at java.io.RandomAccessFile.<init>(RandomAccessFile.java:212)
  5. at java.io.RandomAccessFile.<init>(RandomAccessFile.java:98)
  6. at com.cloudera.flume.handlers.debug.TextFileSource.open(TextFileSource.java:75)
  7. at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:87)
  8. Exception in thread "logicalNode agentABC-22" java.lang.NullPointerException
  9. at com.cloudera.flume.handlers.debug.TextFileSource.close(TextFileSource.java:69)
  10. at com.cloudera.flume.core.connector.DirectDriver$PumperThread.ensureClosed(DirectDriver.java:183)
  11. at com.cloudera.flume.core.connector.DirectDriver$PumperThread.errorCleanup(DirectDriver.java:204)
  12. at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)

Create the file:
echo "hello world" > /tmp/medcl
It keeps failing and does not recover on its own; the only fix is to restart the node.

  1. [root@cloudera-node-1 log]# hadoop dfs -tail hdfs://10.129.8.126/flume/srcdata20120203-023644240+0800.2442108787815540.00000021
  2. {"body":"hello world","timestamp":1328207806233,"pri":"INFO","nanos":2442110780978540,"host":"flume-hadoop-node-1","fields":{"AckTag":"20120203-023646196+0800.2442110743929540.00000022","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000\rJ\u0011?","rolltag":"20120203-023644240+0800.2442108787815540.00000021"}}
  flume node_nowatch -n agentABCD
  agentABCD : text("/tmp/medcl") | agentSink("10.129.8.125");


The text source reads the file only once; later changes to the file are not processed.
tail, on the other hand, keeps watching the file:

  flume node_nowatch -n collector   # if the collector has been shut down, start it again; its configuration is shown earlier
  flume node_nowatch -n agentABCDE
  agentABCDE : tail("/tmp/medcl") | agentSink("10.129.8.125");

The collector writes to HDFS every 30 seconds, creating a new HDFS file each time:

  1. [root@flume-hadoop-node-1 tmp]# echo "happy new year">>medcl
  2. [root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
  3. Found 7 items
  4. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  5. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  6. -rw-r--r-- 3 root supergroup 197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
  7. -rw-r--r-- 3 root supergroup 318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
  8. -rw-r--r-- 3 root supergroup 761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
  9. -rw-r--r-- 3 root supergroup 336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
  10. -rw-r--r-- 3 root supergroup 329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
  11. [root@flume-hadoop-node-1 tmp]#
  12. [root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
  13. Found 8 items
  14. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  15. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  16. -rw-r--r-- 3 root supergroup 197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
  17. -rw-r--r-- 3 root supergroup 318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
  18. -rw-r--r-- 3 root supergroup 761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
  19. -rw-r--r-- 3 root supergroup 336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
  20. -rw-r--r-- 3 root supergroup 329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
  21. -rw-r--r-- 3 root supergroup 337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021
  22. [root@flume-hadoop-node-1 tmp]# hadoop fs -get hdfs://10.129.8.126/flume/srcdata20120207-190929343+0800.2847273890577540.00000021 /tmp/lo2

If the file content is replaced rather than appended to, the first record is lost. Pay special attention to this (a bug?):
  1. [root@flume-hadoop-node-1 tmp]# echo "who is your daddy?">medcl
  2. [root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
  3. Found 8 items
  4. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  5. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  6. -rw-r--r-- 3 root supergroup 197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
  7. -rw-r--r-- 3 root supergroup 318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
  8. -rw-r--r-- 3 root supergroup 761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
  9. -rw-r--r-- 3 root supergroup 336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
  10. -rw-r--r-- 3 root supergroup 329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
  11. -rw-r--r-- 3 root supergroup 337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021
Append one more record:
  1. [root@flume-hadoop-node-1 tmp]# echo "here is a new line">>medcl
  2. [root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
  3. Found 9 items
  4. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  5. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  6. -rw-r--r-- 3 root supergroup 197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
  7. -rw-r--r-- 3 root supergroup 318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
  8. -rw-r--r-- 3 root supergroup 761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
  9. -rw-r--r-- 3 root supergroup 336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
  10. -rw-r--r-- 3 root supergroup 329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
  11. -rw-r--r-- 3 root supergroup 337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021
  12. -rw-r--r-- 3 root supergroup 0 2012-02-07 19:19 /flume/srcdata20120207-191702865+0800.2847727413000540.00000021.tmp

  13. [root@flume-hadoop-node-1 tmp]# hadoop fs -ls hdfs://10.129.8.126/flume/
  14. Found 9 items
  15. -rw-r--r-- 3 root supergroup 11829304 2012-02-03 02:17 /flume/srcdata20120203-021531413+0800.2440835961446540.00000021
  16. -rw-r--r-- 3 root supergroup 7080210 2012-02-03 02:18 /flume/srcdata20120203-021605232+0800.2440869780410540.00000023
  17. -rw-r--r-- 3 root supergroup 197377 2012-02-03 02:25 /flume/srcdata20120203-022338788+0800.2441323335749540.00000021
  18. -rw-r--r-- 3 root supergroup 318 2012-02-03 02:38 /flume/srcdata20120203-023644240+0800.2442108787815540.00000021
  19. -rw-r--r-- 3 root supergroup 761621 2012-02-07 19:00 /flume/srcdata20120207-185754757+0800.2846579304755540.00000021
  20. -rw-r--r-- 3 root supergroup 336 2012-02-07 19:02 /flume/srcdata20120207-185954947+0800.2846699494856540.00000021
  21. -rw-r--r-- 3 root supergroup 329 2012-02-07 19:09 /flume/srcdata20120207-190658071+0800.2847122618653540.00000021
  22. -rw-r--r-- 3 root supergroup 337 2012-02-07 19:12 /flume/srcdata20120207-190929343+0800.2847273890577540.00000021
  23. -rw-r--r-- 3 root supergroup 341 2012-02-07 19:19 /flume/srcdata20120207-191702865+0800.2847727413000540.00000021
  24. [root@flume-hadoop-node-1 tmp]# hadoop fs -tail hdfs://10.129.8.126/flume/srcdata20120207-191702865+0800.2847727413000540.00000021
  25. {"body":"here is a new line","timestamp":1328613446703,"pri":"INFO","nanos":2847751251273540,"host":"flume-hadoop-node-1","fields":{"AckTag":"20120207-191720960+0800.2847745508415540.00000025","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000/rN?","tailSrcFile":"medcl","rolltag":"20120207-191702865+0800.2847727413000540.00000021"}}
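Sure enough, one record was lost.
Flume offers three working modes that trade off data reliability and availability:
1. End2End: acknowledged at both ends, with automatic retry on failure (how many retries, and what happens once the retries fail, still needs investigating)
agentE2ESink[("machine"[,port])]
2. DiskFailover: on failure, events are written to local disk; the agent checks periodically and replays them once the collector is available again.
agentDFOSink[("machine"[,port])]
3. Best effort, the high-throughput mode: if the collector fails, the logs are simply dropped. Ruthless.
agentBESink[("machine"[,port])]
The agentSink used earlier is an alias for the first mode, End2End, and behaves the same way.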
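As a rough illustration of how the three modes differ when the collector is down, here is a toy Python model. It is a sketch only: `ToyAgent`, `send` and `replay` are hypothetical names, acknowledgements and disk files are not modelled, and E2E and DFO are both reduced to "keep the event for later", which hides the real differences between them.

```python
class ToyAgent:
    """Toy model of an agent in E2E, DFO or BE mode (not Flume code)."""

    def __init__(self, mode):
        if mode not in ("E2E", "DFO", "BE"):
            raise ValueError("unknown mode: " + mode)
        self.mode = mode
        self.pending = []  # events kept for retry (E2E) or spooled locally (DFO)
        self.dropped = 0   # events discarded (BE)

    def send(self, event, collector, collector_up):
        if collector_up:
            collector.append(event)
        elif self.mode == "BE":
            self.dropped += 1           # best effort: give up immediately
        else:
            self.pending.append(event)  # E2E / DFO: keep it until the collector is back

    def replay(self, collector):
        """Deliver everything that was held back; call once the collector is reachable again."""
        collector.extend(self.pending)
        self.pending.clear()

collector = []
dfo = ToyAgent("DFO")
dfo.send("line 1", collector, collector_up=False)
dfo.replay(collector)
print(collector)

be = ToyAgent("BE")
be.send("line 1", [], collector_up=False)
print(be.dropped)
```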
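Configuring multiple collectors
Multiple collectors increase throughput because log collection runs in parallel. As noted above, to stay reliable when a collector dies, the agent has to write to local disk and periodically try to reconnect to the collector. Meanwhile log collection has stopped, so downstream log processing and analysis stall as well, which is not acceptable.
Multiple collectors solve this problem.
Also, if one of several collectors dies, the agent should be able to switch over automatically. How is that configured?
Use failover chains: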
  agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");
  agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");
  agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");
  agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
  agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");
  agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");
  collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
  collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
  collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
In the configuration above, each chain names two collectors; when the first collector fails, the agent automatically switches to the second.
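The selection rule of a failover chain can be sketched in a few lines of Python. This is only an illustration of "use the first collector in the chain that is reachable"; `pick_collector` and the `is_reachable` callback are hypothetical names, and how Flume itself detects a failed collector is not modelled.

```python
def pick_collector(chain, is_reachable):
    """Return the first reachable collector in the chain, or None if all are down."""
    for collector in chain:
        if is_reachable(collector):
            return collector
    return None

# agentA's chain from the configuration above
chain_a = ["collectorA:35853", "collectorB:35853"]

down = {"collectorA:35853"}  # pretend collectorA has failed
print(pick_collector(chain_a, lambda c: c not in down))
```

With collectorA marked as down, the call returns collectorB:35853, which is the switch-over behaviour described above.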
Automatic failover chains work mainly through special source and sink names (not applicable with multiple masters).
Source:
autoCollectorSource
Sink:
autoE2EChain, autoDFOChain, or autoBEChain
Configuration:
agentA : src | autoE2EChain ;
agentB : src | autoE2EChain ;
agentC : src | autoE2EChain ;
agentD : src | autoE2EChain ;
agentE : src | autoE2EChain ;
agentF : src | autoE2EChain ;
collectorA : autoCollectorSource | collectorSink("hdfs://...", "src");
collectorB : autoCollectorSource | collectorSink("hdfs://...", "src");
collectorC : autoCollectorSource | collectorSink("hdfs://...", "src");
Logical Configurations
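A physical node contains several logical nodes. Logical nodes are further divided into logical sources and logical sinks, and flows are used to isolate and group nodes.
Logical nodes allow a single JVM instance to host multiple logical nodes, so several source and sink threads can run in one JVM.
Every logical node name must be unique, and it must not be the same as any physical node name or host name either.
Defining logical nodes takes two steps, plus a third to remove one:
1. Define the node types
agent1 : _source_ | autoBEChain ;
collector1 : autoCollectorSource | collectorSink("hdfs://....") ;
2. Map logical nodes to physical nodes
map host1 agent1
map host2 collector1
3. Decommission a logical node
decommission agent1
Let's try it: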
On 125:
  cd /tmp/
  ls
  rm -rf flume-*
  /etc/init.d/flume-master restart
  /etc/init.d/flume-node start
On 126:
  /etc/init.d/flume-node start
On the Flume master page,
config:

  agent1 : tail("/tmp/medcl") | autoBEChain ;
  collector1 : autoCollectorSource | collectorSink("hdfs://10.129.8.126/flume/","medcl") ;

Note: host name to IP mapping
cloudera-node-1:10.129.8.126
flume-hadoop-node-1:10.129.8.125
raw command:


  command: map
  arguments: 10.129.8.125 agent1
  # flume-hadoop-node-1 agent1

  command: map
  arguments: 10.129.8.126 collector1
  # cloudera-node-1 collector1

Try decommissioning:
  map 10.129.8.125 agent2

  decommission agent2

(Mind the whitespace: there must be no spaces at either end of the decommission input.)
Alternatively, use unmap and map to move a logical node:

  unmap host2 collector1
  map host3 collector1


Capturing the traffic shows that the request is:
curl -XPOST http://10.129.8.125:35871/mastersubmit.jsp -d'cmd=unmap&args=10.129.8.125+agent1'
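The same request can be built from a script. The sketch below uses only the Python standard library and reproduces the form body captured above. `build_master_command` is a made-up helper name, and the idea that other raw commands (map, decommission) go through the same endpoint with the same cmd/args fields is an assumption that the capture does not confirm.

```python
from urllib.parse import urlencode
from urllib.request import Request

# URL taken from the captured request above
MASTER_URL = "http://10.129.8.125:35871/mastersubmit.jsp"

def build_master_command(cmd, *args):
    """Build (but do not send) the POST request for a master raw command."""
    # urlencode turns the space between arguments into "+", as in the capture
    body = urlencode({"cmd": cmd, "args": " ".join(args)})
    return Request(MASTER_URL, data=body.encode("ascii"), method="POST")

req = build_master_command("unmap", "10.129.8.125", "agent1")
print(req.get_method(), req.full_url)
print(req.data.decode("ascii"))
```

Passing the request to urllib.request.urlopen would actually submit it; that step is left out here so the sketch can be run without a master.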

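Note: logical sources and logical sinks do not work with multiple masters.
Logical sources and logical sinks let you configure a flow without knowing the concrete physical nodes. Flume has a translation mechanism that automatically replaces logical node names with the actual host names and ports.
In fact, the auto sinks and auto chains are implemented the same way.
Flow isolation (note: also unavailable with multiple masters, sadly)
Suppose you need to collect several kinds of data from one physical machine and store them in different places. One approach is to tag all the data, send it through a single pipeline, and separate it in post-processing.
The other is to keep the two kinds of data isolated throughout the transfer, so that no post-processing is needed.
Flume supports both, with low latency. It introduces the concept of a flow to group nodes; it is configured as follows:
Flume master page:
raw commands
Command: config
Arguments: [logical node] [flow name] fooSrc autoBEChain
A concrete example: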

  config AgentC myflow tail("/tmp/medcl") autoBEChain
  config CollectorC myflow autoCollectorSource collectorSink("hdfs://10.129.8.126/flume/","medcl_flow")

  map 10.129.8.125 AgentC
  map 10.129.8.126 CollectorC

------------
Problem:
fail( "logical node not mapped to physical node yet" )
1. Use the host name for map: whatever name node status shows is the name to use in map.
2. Map the logical node first, then update the config.
A working configuration:

  map cloudera-node-1 agent1
  map flume-hadoop-node-1 collector1

  agent1 : tail("/tmp/medcl") | agentSink("10.129.8.125",35853);
  collector1 : collectorSource(35853) | collectorSink("hdfs://10.129.8.126/flume/","medcl");

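Multi-master configuration
Multiple masters synchronize automatically. If one master dies, its nodes move to another master automatically.
The Flume master has two working modes: standalone and distributed.
How is it configured?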

  <property>
  <name>flume.master.servers</name>
  <value>hostA,hostB</value>
  </property>

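A single host means standalone mode; multiple hosts mean distributed mode. [In distributed mode, every master must have the same configuration file.]
In addition, each master must be configured with a different serverid, as follows: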

  MasterA:
  <property>
  <name>flume.master.serverid</name>
  <value>0</value>
  </property>
  MasterB:
  <property>
  <name>flume.master.serverid</name>
  <value>1</value>
  </property>

[The number just has to match the index of that host in the server list configured above.]
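The index rule can be checked mechanically. The following Python sketch derives the serverid for each host from the flume.master.servers value; `master_server_ids` is a hypothetical helper, and zero-based numbering is taken from the example above (MasterA is 0, MasterB is 1).

```python
def master_server_ids(servers_value):
    """Map each master host to its serverid, i.e. its position in flume.master.servers."""
    hosts = [h.strip() for h in servers_value.split(",") if h.strip()]
    return {host: index for index, host in enumerate(hosts)}

for host, server_id in master_server_ids("hostA,hostB").items():
    print(host, "-> flume.master.serverid =", server_id)
```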
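In a distributed setup, at least 3 servers are needed to tolerate one failure, and at least 5 to tolerate two simultaneous failures.
If the live masters do not make up more than half of the total, the whole Flume master cluster blocks and configuration can be neither read nor written.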
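The numbers above follow from a simple majority rule, which this short Python sketch spells out. The function names are made up for illustration; the only rule encoded is the one stated in the text, that strictly more than half of the masters must be alive.

```python
def has_quorum(alive, total):
    """True if strictly more than half of the masters are alive."""
    return alive * 2 > total

def masters_needed(failures_tolerated):
    """Smallest cluster size that still has a majority after that many failures."""
    return 2 * failures_tolerated + 1

print(masters_needed(1), masters_needed(2))
print(has_quorum(2, 3), has_quorum(2, 4))
```

Note that 4 masters tolerate no more failures than 3, since 2 out of 4 is not a majority.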
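The place where the Flume master stores configuration is called the configuration store. It is pluggable and ships with two implementations:
memory-backed (MBCS) and ZooKeeper-backed (ZBCS).
ZBCS is the default. Flume embeds ZooKeeper, and it can also be pointed at an existing ZooKeeper cluster.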

  <property>
  <name>flume.master.store</name>
  <value>zookeeper</value>
  </property>

[Allowed values: zookeeper or memory]
ZBCS configuration
flume.master.zk.logdir: where configuration data, update logs, failure information and so on are stored
flume.master.zk.server.quorum.port: default 3182; the port the ZooKeeper server listens on locally
flume.master.zk.server.election.port: default 3183; used by the ZooKeeper servers to find each other
flume.master.zk.client.port: default 3181; used to talk to the ZooKeeper server
Gossip protocol support in the Flume master:

  <property>
  <name>flume.master.gossip.port</name>
  <value>57890</value>
  </property>

In distributed mode, the Flume node configuration also has to change, from connecting to one master to connecting to several:
  <property>
  <name>flume.master.servers</name>
  <value>masterA,masterB,masterC</value>
  </property>
flume node通过定期与master的端口做心跳检测,一旦master 连接失败,自动随机切换到剩下的可以连上的master上去。【master节点通过配置flume.master.heartbeat.port来配置心跳端口】
To use an external ZooKeeper, configure conf/flume-site.xml as follows:
  <property>
    <name>flume.master.zk.use.external</name>
    <value>true</value>
  </property>

  <property>
    <name>flume.master.zk.servers</name>
    <value>zkServerA:2181,zkServerB:2181,zkServerC:2181</value>
  </property>
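Integrating Flume with data sources
Flume's strength is its flexibility: it supports all kinds of data sources, structured, unstructured, semi-structured, and so on.
Three approaches:
pushing, polling, embedding (embedding Flume components in your application)
Push sources:
syslogTcp, syslogUdp: the syslog and syslog-ng log protocols
scribe: the protocol of the scribe logging system
Polling:
tail, multitail: watch files for appended content
exec: suited to extracting data from existing systems
poller: collects information about the Flume node itself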
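The Flume event data model
Six main fields:
Unix timestamp
Nanosecond timestamp [a timestamp with nanosecond resolution]
Priority
Source host
Body
Metadata table with an arbitrary number of attribute value pairs.
Every event has these fields, although the body may have zero length and the metadata table may be empty.
priority: one of TRACE, DEBUG, INFO, WARN, ERROR, or FATAL
body: raw bytes, 32KB maximum by default, with anything beyond that truncated; configurable through flume.event.max.size.bytes
Using event fields to customize the output location
collectorSink("hdfs://namenode/flume/webdata/%H00/", "%{host}-")
%H is the hour taken from the event's timestamp field, and host is the hostname field of the event.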
Quick reference:
%{host}       host
%{nanos}      nanos
%{priority}   priority string
%{body}       body
%%            a % character.
%t            Unix time in millis
Time escapes are special: they are used directly, without {}.
collectorSink("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", "web-")
Quick reference:
%a   locale’s short weekday name (Mon, Tue, …)
%A   locale’s full weekday name (Monday, Tuesday, …)
%b   locale’s short month name (Jan, Feb,…)
%B   locale’s long month name (January, February,…)
%c   locale’s date and time (Thu Mar 3 23:05:25 2005)
%d   day of month (01)
%D   date; same as %m/%d/%y
%H   hour (00..23)
%I   hour (01..12)
%j   day of year (001..366)
%k   hour ( 0..23)
%l   hour ( 1..12)
%m   month (01..12)
%M   minute (00..59)
%P   locale’s equivalent of am or pm
%s   seconds since 1970-01-01 00:00:00 UTC
%S   second (00..60)
%y   last two digits of year (00..99)
%Y   year (2010)
%z   +hhmm numeric timezone (for example, -0400)
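To make the escape rules concrete, here is a minimal Python sketch that expands a small subset of them (%{field}, %%, %t, and the time codes %Y %m %d %H %M %S). It is an illustration, not Flume's implementation: the `expand` function and the dict-shaped event are assumptions, and the sketch renders time in UTC, whereas a real node would presumably use its own time zone.

```python
import re
from datetime import datetime, timezone

# Matches either %{field} or a single-character escape such as %Y or %%.
_ESCAPE = re.compile(r"%\{(\w+)\}|%(.)")


def expand(template: str, event: dict) -> str:
    """Expand a subset of the escapes using an event dict.

    `event` must contain "timestamp" (Unix time in milliseconds) plus any
    fields referenced as %{field}. Unknown escapes are left untouched.
    """
    when = datetime.fromtimestamp(event["timestamp"] / 1000, tz=timezone.utc)

    def replace(match):
        field, code = match.group(1), match.group(2)
        if field is not None:
            return str(event[field])
        if code == "%":
            return "%"
        if code == "t":
            return str(event["timestamp"])
        if code in "YmdHMS":
            return when.strftime("%" + code)
        return match.group(0)

    return _ESCAPE.sub(replace, template)


if __name__ == "__main__":
    event = {"timestamp": 1328797314800, "host": "cloudera-node-1"}
    print(expand("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", event))
    print(expand("%{host}-", event))
```

The sample timestamp and host are the ones from the JSON event shown later in these notes.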
Output file format
There are two ways to set it:
one is to set a default in flume-site.xml, the other is to let a specific sink decide.
1. flume-site.xml
flume.collector.output.format
Format quick reference
avro       Avro Native file format. Default currently is uncompressed.
avrodata   Binary encoded data written in the avro binary format.
avrojson   JSON encoded data generated by avro.
default    a debugging format.
json       JSON encoded data.
log4j      a log4j pattern similar to that used by CDH output pattern.
raw        Event body only. This is most similar to copying a file but does not preserve any uniqifying metadata like host/timestamp/nanos.
syslog     a syslog like text output format.
seqfile    the binary hadoop Sequence file format with WritableEventKeys keys, and WritableEvent as values.
2. Per-sink configuration
  collectorSink( "dfsdir","prefix"[, rollmillis[, format]])
  text("file"[,format])
  formatDfs("hdfs://nn/file" [, format])
  escapedFormatDfs("hdfs://nn/file" [, format])
Compressed seqfile
formatDfs("hdfs://nn/dir/file", seqfile("bzip2"))
Handling large numbers of small HDFS files and high latency
Flume offers two strategies:
1. Merge small files into larger ones
2. Use CombineFileInputFormat
  <property>
    <name>flume.collector.dfs.compress.codec</name>
    <value>None</value>
    <description>Writes formatted data compressed in specified codec to
    dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec,
    or any other Codec Hadoop is aware of </description>
  </property>
seqfile and avrodata support internal compression; the details still need investigating.
Dataflow definition language
Fan out: write to all sinks:
[ console, collectorSink ]
Fail over: when the current sink fails, move on and try the next candidate sink:
  < logicalSink("collector1") ? logicalSink("collector2") >
Configuration example:
  agent1 : source | < logicalSink("collector1") ? logicalSink("collector2") > ;
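The difference between the two constructs can be sketched in a few lines of Python. This only models the semantics described above, with sinks as plain callables; `fan_out` and `fail_over` are hypothetical names, and treating OSError as "the sink failed" is an assumption.

```python
def fan_out(*sinks):
    """Build a sink that hands every event to all of the given sinks."""
    def append(event):
        for sink in sinks:
            sink(event)
    return append


def fail_over(primary, backup):
    """Build a sink that tries `primary` and falls back to `backup` on failure."""
    def append(event):
        try:
            primary(event)
        except OSError:
            backup(event)
    return append


if __name__ == "__main__":
    received = []

    def broken(event):
        raise OSError("collector1 is down")

    sink = fail_over(broken, received.append)
    sink("hello")
    print(received)
```

Because both helpers return ordinary callables, they nest the same way the dataflow expressions do, for example a fan out whose second branch is a fail over.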

Roll sink: at a fixed interval, close the current instance and create a new one. Each roll creates a new, separate file:
roll(millis) sink
Configuration example:

  roll(1000) [ console, escapedCustomDfs("hdfs://namenode/flume/file-%{rolltag}") ]
Sink decorators
Fan out and fail over decide where messages go but do not modify the data. To filter or otherwise process the data, use a sink decorator.
A sink decorator can do many things: add attributes to the data stream, guarantee reliability through a write-ahead log, improve network throughput through batching and compression, sample events, or even run lightweight analysis.
flumenode: source | intervalSampler(10) sink;
flumenode: source | batch(100) sink;
flumenode: source | batch(100) gzip sink;
collector(15000) { escapedCustomDfs("xxx","yyy-%{rolltag}") }
collector(15000) { [ escapedCustomDfs("xxx","yyy-%{rolltag}"), hbase("aaa", "bbb-%{rolltag}"), elasticSearch("eeee","ffff") ] } [writes to 3 sinks at once; some may be persistent and some transient, and success is acknowledged only after all of them succeed]
node1 : tail("foo") | ackedWriteAhead batch(100) gzip lazyOpen stubbornAppend logicalSink("bar"); [write-ahead log, batches of 100, gzip compression]
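As a rough picture of what two of these decorators do to an event stream, here is a Python sketch using generators. It models the idea only: `interval_sampler` and `batch` are hypothetical functions, and which event of each interval gets forwarded (here the first) is an assumption.

```python
def interval_sampler(events, n):
    """Forward one event out of every n (here: the first of each group of n)."""
    for index, event in enumerate(events):
        if index % n == 0:
            yield event


def batch(events, size):
    """Group events into lists of `size`; a final partial batch is flushed too."""
    buffer = []
    for event in events:
        buffer.append(event)
        if len(buffer) == size:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


if __name__ == "__main__":
    sampled = interval_sampler(range(1000), 10)   # 100 events survive
    print(sum(1 for _ in batch(sampled, 30)))     # number of batches sent
```

Chaining the two generators mirrors how decorators are stacked in front of a sink in the dataflow spec.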
Metadata can be extracted with regular expressions
Events can be filtered with a select-like syntax
thriftSink and thriftSource
Extensions and plugins
http://archive.cloudera.com/cdh/ ... of_flume_extensions
The appendix is really good:
http://archive.cloudera.com/cdh/ ... lume_source_catalog
  map cloudera-node-1 agent2
  agent2 : syslogTcp(2012) | agentSink("10.129.8.125",35853);

  flume node_nowatch -n medcl
  agent2 : syslogTcp(2012) | agentSink("10.129.8.125",35853);
Testing syslog messages
1. Connect with nc
  nc 10.129.8.126 2012

2. Type a syslog message (following the format described at http://blog.csdn.net/xcj0535/article/details/4158624)
  <165>Aug 24 05:34:00 CST 1987 mymachine myproc[10]: %% It's
  time to make the do-nuts. %% Ingredients: Mix=OK, Jelly=OK #
  Devices: Mixer=OK, Jelly_Injector=OK, Frier=OK # Transport:
  Conveyer1=OK, Conveyer2=OK # %%

  <1> medcl is back

The syslog format

Here is a syslog message:
  <30>Oct 9 22:33:20 hlfedora auditd[1787]: The audit daemon is exiting.
Here "<30>" is the PRI part, "Oct 9 22:33:20 hlfedora" is the HEADER part, and "auditd[1787]: The audit daemon is exiting." is the MSG part.

  [root@cloudera-node-1 ~]# hadoop fs -cat /flume/medcl20120209-221925655+0800.3031470203471540.00000026
  {"body":"medcl is back","timestamp":1328797314800,"pri":"INFO","nanos":692106386851457,"host":"cloudera-node-1","fields":{"AckTag":"20120209-222148285+0800.692099872659457.00000037","syslogfacility":"\u0001","AckType":"msg","AckChecksum":"\u0000\u0000\u0000\u0000qu锚茫","syslogseverity":"\u0003","rolltag":"20120209-221925655+0800.3031470203471540.00000026"}}

The file uploaded to HDFS contains far too much extra content.
With the raw format:

  collector2 : syslogTcp( 2013)         | collectorSink( "hdfs://10.129.8.126/flume/", "medcl_raw",3000,raw );

  C:\Windows\system32>nc 10.129.8.125 2013
  <1> i will be back
  <1> i will be back2
  <1> i will be back3
  <1> i will be back4

  [root@cloudera-node-1 ~]# hadoop fs -cat /flume/medcl_raw20120209-235000888+0800.3036905435701540.00000069
  i will be back
  i will be back2
  i will be back3
  i will be back4
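The PRI part of the test messages above packs two numbers into one: in the BSD syslog format (RFC 3164) PRI = facility * 8 + severity. A minimal Python sketch that splits it apart; `parse_pri` is a hypothetical helper, not something Flume provides.

```python
import re

# PRI is one to three digits in angle brackets at the very start of the message.
_PRI = re.compile(r"^<(\d{1,3})>")


def parse_pri(message: str):
    """Return (facility, severity, rest_of_message) for a syslog line."""
    match = _PRI.match(message)
    if match is None:
        raise ValueError("message does not start with a <PRI> field")
    pri = int(match.group(1))
    return pri // 8, pri % 8, message[match.end():]


if __name__ == "__main__":
    line = "<30>Oct 9 22:33:20 hlfedora auditd[1787]: The audit daemon is exiting."
    facility, severity, rest = parse_pri(line)
    print(facility, severity, rest)
```

For "<30>" this yields facility 3 (system daemons) and severity 6 (informational).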

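A .NET agent with 25 threads brought the collector to its knees. [Later tests also showed frequent unexplained socket disconnects: the server-side socket simply died and Flume logged an error.]
2012-02-10 21:29:44,154 ERROR com.cloudera.flume.core.connector.DirectDriver: Exiting driver logicalNode collector2-20 in error state SyslogTcpSourceThreads | Collector because null
syslogTcp proved unstable, so I switched to thriftRpc as the source; in testing it was indeed very stable.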




  thrift-0.6.0.exe -r -gen csharp flume.thrift

  2012-02-13 23:36:30,574 [pool-4-thread-1] ERROR server.TSaneThreadPoolServer: Thrift error occurred during processing of message.
  org.apache.thrift.protocol.TProtocolException: Missing version in readMessageBegin, old client?
  at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:213)
  at com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer$Processor.process(ThriftFlumeEventServer.java:224)
  at org.apache.thrift.server.TSaneThreadPoolServer$WorkerProcess.run(TSaneThreadPoolServer.java:280)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  at java.lang.Thread.run(Thread.java:619)

This exception can occur when the server and the client use different transports, for example when one side is framed and the other is buffered.
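Why a transport mismatch produces this particular message can be shown at the byte level. A strict binary-protocol message starts with a 32-bit word whose high bits carry the version marker 0x8001, so it reads as a negative int32; a framed transport puts a 4-byte length in front, so an unframed reader sees a small positive number first and concludes the version is missing. The Python sketch below uses only `struct`, not the Thrift library, and the method name "append" is just an illustrative value.

```python
import struct

VERSION_1 = 0x80010000  # version marker of the strict binary protocol
CALL = 1                # message type for a request


def strict_message_begin(name: str, seqid: int) -> bytes:
    """Encode a message header: version|type, method name, sequence id."""
    encoded = name.encode("utf-8")
    return (
        struct.pack(">I", VERSION_1 | CALL)
        + struct.pack(">i", len(encoded)) + encoded
        + struct.pack(">i", seqid)
    )


def framed(payload: bytes) -> bytes:
    """Wrap a payload the way a framed transport does: 4-byte length prefix."""
    return struct.pack(">i", len(payload)) + payload


def first_int32(data: bytes) -> int:
    """Read the first big-endian signed 32-bit integer, as the reader would."""
    return struct.unpack(">i", data[:4])[0]


def looks_versioned(data: bytes) -> bool:
    """A strict reader expects a negative first word (version bits set)."""
    return first_int32(data) < 0


if __name__ == "__main__":
    message = strict_message_begin("append", 0)
    print(looks_versioned(message))          # unframed client, unframed server
    print(looks_versioned(framed(message)))  # framed client, unframed server
```

In practice the fix is to make both ends agree, either by wrapping the client socket in a framed transport or by removing the framing, depending on what the server expects.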
  collector3 : thriftSource( 2014 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw );
  collector4 : thriftSource( 2015 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw );
  collector5 : thriftSource( 2016 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw );
  collector6 : thriftSource( 2017 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift",60000,raw );
  collector7 : thriftSource( 2018 )| collectorSink( "hdfs://10.129.8.126/flume/", "medcl_thrift2",30000);

  map cloudera-node-1         collector7
Edit flume-site.xml (vi flume-site.xml) to add compression and a default roll interval:
  <property>
    <name>flume.collector.dfs.compress.gzip</name>
    <value>true</value>
    <description>Writes compressed output in gzip format to dfs. value is
    boolean type, i.e. true/false</description>
  </property>

  <property>
    <name>flume.collector.roll.millis</name>
    <value>60000</value>
    <description>The time (in milliseconds)
    between when hdfs files are closed and a new file is opened
    (rolled).
    </description>
  </property>
Testing the file template
  collector8 : thriftSource( 2019 )| collectorSink("hdfs://10.129.8.126/flume/app/%{host}/%Y-%m-%d/", "%H%M%S-test1-%t",5000);
  map cloudera-node-1         collector8

  [root@flume-hadoop-node-1 ~]# hadoop fs -lsr hdfs://10.129.8.126/flume/app
  drwxr-xr-x - flume supergroup 0 2012-02-17 00:49 /flume/app/MEDCL-THINK
  drwxr-xr-x - flume supergroup 0 2012-02-17 00:49 /flume/app/MEDCL-THINK/4113221-02-12
  -rw-r--r-- 1 flume supergroup 219 2012-02-17 00:49 /flume/app/MEDCL-THINK/4113221-02-12/203942-test1-12973855419598268720120217-004946767+0800.1305778353827457.00006891
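The directory name 4113221-02-12 in the listing above suggests that the event timestamp sent by the client was not in Unix milliseconds. One possible explanation, which is a guess and not something these notes confirm, is that the Windows client sent a FILETIME value (100-nanosecond ticks since 1601-01-01 UTC): the %t value 129738554195982687 embedded in the file name is consistent with that. A minimal Python sketch of the conversion such a client would need (the function names are hypothetical):

```python
from datetime import datetime, timezone

# Milliseconds between 1601-01-01 (FILETIME epoch) and 1970-01-01 (Unix epoch).
EPOCH_DIFF_MS = 11_644_473_600_000


def filetime_to_unix_millis(filetime: int) -> int:
    """Convert 100-nanosecond ticks since 1601 to milliseconds since 1970."""
    return filetime // 10_000 - EPOCH_DIFF_MS


def to_utc(unix_millis: int) -> datetime:
    """Interpret Unix milliseconds as a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(unix_millis / 1000, tz=timezone.utc)


if __name__ == "__main__":
    millis = filetime_to_unix_millis(129738554195982687)
    print(millis, to_utc(millis).date())
```

Under that assumption the value lands on 2012-02-16 (UTC), within a day of the file's listing time, whereas reading the same number as Unix milliseconds gives the year 4113221.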
Update
  collector8 : thriftSource( 2019 )| collectorSink("hdfs://10.129.8.126/flume/%{catalog}/2012-%m/%d/", "%a-%{host}-",5000,raw());

Result:

  /flume/FileTemplateRaw/2012-11/19/Fri-MEDCL-THINK-20120217-013005302+0800.1308196889416457.00007109
  collector8 : thriftSource( 2019 )| collectorSink("hdfs://10.129.8.126/flume/%{catalog}/2012", "",5000,raw());
















