分享

flume应该思考的问题

问题导读

1.flume的配置你是如何理解的?
2.flume与kafka整合,kafka可以做哪些组件?
3.flume与kafka的区别是什么?

flume是比较常用的大数据技术,那么学习flume,我们还需要思考flume,这样理解才能在遇到问题的时候,更容易解决,使用起来更加的得心应手。下面介绍了flume的相关内容及个人的理解。

flume应用
一般来讲,我们接触flume可能更早一些。flume如何安装可参考
让你快速认识flume及安装和使用flume1.5传输数据(日志)到hadoop2.2
http://www.aboutyun.com/forum.php?mod=viewthread&tid=7949
如果你安装测试过flume,可以知道flume可以传递数据到另外的地方。比如我们可以传递本地文件到hadoop文件,比如搜集日志到hadoop,然后通过mapreduce或则spark处理。这也是比较常见的。


flume解析
flume有哪些内容,我们刚开始学习的时候,几乎都是复制黏贴的方式。对于它们几乎不怎么理解,或则只是停留在表面的理解。所以导致我们产生异常或则错误的时候,就不知道怎么解决了。
这里解析下flume,可以知道我们在干什么,我们遇到错误的时候,能够知道哪里出现了问题。

channel的作用
flume传递数据,包含三个组件:source,channel,sink.
那么如果我们来开发flume,我们会如何开发。好像channel这个不是必须的。因为有了数据源source和数据传递目标sink,应该就可以了。为何还需要channel。感觉channel是多此一举。
从正常的角度来说channel确实是不需要的。但是有一个前提,source和sink要保持同步。也就说,source发送一条数据,sink需要立即消费和保存一条数据。
下图是正常flume




下图是去掉channel的flume。如果一旦数据源频率过快,sink来不及消费保存数据,那么就会造成丢失数据。
flume.jpg

如何定制flume
一个灵活的程序,都是可以配置的,最常见的是xml格式文件,当然也可以是其它格式,普通txt也是可以的。所以我们看到无论是那种开源技术,都是可以配置的。甚至对于刚入门的初学者来说,就认为配置文件是必须的。
所以我们这里所说的定制,是对flume的的定义。那么flume该如何定制。
那就是通过对应source、channel、sink的定义。
这里我们只接贴出配置文件
[mw_shl_code=xml,true]agent1表示代理名称
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1


#配置source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/usr/aboutyunlog
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false

#配置sink1
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://master:8020/aboutyunlog
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=4
agent1.sinks.sink1.channel=channel1


#配置channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/usr/aboutyun_tmp123
agent1.channels.channel1.dataDirs=/usr/aboutyun_tmp[/mw_shl_code]
这里解析下上面的配置:首先是对flume的agent的配置。对于代理取了一个名字agent1。
agent1里面包含三个组件,这三个组件也分别取一个名字:source1,sink1,channel1
agent1表示代理名称
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1
我们为什么给他们取名字,是为了方便下面我们给他们定义。不取名字会带来什么后果。如下图,如果多个channel或则sink,我们就无法区分和定义了。


取名之后,我们分别对他们定义。
[mw_shl_code=xml,true]#配置source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/usr/aboutyunlog
agent1.sources.source1.channels=channel1
agent1.sources.source1.fileHeader = false[/mw_shl_code]
先来解析一条
agent1.sources.source1.type=spooldir
上面是说agent1的数据源,source1的类型是spooldir。上面看上去很复杂,但是其实就定义了那么几项:
spoolDir,channels,fileHeader 分别是目录,使用哪个channel,及是否添加Header等信息。

sink同样的道理
[mw_shl_code=xml,true]#配置sink1
agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.hdfs.path=hdfs://master:8020/aboutyunlog
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=TEXT
agent1.sinks.sink1.hdfs.rollInterval=4
agent1.sinks.sink1.channel=channel1[/mw_shl_code]
配置了sink1的类型,hdfs路径,file类型,格式,滚动时间,使用channel等。
通过上面,我们或许就明白了,flume的各种配置。也能轻易读懂别人是如何配置的。


flume与kafka整合
flume与kafka整合应该是用的比较多的,而且这也是一个难点。这里只是简单说下。
1.kafka作为数据源
kafka作为数据源其实kafka消费者,从kafka topic读取消息。如果你有多个kafka数据源运行,你可以配置他们为同一个Consumer Group。它们只能读取topics的一个分区.
这里只介绍下一些必须的属性,更多可参考官网
属性名称:
type值为 org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers值为kafka作为数据源的broker的列表。格式为host:端口例如localhost:9092
kafka.consumer.group.id:这个不是必须的。默认为flume。
kafka.topics:kafka消费者从topics 列表读取消息
kafka.topics.regex:定义了一组topic.比 kafka.topics有更高的优先级.是对kafka.topics的重写。
过时的属性
topic,groupId,zookeeperConnect分别替换为
kafka.topics,kafka.consumer.group.id,kafka.bootstrap.servers 链接 kafka cluster
下面举两个例子
多个topic的配置
[mw_shl_code=bash,true]tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id[/mw_shl_code]
使用正则订阅topic
[mw_shl_code=bash,true]tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used[/mw_shl_code]
当然还有使用认证的订阅,大家可以参考官网
http://flume.apache.org/FlumeUserGuide.html

2.Kafka作为Sink
也就是说flume将数据发送到kakfa的topic。支持的kafka的版本为Kafka 0.9.x,0.8.x不在支持。

