分享

flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题

pig2 2017-8-21 08:59:59 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 23829
本帖最后由 pig2 于 2017-10-13 14:53 编辑
问题导读
1.对于不断追加的文件可以使用flume哪个属性?
2.对于不断追加的文件及变化的文件个数,可是使用flume哪个属性?
3.该如何配置能够搜集网站日志的flume?




上篇文章
flume与kafka整合高可靠教程2:flume与kafka整合安装
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22173



本文的背景:
在搜集日志的过程中,日志文件的个数及日志文件需要不断的追加。flume1.6中,可以使用tail -f可以解决不断追加的文件,但是由于日志文件的个数是变化的,不可能只产生一个文件。所以tail -f就已经不能解决这个搜集日志的问题。

需求:
需要能够监控文件,并且追加文件,同时文件个数也是不断变化的。

解决办法:
这时候flume1.7就产生了,很好的通过 TAILDIRl解决了这个问题。TAILDIRl可以监控一个目录下的文件。

官网地址:http://flume.apache.org/FlumeUserGuide.html

官网文档截图:




上面加粗为常用属性。

这里我们只使用了下面两个属性
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
a1.sources.source1.type = TAILDIR

flume1.7安装包
链接:http://pan.baidu.com/s/1c1Pzo9i 密码:fxa4



一、Flume安装

1. 压缩安装包[mw_shl_code=bash,true]tar -zxvf ~/jar/apache-flume-1.7.0-bin.tar.gz -C /data
mv /data/apache-flume-1.7.0-bin/ /data/flume-1.7.0 # 重命名[/mw_shl_code]


2. 配置环境变量
[mw_shl_code=bash,true]
echo -e "export FLUME_HOME=/data/flume-1.7.0\nexport PATH=\$FLUME_HOME/bin:\$PATH" >> ~/.bashrc
source ~/.bashrc[/mw_shl_code]

3. 配置flume

[mw_shl_code=bash,true]
cp flume-env.sh.template flume-env.sh修改JAVA_HOME
export JAVA_HOME= /data/jdk1.8.0_111
[/mw_shl_code]


4. 验证安装
[mw_shl_code=bash,true]flume-ng version
[/mw_shl_code]



二、Flume使用

一个agent由source、channel、sink组成。这儿我们使用Spooling Directory Source、File Channel、Kafka Sink。

1. 单节点的agent
1) 增加配置文件
[mw_shl_code=bash,true]cd $FLUME_HOME/conf
vim single_agent.conf[/mw_shl_code]


将以下内容拷贝进去
[mw_shl_code=bash,true]

# agent的名称为a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1

# set source
#a1.sources.source1.type = spooldir
a1.sources.source1.type = TAILDIR
a1.sources.source1.filegroups = f1
a1.sources.source1.filegroups.f1 = /data/aboutyunlog/.*log.*
#a1.sources.source1.spoolDir=/data/aboutyunlog
a1sources.source1.fileHeader = flase

# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.brokerList= master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic= aboutyunlog
a1.sinks.sink1.kafka.flumeBatchSize = 20
a1.sinks.sink1.kafka.producer.acks = 1
a1.sinks.sink1.kafka.producer.linger.ms = 1
a1.sinks.sink1.kafka.producer.compression.type = snappy

# set channel
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /data/flume_data/checkpoint
a1.channels.channel1.dataDirs= /data/flume_data/data

# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1[/mw_shl_code]


2. 创建所需文件

[mw_shl_code=bash,true]mkdir -p /data/aboutyunlog
mkdir -p /data/flume_data/checkpoint
mkdir -p /data/flume_data/data[/mw_shl_code]



3. 查看kafka现有的topic

[mw_shl_code=bash,true]kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --list
[/mw_shl_code]

4. 在kafka上创建名为aboutyunlog的topic
[mw_shl_code=bash,true]
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic aboutyunlog --replication-factor 1 --partitions 3
[/mw_shl_code]
5. 启动flume
[mw_shl_code=bash,true]
flume-ng agent --conf-file /data/flume-1.7.0/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console[/mw_shl_code]
启动过程中控制台会输出很多日志。

6. 创建一个kafka的consumer
[mw_shl_code=bash,true]
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181  --topic aboutyunlog --from-beginning[/mw_shl_code]

这条命令的意思是说创建aboutyunlog这个topic下的消费者,消费时从最开始的一条信息开始消费。

上图说明该消费者创建成功,由于本地/data/aboutyunlog目录下没有新文件加入,造成aboutyunlog这个topic没有信息输入,所以消费者没有得到一条信息。

7.  添加文件到flume source目录


[mw_shl_code=bash,true]
echo -e "this is a test file! \nhttp://www.aboutyun.com20170820"
mv log.1 /data/aboutyunlog/[/mw_shl_code]
为:echo -e "this is a test file! \nhttp://www.aboutyun.com20170820">log.1
再次执行
[mw_shl_code=bash,true]
echo -e "this is a test file! \nhttp://www.aboutyun.com20170820">log.2[/mw_shl_code]




然后我们看到

master上


注意:需要通过xshell链接两个master。也就是打开两个master界面

8. 再次查看kafka consumer


切换到创建kafka consumer的shell界面,会看到我们log.1\log.2中文件的内容被打印在屏幕上。


上图说明我们已经成功使用flume监控/data/aboutyunlog目录,并将监控目录中的内容发送到kafka的aboutyunlog主题中。

注意:如果使用flume1.6会找不到类。

[mw_shl_code=bash,true]17/08/17 19:21:08 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: TAILDIR, class: TAILDIR
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: TAILDIR
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
        ... 11 more
[/mw_shl_code]
所以需更换flume1.7



本帖被以下淘专辑推荐:

已有(5)人评论

跳转到指定楼层
天上有et 发表于 2017-9-5 22:25:50
新人求教,没权限发帖,请见谅啊~~

我使用flume1.7,想实现将kafka中的数据传输到hdfs中,分别采用kafka source,file channal和hdfs sink。现在kafka中要求采用avro的文件格式,hdfs中要求为parquet文件格式。
我查阅了官方文档,有几点不明白:
1.我使用flume直接将avro格式的数据获取进来,应该不用再做特殊处理吧?
2.flume1.7中的hdfs文件格式支持 SequenceFile, DataStream 或 CompressedStream,我应该选择哪一种?需要二次开发转化成parquet格式吗?
谢谢啊!!

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条