Can Flume sink to a local file directory?

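aqi915 posted on 2015-6-19 15:21:02
I'm working in a test environment and want to try out Flume. The goal is to watch local directory a for new files and move them to local directory b in real time. The test environment has no Hadoop cluster. What configuration file do I need? Could you be specific?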

muyannian posted on 2015-6-19 15:36:59
It should be possible.
Try a configuration like this:
[mw_shl_code=bash,true]a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
[/mw_shl_code]
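
The snippet above only covers the channel and sink declarations. A fuller sketch for the directory-to-directory case in the question might look like the following. The agent name `a1` and the names `c1` and `k1` come from the snippet; the source name `r1` and both paths are placeholders, not values from this thread. As far as I know, `file_roll` writes the events it receives into its own rolled output files, so the result is the content of the incoming files rather than copies under their original names.

```properties
# Assumed names: agent a1, source r1 (hypothetical), channel c1, sink k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Spooling directory source: reads new files dropped into spoolDir
a1.sources.r1.type = spooldir
# Placeholder path for "directory a"
a1.sources.r1.spoolDir = /path/to/a
a1.sources.r1.channels = c1

# In-memory channel between source and sink
a1.channels.c1.type = memory

# File roll sink: note the "sink." prefix on the directory key
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
# Placeholder path for "directory b"
a1.sinks.k1.sink.directory = /path/to/b
# 30 is the documented default; 0 disables rolling (single output file)
a1.sinks.k1.sink.rollInterval = 30
```

A source takes `channels` (plural) while a sink takes `channel` (singular), which is easy to mix up when extending the snippet.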


############################################

File Roll Sink
Stores events on the local filesystem. Required properties are marked with an asterisk (*).

Property Name     | Default | Description
channel *         | -       | -
type *            | -       | The component type name, needs to be file_roll.
sink.directory *  | -       | The directory where files will be stored.
sink.rollInterval | 30      | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer   | TEXT    | Other possible options include avro_event or the FQCN of an implementation of EventSerializer.Builder interface.
batchSize         | 100     | -









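aqi915 posted on 2015-6-20 12:19:05
muyannian posted on 2015-6-19 15:36
It should be possible.
Try a configuration like this:
a1.channels = c1

Goal: detect new files in the local directory (/home/see/seeupdate/blacklistOperRecord) and move them to a specified local directory (/home/see/seeupdate/blacklistOperRecord/temp).

I adapted your approach, and it fails with the errors below.
Command executed: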
[mw_shl_code=bash,true]/home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &[/mw_shl_code]

Error output:
FJXM-DM-TEST-NAG-01 flume/bin> /home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &
[1] 126950
FJXM-DM-TEST-NAG-01 flume/bin> Info: Sourcing environment configuration script /home/see/flume/flume/conf/flume-env.sh
+ exec /home/see/seeupdate/jdk1.6.0_16/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/see/flume/flume/conf:/home/see/flume/flume/lib/*' -Djava.library.path= org.apache.flume.node.Application -n mobileAgent -f /home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 00:00:51,367 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2015-06-20 00:00:51,372 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2015-06-20 00:00:51,376 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 00:00:51,377 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 00:00:51,384 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 00:00:51,385 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for hdfsSinkContentDownload: type
2015-06-20 00:00:51,385 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 00:00:51,385 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfsSinkContentDownload Agent: mobileAgent
2015-06-20 00:00:51,385 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 00:00:51,385 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: mobileAgent, initial-configuration: AgentConfiguration[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{directory=/home/see/seeupdate/blacklistOperRecord/temp, type=file_roll, channel=memoryChannelContentDownload} }}

2015-06-20 00:00:51,390 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel memoryChannelContentDownload
2015-06-20 00:00:51,397 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: hdfsSinkContentDownload using FILE_ROLL
2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for mobileAgent
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{directory=/home/see/seeupdate/blacklistOperRecord/temp, type=file_roll, channel=memoryChannelContentDownload} }}

2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:memoryChannelContentDownload

2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks hdfsSinkContentDownload

2015-06-20 00:00:51,399 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources spooldirSourceContentDownload

2015-06-20 00:00:51,401 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [mobileAgent]
2015-06-20 00:00:51,401 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2015-06-20 00:00:51,410 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel memoryChannelContentDownload type memory
2015-06-20 00:00:51,415 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel memoryChannelContentDownload
2015-06-20 00:00:51,416 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source spooldirSourceContentDownload, type spooldir
2015-06-20 00:00:51,427 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: hdfsSinkContentDownload, type: file_roll
2015-06-20 00:00:51,431 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:432)] Sink hdfsSinkContentDownload has been removed due to an error during configuration
java.lang.IllegalArgumentException: Directory may not be null
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:88)
        at org.apache.flume.sink.RollingFileSink.configure(RollingFileSink.java:84)
        at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
2015-06-20 00:00:51,433 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel memoryChannelContentDownload connected to [spooldirSourceContentDownload]
2015-06-20 00:00:51,441 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{spooldirSourceContentDownload=EventDrivenSourceRunner: { source:Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord } }} sinkRunners:{} channels:{memoryChannelContentDownload=org.apache.flume.channel.MemoryChannel{name: memoryChannelContentDownload}} }
2015-06-20 00:00:51,451 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel memoryChannelContentDownload
2015-06-20 00:00:51,496 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memoryChannelContentDownload: Successfully registered new MBean.
2015-06-20 00:00:51,497 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memoryChannelContentDownload started
2015-06-20 00:00:51,497 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source spooldirSourceContentDownload
2015-06-20 00:00:51,499 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /home/see/seeupdate/blacklistOperRecord
2015-06-20 00:00:51,507 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/home/see/seeupdate/blacklistOperRecord, metaDir=.flumespool, deserializer=LINE
2015-06-20 00:00:51,527 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /home/see/seeupdate/blacklistOperRecord/flume-spooldir-perm-check-713511486621893148.canary
2015-06-20 00:00:51,529 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:110)] SpoolDirectorySource source started
2015-06-20 00:00:51,530 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: spooldirSourceContentDownload: Successfully registered new MBean.
2015-06-20 00:00:51,530 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: spooldirSourceContentDownload started
2015-06-20 00:00:51,807 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:260)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:238)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
2015-06-20 00:01:21,499 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes



Configuration file:
FJXM-DM-TEST-NAG-01 flume/conf> more mobileAgent.conf
mobileAgent.sources = spooldirSourceContentDownload
mobileAgent.channels = memoryChannelContentDownload
mobileAgent.sinks = hdfsSinkContentDownload
############contentDownload####################################
mobileAgent.sources.spooldirSourceContentDownload.type = spooldir
mobileAgent.sources.spooldirSourceContentDownload.spoolDir =/home/see/seeupdate/blacklistOperRecord
mobileAgent.sources.spooldirSourceContentDownload.deletePolicy = immediate
mobileAgent.sources.spooldirSourceContentDownload.fileHeader = false
mobileAgent.sources.spooldirSourceContentDownload.maxBackoff = 60000
mobileAgent.sources.spooldirSourceContentDownload.basenameHeader = true
mobileAgent.sources.spooldirSourceContentDownload.basenameHeaderKey = basename
#mobileAgent.sources.spooldirSourceContentDownload.ignorePattern = ^(.)*\\.tmp$

mobileAgent.channels.memoryChannelContentDownload.type = memory
mobileAgent.channels.memoryChannelContentDownload.keep-alive = 30
mobileAgent.channels.memoryChannelContentDownload.capacity = 2000
mobileAgent.channels.memoryChannelContentDownload.transactionCapacity =1000

mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
mobileAgent.sinks.hdfsSinkContentDownload.directory =/home/see/seeupdate/blacklistOperRecord/temp
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.path = hdfs://10.46.192.173:1220/DataBase/Flume/blacklist/20%y%m%d/contentDownload
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.filePrefix = %{basename}
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.fileSuffix = .dat
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.writeFormat = Text
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.fileType = DataStream
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollInterval = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollSize = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollCount = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.batchSize = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.callTimeout = 30000
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.useLocalTimeStamp = true
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollTimerPoolSize = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.maxOpenFiles = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.idleTimeout= 60

mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload



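aqi915 posted on 2015-6-20 12:27:20
muyannian posted on 2015-6-19 15:36
It should be possible.
Try a configuration like this:
a1.channels = c1

Hmm, did my previous reply not go through?

What I want Flume to do: detect new files in the local directory (/home/see/seeupdate/blacklistOperRecord) and move them to the local directory (/home/see/seeupdate/blacklistOperRecord/temp).

Command executed: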
/home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &

Configuration file:
[mw_shl_code=bash,true]FJXM-DM-TEST-NAG-01 flume/conf> more mobileAgent.conf
mobileAgent.sources = spooldirSourceContentDownload
mobileAgent.channels = memoryChannelContentDownload
mobileAgent.sinks = hdfsSinkContentDownload
############contentDownload####################################
mobileAgent.sources.spooldirSourceContentDownload.type = spooldir
mobileAgent.sources.spooldirSourceContentDownload.spoolDir =/home/see/seeupdate/blacklistOperRecord
mobileAgent.sources.spooldirSourceContentDownload.deletePolicy = immediate
mobileAgent.sources.spooldirSourceContentDownload.fileHeader = false
mobileAgent.sources.spooldirSourceContentDownload.maxBackoff = 60000
mobileAgent.sources.spooldirSourceContentDownload.basenameHeader = true
mobileAgent.sources.spooldirSourceContentDownload.basenameHeaderKey = basename
#mobileAgent.sources.spooldirSourceContentDownload.ignorePattern = ^(.)*\\.tmp$

mobileAgent.channels.memoryChannelContentDownload.type = memory
mobileAgent.channels.memoryChannelContentDownload.keep-alive = 30
mobileAgent.channels.memoryChannelContentDownload.capacity = 2000
mobileAgent.channels.memoryChannelContentDownload.transactionCapacity =1000

mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
mobileAgent.sinks.hdfsSinkContentDownload.directory =/home/see/seeupdate/blacklistOperRecord/temp
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.path = hdfs://10.46.192.173:1220/DataBase/Flume/blacklist/20%y%m%d/contentDownload
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.filePrefix = %{basename}
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.fileSuffix = .dat
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.writeFormat = Text
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.fileType = DataStream
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollInterval = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollSize = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollCount = 0
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.batchSize = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.callTimeout = 30000
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.useLocalTimeStamp = true
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.rollTimerPoolSize = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.maxOpenFiles = 1
#mobileAgent.sinks.hdfsSinkContentDownload.hdfs.idleTimeout= 60

mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload
[/mw_shl_code]


aqi915 posted on 2015-6-20 12:28:00
What happened? The content of my reply came out empty.

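Alkaloid0515 posted on 2015-6-20 12:50:59
aqi915 posted on 2015-6-20 12:19
Goal: detect new files in the local directory (/home/see/seeupdate/blacklistOperRecord) and move them to a specified local directory (/ ...


Are the parts in red below custom-defined?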
mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload


aqi915 posted on 2015-6-20 13:10:34
Alkaloid0515 posted on 2015-6-20 12:50
Are the parts in red below custom-defined?
mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContent ...

Yes, I think so. Is that not allowed?

Alkaloid0515 posted on 2015-6-20 13:14:46
aqi915 posted on 2015-6-20 13:10
Yes, I think so. Is that not allowed?

org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: hdfsSinkContentDownload, type: file_roll
2015-06-20 00:00:51,431 (conf-file-poller-0) [ERROR - org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:432)] Sink hdfsSinkContentDownload has been removed due to an error during configuration
java.lang.IllegalArgumentException: Directory may not be null

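I suggest trying this configuration on a stock Flume build first, before any custom development. If it runs without this error,
then the problem lies in your custom changes.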
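
For reference, the `Directory may not be null` error above is consistent with the earlier configuration setting `...hdfsSinkContentDownload.directory` rather than the `sink.directory` key that the File Roll Sink documentation lists. A minimal Python sketch of a check for that mistake follows. The helper names `parse_properties` and `file_roll_sinks_missing_directory` are hypothetical, and the parser only handles plain `key = value` lines, not the full Java properties syntax.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and # comments.

    Simplification: ':' separators, '!' comments and line continuations
    from the full Java properties format are not handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def file_roll_sinks_missing_directory(props, agent):
    """Return the names of file_roll sinks that have no sink.directory key."""
    missing = []
    for name in props.get(f"{agent}.sinks", "").split():
        prefix = f"{agent}.sinks.{name}."
        if props.get(prefix + "type") != "file_roll":
            continue
        if not props.get(prefix + "sink.directory"):
            missing.append(name)
    return missing


# The sink lines from the first configuration posted in this thread.
BROKEN = """
mobileAgent.sinks = hdfsSinkContentDownload
mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
mobileAgent.sinks.hdfsSinkContentDownload.directory =/home/see/seeupdate/blacklistOperRecord/temp
"""
# The same lines with the key renamed to sink.directory.
FIXED = BROKEN.replace(".directory =", ".sink.directory =")

if __name__ == "__main__":
    print(file_roll_sinks_missing_directory(parse_properties(BROKEN), "mobileAgent"))
    print(file_roll_sinks_missing_directory(parse_properties(FIXED), "mobileAgent"))
```

Run against the first configuration, the check reports `hdfsSinkContentDownload`; with the key renamed to `sink.directory` it reports nothing.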


aqi915 posted on 2015-6-20 16:08:34
Alkaloid0515 posted on 2015-6-20 13:14
org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of  ...

My goal with Flume is to detect new files in a local directory (/home/see/seeupdate/blacklistOperRecord) and move them to another local directory (/home/see/seeupdate/blacklistOperRecord/temp).

Command executed:
/home/see/flume/flume/bin/flume-ng agent -n mobileAgent -c /home/see/flume/flume/conf/ -f /home/see/flume/flume/conf/mobileAgent.conf -Dflume.root.logger=DEBUG,console &
Error (empty files are now created in the target directory, e.g. /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-3):
FJXM-DM-TEST-NAG-01 flume/conf> Info: Sourcing environment configuration script /home/see/flume/flume/conf/flume-env.sh
+ exec /home/see/seeupdate/jdk1.6.0_16/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/see/flume/flume/conf:/home/see/flume/flume/lib/*' -Djava.library.path= org.apache.flume.node.Application -n mobileAgent -f /home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 03:56:40,560 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2015-06-20 03:56:40,567 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2015-06-20 03:56:40,571 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 03:56:40,572 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:/home/see/flume/flume/conf/mobileAgent.conf
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 03:56:40,582 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for hdfsSinkContentDownload: type
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfsSinkContentDownload
2015-06-20 03:56:40,582 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: hdfsSinkContentDownload Agent: mobileAgent
2015-06-20 03:56:40,583 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: mobileAgent, initial-configuration: AgentConfiguration[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{type=file_roll, channel=memoryChannelContentDownload, sink.directory=/home/see/seeupdate/blacklistOperRecord/temp} }}

2015-06-20 03:56:40,587 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel memoryChannelContentDownload
2015-06-20 03:56:40,594 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: hdfsSinkContentDownload using FILE_ROLL
2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for mobileAgent
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[mobileAgent]
SOURCES: {spooldirSourceContentDownload={ parameters:{maxBackoff=60000, basenameHeaderKey=basename, fileHeader=false, basenameHeader=true, channels=memoryChannelContentDownload, spoolDir=/home/see/seeupdate/blacklistOperRecord, type=spooldir, deletePolicy=immediate} }}
CHANNELS: {memoryChannelContentDownload={ parameters:{transactionCapacity=1000, capacity=2000, keep-alive=30, type=memory} }}
SINKS: {hdfsSinkContentDownload={ parameters:{type=file_roll, channel=memoryChannelContentDownload, sink.directory=/home/see/seeupdate/blacklistOperRecord/temp} }}

2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:memoryChannelContentDownload

2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks hdfsSinkContentDownload

2015-06-20 03:56:40,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources spooldirSourceContentDownload

2015-06-20 03:56:40,598 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [mobileAgent]
2015-06-20 03:56:40,598 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2015-06-20 03:56:40,606 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel memoryChannelContentDownload type memory
2015-06-20 03:56:40,610 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel memoryChannelContentDownload
2015-06-20 03:56:40,611 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source spooldirSourceContentDownload, type spooldir
2015-06-20 03:56:40,621 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: hdfsSinkContentDownload, type: file_roll
2015-06-20 03:56:40,626 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel memoryChannelContentDownload connected to [spooldirSourceContentDownload, hdfsSinkContentDownload]
2015-06-20 03:56:40,633 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{spooldirSourceContentDownload=EventDrivenSourceRunner: { source:Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord } }} sinkRunners:{hdfsSinkContentDownload=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2d58f9d3 counterGroup:{ name:null counters:{} } }} channels:{memoryChannelContentDownload=org.apache.flume.channel.MemoryChannel{name: memoryChannelContentDownload}} }
2015-06-20 03:56:40,644 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel memoryChannelContentDownload
2015-06-20 03:56:40,703 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: memoryChannelContentDownload: Successfully registered new MBean.
2015-06-20 03:56:40,704 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: memoryChannelContentDownload started
2015-06-20 03:56:40,704 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:160)] Waiting for channel: memoryChannelContentDownload to start. Sleeping for 500 ms
2015-06-20 03:56:41,204 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink hdfsSinkContentDownload
2015-06-20 03:56:41,206 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source spooldirSourceContentDownload
2015-06-20 03:56:41,206 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:104)] Starting org.apache.flume.sink.RollingFileSink{name:hdfsSinkContentDownload, channel:memoryChannelContentDownload}...
2015-06-20 03:56:41,207 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /home/see/seeupdate/blacklistOperRecord
2015-06-20 03:56:41,208 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: hdfsSinkContentDownload: Successfully registered new MBean.
2015-06-20 03:56:41,208 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: hdfsSinkContentDownload started
2015-06-20 03:56:41,211 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:136)] RollingFileSink hdfsSinkContentDownload started.
2015-06-20 03:56:41,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2015-06-20 03:56:41,215 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:56:41,227 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/home/see/seeupdate/blacklistOperRecord, metaDir=.flumespool, deserializer=LINE
2015-06-20 03:56:41,258 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /home/see/seeupdate/blacklistOperRecord/flume-spooldir-perm-check-7155634312227911932.canary
2015-06-20 03:56:41,259 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:110)] SpoolDirectorySource source started
2015-06-20 03:56:41,260 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: spooldirSourceContentDownload: Successfully registered new MBean.
2015-06-20 03:56:41,260 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: spooldirSourceContentDownload started
2015-06-20 03:56:41,530 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
        at java.nio.charset.CoderResult.throwException(CoderResult.java:260)
        at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
        at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
        at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
        at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:238)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
2015-06-20 03:57:11,207 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 03:57:11,210 (rollingFileSink-roller-13-0) [DEBUG - org.apache.flume.sink.RollingFileSink$1.run(RollingFileSink.java:127)] Marking time to rotate file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:57:12,238 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:142)] Time to rotate /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:57:12,238 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:145)] Closing file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-1
2015-06-20 03:57:12,239 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:41,208 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:/home/see/flume/flume/conf/mobileAgent.conf for changes
2015-06-20 03:57:41,210 (rollingFileSink-roller-13-0) [DEBUG - org.apache.flume.sink.RollingFileSink$1.run(RollingFileSink.java:127)] Marking time to rotate file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:44,239 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:142)] Time to rotate /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:44,240 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:145)] Closing file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-2
2015-06-20 03:57:44,240 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)] Opening output stream for file /home/see/seeupdate/blacklistOperRecord/temp/1434787000623-3


Configuration file

mobileAgent.sources = spooldirSourceContentDownload
mobileAgent.channels = memoryChannelContentDownload
mobileAgent.sinks = hdfsSinkContentDownload
############contentDownload####################################
mobileAgent.sources.spooldirSourceContentDownload.type = spooldir
mobileAgent.sources.spooldirSourceContentDownload.spoolDir = /home/see/seeupdate/blacklistOperRecord
mobileAgent.sources.spooldirSourceContentDownload.deletePolicy = immediate
mobileAgent.sources.spooldirSourceContentDownload.fileHeader = false
mobileAgent.sources.spooldirSourceContentDownload.maxBackoff = 60000
mobileAgent.sources.spooldirSourceContentDownload.basenameHeader = true
mobileAgent.sources.spooldirSourceContentDownload.basenameHeaderKey = basename

mobileAgent.channels.memoryChannelContentDownload.type = memory
mobileAgent.channels.memoryChannelContentDownload.keep-alive = 30
mobileAgent.channels.memoryChannelContentDownload.capacity = 2000
mobileAgent.channels.memoryChannelContentDownload.transactionCapacity = 1000

mobileAgent.sinks.hdfsSinkContentDownload.type = file_roll
mobileAgent.sinks.hdfsSinkContentDownload.sink.directory = /home/see/seeupdate/blacklistOperRecord/temp

mobileAgent.sinks.hdfsSinkContentDownload.channel = memoryChannelContentDownload
mobileAgent.sources.spooldirSourceContentDownload.channels = memoryChannelContentDownload


Files in the source directory

FJXM-DM-TEST-NAG-01 seeupdate/blacklistOperRecord> ll
total 36
-rw-r--r-- 1 see seegroup    10 2015-06-20 15:54 adj.txt
-rwx--x--x 1 see seegroup 27606 2015-06-19 19:30 s21211.txt

drwxr-xr-x 2 see seegroup  4096 2015-06-20 16:02 temp




bob007 posted on 2015-6-20 16:48:40
aqi915 posted on 2015-6-20 16:08
My goal with Flume is to detect new files in a local directory (/home/see/seeupdate/blacklistOperRecord) and move them to another local directory ...

[ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source spooldirSourceContentDownload: { spoolDir: /home/see/seeupdate/blacklistOperRecord }:

The error has changed.
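
The new failure in the log is `java.nio.charset.MalformedInputException: Input length = 1`, thrown while `LineDeserializer` reads a line. That exception generally means the input contains bytes that are not valid in the charset being decoded. Assuming the source is decoding as UTF-8 (an assumption; check what your Flume version documents for the spooling directory source), the Python sketch below lists which files in a directory fail to decode and at which byte offset. The function name `find_undecodable_files` and the sample file names are hypothetical, and nothing in the thread confirms which file actually triggered the error.

```python
import tempfile
from pathlib import Path


def find_undecodable_files(directory, encoding="utf-8"):
    """Return (file name, byte offset) for each regular file that fails to decode.

    Each file is read fully into memory, which is fine for small files
    like the ones listed in this thread.
    """
    bad = []
    for path in sorted(Path(directory).iterdir()):
        if not path.is_file():
            continue  # skip subdirectories such as temp
        try:
            path.read_bytes().decode(encoding)
        except UnicodeDecodeError as err:
            bad.append((path.name, err.start))
    return bad


if __name__ == "__main__":
    # Demo on throwaway files; point the function at the real spoolDir instead.
    with tempfile.TemporaryDirectory() as demo_dir:
        Path(demo_dir, "ascii_sample.txt").write_bytes(b"plain ascii\n")
        # Chinese text saved as GBK is usually not valid UTF-8.
        Path(demo_dir, "gbk_sample.txt").write_bytes("黑名单\n".encode("gbk"))
        for name, offset in find_undecodable_files(demo_dir):
            print(f"{name}: first undecodable byte at offset {offset}")
```

If a file turns out to use another encoding, newer releases of the Flume user guide describe `inputCharset` and `decodeErrorPolicy` properties on the spooling directory source; whether they are available depends on the Flume version in use.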