属性介绍
type必须设置为org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers:连接的broker 列表
kafka.topic:默认default-flume-topic,发布数据所到的topic
过时的属性
brokerList替换为kafka.bootstrap.servers
topic替换为kafka.topic
batchSize替换为kafka.flumeBatchSize
requiredAcks替换为kafka.producer.acks
配置例子
[mw_shl_code=bash,true]a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy[/mw_shl_code]


3.kafka作为channel
events存储在kafka集群,kafka提供高可用和副本,因此如果客户端或则kafka broker崩溃的话,可以立即使用其它sinks。Kafka channel可以用于多种场景。1.Flume source 和sink--为events提供可靠及高可用的channel
2.lume source 和过滤器,没有sink.--为其它应用程序,允许写flume events到kafka topic
3. Flume sink, 和没有source--它是低延迟,容错的方式去发送events,从kafka到Flume sinks 比如HDFS, HBase or Solr
flume整合需要的kafka的版本为 0.9 及以后版本。channel 配置发生了变化,比以前的flume 版本。

flume兼容以前版本,但是过时的属性会有警告。
属性

type值为org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers值为hostname:port,可以是brokers list的一部分。但是值得注意的至少两个,这样可以高可用。
kafka.topic默认为flume-channel        
过时的属性
brokerList替换为hostname:port
topic替换为kafka.topic
groupId替换为 kafka.consumer.group.id
readSmallestOffset替换为  kafka.consumer.auto.offset.reset
例子:
[mw_shl_code=bash,true]a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer[/mw_shl_code]



除了上面kafka与flume整合,很多人对于kafka和flume有些混淆,下面内容,贴出来分享给大家
网上有一些好的内容,拿出来分享给大家
Kafka 与 Flume 很多功能确实是重复的。以下是评估两个系统的一些建议:
Kafka 是一个通用型系统。你可以有许多的生产者和消费者分享多个主题。相反地,Flume 被设计成特定用途的工作,特定地向 HDFS 和 HBase 发送出去。Flume 为了更好地为 HDFS 服务而做了特定的优化,并且与 Hadoop 的安全体系整合在了一起。基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。

Flume 拥有许多配置的来源 (sources) 和存储池 (sinks)。然后,Kafka 拥有的是非常小的生产者和消费者环境体系,Kafka 社区并不是非常支持这样。如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。

Flume 可以在拦截器里面实时处理数据。这个特性对于过滤数据非常有用。Kafka 需要一个外部系统帮助处理数据。

无论是 Kafka 或是 Flume,两个系统都可以保证不丢失数据。然后,Flume 不会复制事件。相应地,即使我们正在使用一个可以信赖的文件通道,如果 Flume agent 所在的这个节点宕机了,你会失去所有的事件访问能力直到你修复这个受损的节点。使用 Kafka 的管道特性不会有这样的问题。
Flume 和 Kafka 可以一起工作的。如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。

来自:
Kafka与 Flume 的区别记录分享
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21563


 Flume :管道 ----个人认为比较适合有多个生产者场景,或者有写入Hbase、HDFS和kafka需求的场景。

    Kafka :消息队列-----由于Kafka是Pull模式,因此适合有多个消费者的场景。

cnblogs
http://www.cnblogs.com/ibyte/p/5830715.html



本文链接
http://www.aboutyun.com/forum.php?mod=viewthread&tid=22102


本帖被以下淘专辑推荐:

已有(11)人评论

跳转到指定楼层
yunge2016 发表于 2017-7-4 17:35:48
感谢楼主的付出,自己研究这块总是遇到各种问题。现在又有了一些认识。非常感谢
回复

使用道具 举报

皆空 发表于 2017-7-10 20:15:19
楼主画模型的工具用的那个啊?求推荐个好用的
回复

使用道具 举报

szcountryboy 发表于 2017-7-10 21:01:37
flume和elastic里面的各种beat有什么区别呢?
回复

使用道具 举报

doudoupower 发表于 2017-7-18 21:41:34
请问楼主 flume 怎么处理 /log/app/10.0.0.1/test.log /log/app/10.0.0.2/test.log 这种一个应用多个子目录的日志,怎么区分应用呢?
回复

使用道具 举报

hasqjh7 发表于 2017-8-3 10:07:14
感谢 楼主!
小弟最近 遇到了一个问题:
使用spooldir的source进行配置的,并使用avro sink 输出,最近老是报错,异常信息如下 :01 Aug 2017 10:36:00,957 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:262)  - FATAL: Spool Directory source spoolDir: { spoolDir: /home/ads/flume-data }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -24
不知道 楼主,能否解答一下么?
回复

使用道具 举报

coding_wht 发表于 2017-8-3 11:35:13
博主 麻烦问下  我目前的需求是:多台机子爬虫去爬取数据 产生实时的日志 然后我要做的是日志的收集 然后做下各个机子爬虫的爬取情况等  这个怎们结合flume 来完成哦
回复

使用道具 举报

tntzbzc 发表于 2017-8-3 12:27:56
doudoupower 发表于 2017-7-18 21:41
请问楼主 flume 怎么处理 /log/app/10.0.0.1/test.log /log/app/10.0.0.2/test.log 这种一个应用多个子目录 ...

Flume的Spooling Directory Source支持Sub-directories
完整的代码已经更新到了github上

回复

使用道具 举报

yuwenge 发表于 2017-8-3 12:29:52
coding_wht 发表于 2017-8-3 11:35
博主 麻烦问下  我目前的需求是:多台机子爬虫去爬取数据 产生实时的日志 然后我要做的是日志的收集 然后做 ...

flume监控日志目录,日志,目录作为source,然后配置channel和存储的地方sink即可实现。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条