分享

flume学习(十一):如何使用Spooling Directory Source

本帖最后由 坎蒂丝_Swan 于 2015-3-19 22:12 编辑

问题导读


1.如何理解将FTP上的信令数据汇聚到HDFS上去存储?

2.使用Spooling Directory Source的时候同时读写一个文件会怎样?








最近在弄一个信令数据汇聚的事情,主要目的是把FTP上的信令数据汇聚到HDFS上去存储。 逻辑是这样的:把FTP服务器上的文件下载到一台主机上,然后SCP到另外一台主机上的Spooling Directory Source所监控的目录下面去,sink是hdfs(这里解释一下,由于网络环境的因素,另一台不能访问到内网的FTP服务器,所以只能这样中转一下)。


嗯,想法不错,逻辑上看上去也应该没啥问题,于是就开始吭哧吭哧写脚本了。FTP上每个信令数据的每个文件的大小差不多都有300M左右。SCP到远端服务器也没出现问题,可就是agent老是会挂掉,报这个异常:

  1. 2014-11-26 12:30:16,942 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source source1: { spoolDir: /var/log/apache/flumeSpool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
  2. java.nio.charset.MalformedInputException: Input length = 1
  3.         at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
  4.         at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
  5.         at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
  6.         at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
  7.         at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
  8.         at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:241)
  9.         at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
  10.         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
  11.         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
  12.         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
  13.         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  14.         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  15.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  16.         at java.lang.Thread.run(Thread.java:745)
复制代码

然后让我重启agent才会把Spooling Directory Source所监控的目录下面的文件抽取到HDFS上去,感觉很莫名,网上搜索了一下这个错误的原因,很多都是说可能传输的文件字符集的原因,不以为然,因为我反复测试了一下,如果是字符集的原因,那么为什么我重启一下agent又可以成功的抽取数据了。



于是我想了想是不是由于同时读写导致的问题,因为我SCP文件过去,文件较大,需要一定的时间,而flume监测到有文件马上就开始逐行读取文件转化成EVENT发送到HDFS上去,这中间肯定存在同时读写一个文件了,然后就产生的这个异常问题?


目前仅仅是猜测,于是我修改了Spooling Directory Source的配置,加了这么一个配置:

tier1.sources.source1.ignorePattern = ^(.)*\\.tmp$


就是忽略监控目录下面的.tmp文件。然后我修改了scp的逻辑,拷贝到另一台主机上时,先命名为:原文件名.tmp(由于是.tmp文件,agent不会采集此类文件),等SCP执行成功之后,在mv这个.tmp文件,去掉.tmp后缀,这样agent又会抽取这个文件的数据了,通过这么一处理,就巧妙的避免了同时读写一个文件的问题。


脚本调整好之后,重新运行脚本,惊喜的发现成功了,这次agent没有挂掉,大功告成了。


总结:使用Spooling Directory Source的时候,一定要避免同时读写一个文件的情况。采用上面提到的方法就可以巧妙的避开这个问题。




flume学习(五):flume将log4j日志数据写入到hdfs
flume学习(六):使用hive来分析flume收集的日志数据
flume学习(七)、(八):如何使用event header中的key值以及自定义source
flume学习(九):自定义拦截器
flume学习(十):使用Morphline Interceptor
flume学习(十一):如何使用Spooling Directory Source




欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(4)人评论

跳转到指定楼层
为了明天time 发表于 2015-9-18 14:55:33
厉害 学习了
回复

使用道具 举报

笑风 发表于 2016-4-3 19:44:47
求大神指点下,采用Spooling Directory Source监控目录文件夹,文件处理不过来怎么搞?
回复

使用道具 举报

墨小黑 发表于 2017-5-5 17:11:30
求大神指导,SpooldirSources,fileChannels,HDFSSink,配置如下:
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

#配置sources,即被监听的源目录
agent1.sources.spooldirSource.type = spooldir
agent1.sources.spooldirSource.spoolDir = /tmp/testflumesource
agent1.sources.spooldirSource.channels = fileChannel
agent1.sources.spooldirSource.basenameHeader = true
agent1.sources.spooldirSource.basenameHeaderKey = basename
agent1.sources.spooldirSource.deletePolicy = immediate
# agent1.sources.spooldirSource.batchSize = 1000
#数据源编码格式
agent1.sources.spooldirSource.inputCharset = gbk
agent1.sources.spooldirSOurce.ignorePattern = ^(.)*\\.tmp$

#配置sinks,即目的目录
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://node1.hde.h3c.com:8020/user/flume/%{topic}/%Y%m%d/%H%M
agent1.sinks.hdfsSink.fileType = DataStream
agent1.sinks.hdfsSink.writeFormat = Text
agent1.sinks.hdfsSink.hdfs.filePrefix = %{basename}
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 1
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 12800000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 1000
agent1.sinks.hdfsSink.channel = fileChannel

#channels,通道目录配置:把文件事件持久化到本地硬盘上
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop/apache-flume-1.6.0-bin/dataDir
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChannel.transactionCapacity = 2000
agent1.channels.fileChannel.keep-alive = 6

启动flume后,将大量文件copy到spoolDir下,只传了几个文件后就被打断了,然后我重启flume,会继续接着传输,但是传输大概100多个文件后又被打断了,再重启,flume就报错了。
尝试的方案:先将文件copy到spooldir,然后启动flume,大概150个文件后flume就被打断,再重启后还是会打断。
请问怎么处理啊!!!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条