分享

flume+kafka+hdfs疑问

本想搭建一个 flume+hdfs+kafka+storm+mysql 的日志实时分析和存储的系统,但是flume日志收集这块一直不通,查看flume的日志也没有报错,不知道该怎么解决了,求大家帮帮忙,贴出集群配置和配置文件如下:
共5台机器:node1~node5,其中node3~node5为日志收集的agent,node1~node2为flume的collector,最终存储两份,一份到kafka,一份到hdfs。
agent的配置文件如下:

#def
agent.sources = src_spooldir
agent.channels = file memory
agent.sinks = collector_avro1 collector_avro2

# sources
agent.sources.src_spooldir.type = spooldir
agent.sources.src_spooldir.channels = file memory
agent.sources.src_spooldir.spoolDir = /data/flume/spoolDir
agent.sources.src_spooldir.selector.type = multiplexing
agent.sources.src_spooldir.fileHeader = true

# channels
agent.channels.file.type = file
agent.channels.file.checkpointDir = /data/flume/checkpoint
agent.channels.file.dataDirs = /data/flume/data
agent.channels.memory.type = memory
agent.channels.memory.capacity = 10000
agent.channels.memory.transactionCapacity = 10000
agent.channels.memory.byteCapacityBufferPercentage = 20
agent.channels.memory.byteCapacity = 800000

# sinks
agent.sinks.collector_avro1.type = avro
agent.sinks.collector_avro1.channel = file
agent.sinks.collector_avro1.hostname = node1
agent.sinks.collector_avro1.port = 45456
agent.sinks.collector_avro2.type = avro
agent.sinks.collector_avro2.channel = memory
agent.sinks.collector_avro2.hostname = node2
agent.sinks.collector_avro2.port = 4545


collector端的配置文件如下:

#def
agent.sources = src_avro
agent.channels = file memory
agent.sinks = hdfs kafka

# sources
agent.sources.src_avro.type = avro
agent.sources.src_avro.channels = file memory
agent.sources.src_avro.bind = node1
agent.sources.src_avro.port = 45456
agent.sources.src_avro.selector.type = replicating

# channels
agent.channels.file.type = file
agent.channels.file.checkpointDir = /data/flume/checkpoint
agent.channels.file.dataDirs = /data/flume/data
agent.channels.memory.type = memory
agent.channels.memory.capacity = 10000
agent.channels.memory.transactionCapacity = 10000
agent.channels.memory.byteCapacityBufferPercentage = 20
agent.channels.memory.byteCapacity = 800000

# sinks
agent.sinks.hdfs.type = hdfs
agent.sinks.hdfs.channel = file
agent.sinks.hdfs.hdfs.path = hdfs://node1/flume/events/%y-%m-%d/%H%M/%S
agent.sinks.hdfs.hdfs.filePrefix = log_%Y%m%d_%H
agent.sinks.hdfs.hdfs.fileSuffix = .txt
agent.sinks.hdfs.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs.hdfs.writeFormat = Text
agent.sinks.hdfs.hdfs.rollCount = 0
agent.sinks.hdfs.hdfs.rollSize = 1024
agent.sinks.hdfs.hdfs.rollInterval = 0

agent.sinks.kafka.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka.channel = memory
agent.sinks.kafka.kafka.topic = test
agent.sinks.kafka.kafka.bootstrap.servers = node3:9092,node4:9092,node5:9092
agent.sinks.kafka.kafka.flumeBatchSize = 20
agent.sinks.kafka.kafka.producer.acks = 1
agent.sinks.kafka.kafka.producer.linger.ms = 1
agent.sinks.kafka.kafka.producer.compression.type = snappy

最终 hdfs和kafka都没有接收到数据。


已有(5)人评论

跳转到指定楼层
qcbb001 发表于 2018-5-12 21:41:03
agent.channels = file memory
agent.sinks = collector_avro1 collector_avro2
为啥还有空格?而且file memory似乎没见过。

回复

使用道具 举报

qing_D7BWs 发表于 2018-5-12 22:59:44
qcbb001 发表于 2018-5-12 21:41
agent.channels = file memory
agent.sinks = collector_avro1 collector_avro2
为啥还有空格?而且file  ...

这是两个 chennel 和 两个 sink

# List the sources, sinks and channels for the agent<Agent>.sources = <Source1><Agent>.sinks = <Sink1> <Sink2><Agent>.channels = <Channel1> <Channel2>
这是官网原话


回复

使用道具 举报

hello2018 发表于 2018-5-13 08:28:38
qing_D7BWs 发表于 2018-5-12 22:59
这是两个 chennel 和 两个 sink

# List the sources, sinks and channels for the agent.sources = . ...

这个配置不简单。
有两点疑问
1.agent.sources.src_spooldir.selector.type = multiplexing

这个常识配置为replicating

2.滚动的方式有点大。

配置的方式,可以暂时去掉collector,首先确定是agent的问题,还是collector的问题

回复

使用道具 举报

qing_D7BWs 发表于 2018-5-13 09:10:25
hello2018 发表于 2018-5-13 08:28
这个配置不简单。
有两点疑问
1.agent.sources.src_spooldir.selector.type = multiplexing

我这个配置主要是参考了美团点评的flume架构,主要是做负载均衡和高可用,我试了很长时间,主要是配置完 agent.sources.src_spooldir.selector.type = multiplexing 这个以后,还需要配置相应的过滤条件,只是条件要怎样配置,我还一直都没搞明白,大概意思就是根据 event中的header进行筛选,来判断该event要发送到哪个channel

回复

使用道具 举报

qing_D7BWs 发表于 2018-5-13 12:43:16
整整一天半加半个晚上终于搞明白了,collector端的配置没有问题,主要是agent端的问题,agent.sources.src_spooldir.selector.type = multiplexing,这个配置主要是在source端对channel进行选择,还需要一些额外的配置项,来定义规则对Event进行分发。也可以直接配置 <Agent>.sources.<Source1>.selector.default = <Channel2> 默认会把所有的 Event 发送到<Channel2>,如果配置了多个channel,就会复制分发。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条