分享

Kafka与 Flume 的区别记录分享

desehawk 2017-4-18 19:22:22 发表于 总结型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 16 15794
Kafka 与 Flume 很多功能确实是重复的。以下是评估两个系统的一些建议:
Kafka 是一个通用型系统。你可以有许多的生产者和消费者分享多个主题。相反地,Flume 被设计成特定用途的工作,特定地向 HDFS 和 HBase 发送出去。Flume 为了更好地为 HDFS 服务而做了特定的优化,并且与 Hadoop 的安全体系整合在了一起。基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。

Flume 拥有许多配置的来源 (sources) 和存储池 (sinks)。然后,Kafka 拥有的是非常小的生产者和消费者环境体系,Kafka 社区并不是非常支持这样。如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。

Flume 可以在拦截器里面实时处理数据。这个特性对于过滤数据非常有用。Kafka 需要一个外部系统帮助处理数据。

无论是 Kafka 或是 Flume,两个系统都可以保证不丢失数据。然后,Flume 不会复制事件。相应地,即使我们正在使用一个可以信赖的文件通道,如果 Flume agent 所在的这个节点宕机了,你会失去所有的事件访问能力直到你修复这个受损的节点。使用 Kafka 的管道特性不会有这样的问题。
Flume 和 Kafka 可以一起工作的。如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。


已有(16)人评论

跳转到指定楼层
墨小黑 发表于 2017-5-5 17:33:12

求大神指导,SpooldirSources,fileChannels,HDFSSink,配置如下:
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

#配置sources,即被监听的源目录
agent1.sources.spooldirSource.type = spooldir
agent1.sources.spooldirSource.spoolDir = /tmp/testflumesource
agent1.sources.spooldirSource.channels = fileChannel
agent1.sources.spooldirSource.basenameHeader = true
agent1.sources.spooldirSource.basenameHeaderKey = basename
agent1.sources.spooldirSource.deletePolicy = immediate
# agent1.sources.spooldirSource.batchSize = 1000
#数据源编码格式
agent1.sources.spooldirSource.inputCharset = gbk
agent1.sources.spooldirSOurce.ignorePattern = ^(.)*\\.tmp$

#配置sinks,即目的目录
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://node1.hde.h3c.com:8020/user/flume/%{topic}/%Y%m%d/%H%M
agent1.sinks.hdfsSink.fileType = DataStream
agent1.sinks.hdfsSink.writeFormat = Text
agent1.sinks.hdfsSink.hdfs.filePrefix = %{basename}
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 1
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 12800000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 1000
agent1.sinks.hdfsSink.channel = fileChannel

#channels,通道目录配置:把文件事件持久化到本地硬盘上
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop/apache-flume-1.6.0-bin/dataDir
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChannel.transactionCapacity = 2000
agent1.channels.fileChannel.keep-alive = 6

启动flume后,将大量文件copy到spoolDir下,只传了几个文件后就被打断了,然后我重启flume,会继续接着传输,但是传输大概100多个文件后又被打断了,再重启,flume就报错了。
尝试的方案:先将文件copy到spooldir,然后启动flume,大概150个文件后flume就被打断,再重启后还是会打断。
请问怎么处理啊!!!
回复

使用道具 举报

desehawk 发表于 2017-5-5 18:42:55
墨小黑 发表于 2017-5-5 17:33
求大神指导,SpooldirSources,fileChannels,HDFSSink,配置如下:
agent1.sources = spooldirSource
...



下面二者可以保持一致
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChannel.transactionCapacity = 2000000

回复

使用道具 举报

墨小黑 发表于 2017-5-6 08:46:14
desehawk 发表于 2017-5-5 18:42
下面二者可以保持一致
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChan ...

为啥要保持一致?官网上的指导capacity=100000,transactionCapacity=1000.
capacity表示channel可以保存提交event的最大数量
transactionCapacity表示单个事务中可以写入或者读取的最大数量
回复

使用道具 举报

einhep 发表于 2017-5-6 10:26:41
墨小黑 发表于 2017-5-6 08:46
为啥要保持一致?官网上的指导capacity=100000,transactionCapacity=1000.
capacity表示channel可以保 ...

二者可以保持一致,总体来说
capacity>=transactionCapacity

capacity它指定了池子里可以存放的Event数量
transactionCapacity表示单个事务中可以写入或者读取的最大数量.'


回复

使用道具 举报

墨小黑 发表于 2017-5-6 15:26:06
einhep 发表于 2017-5-6 10:26
二者可以保持一致,总体来说
capacity>=transactionCapacity

明白。
现在我这边遇到几个问题,想请教一下。
1、按照上面的配置,先启动flume,然后往Spooldir中copy文件,只要copy超过10个文件,flume立马挂掉。
2、从问题1考虑,先将文件copy到spooldir中,再启动flume,然而,只能传输200个文件左右,flume又挂掉。
3、项目需求要传输图片,能否实现??
麻烦您给看看以上三个问题,有没有上面思路?

---------------------万分感谢----------------------------------------
回复

使用道具 举报

sstutu 发表于 2017-5-6 16:01:34
墨小黑 发表于 2017-5-6 15:26
明白。
现在我这边遇到几个问题,想请教一下。
1、按照上面的配置,先启动flume,然后往Spooldir中copy ...

看日志,这个一定时间就挂掉,肯定那里设置不合理造成的。还有你复制的是什么文件。大概多大。猜测可能是管道异常了。
图片好像不识别。会产生异常。
回复

使用道具 举报

墨小黑 发表于 2017-5-6 16:16:04
sstutu 发表于 2017-5-6 16:01
看日志,这个一定时间就挂掉,肯定那里设置不合理造成的。还有你复制的是什么文件。大概多大。猜测可能是 ...

复制的是100KB左右的xml文件,只是数量比较多而已。

----------------------------------------------------------------------------
[ERROR - org.apache.flume.channel.file.Log$BackgroundWorker.run(Log.java:1176)] General error in checkpoint worker
java.lang.OutOfMemoryError: Java heap space


[INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: Java heap space


OutOfMemoryError,但是我free查看了,还剩很多内存啊!
回复

使用道具 举报

墨小黑 发表于 2017-5-6 16:17:27
sstutu 发表于 2017-5-6 16:01
看日志,这个一定时间就挂掉,肯定那里设置不合理造成的。还有你复制的是什么文件。大概多大。猜测可能是 ...

配置文件如下:--------------------------------------------------
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

#配置sources,即被监听的源目录
agent1.sources.spooldirSource.type = spooldir
agent1.sources.spooldirSource.spoolDir = /tmp/testflumesource
agent1.sources.spooldirSource.channels = fileChannel
agent1.sources.spooldirSource.basenameHeader = true
agent1.sources.spooldirSource.basenameHeaderKey = basename
agent1.sources.spooldirSource.fileHeader = true
agent1.sources.spooldirSource.deletePolicy = immediate
# agent1.sources.spooldirSource.batchSize = 1000
#数据源编码格式
agent1.sources.spooldirSource.inputCharset = gbk
agent1.sources.spooldirSOurce.ignorePattern = ^(.)*\\.tmp$

#配置sinks,即目的目录
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://node1.hde.h3c.com:8020/user/flume/%{topic}/%Y%m%d/%H%M
agent1.sinks.hdfsSink.fileType = DataStream
agent1.sinks.hdfsSink.writeFormat = Text
agent1.sinks.hdfsSink.hdfs.filePrefix = %{basename}
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 0
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 128000000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 100000
agent1.sinks.hdfsSink.channel = fileChannel

#channels,通道目录配置:把文件事件持久化到本地硬盘上
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop/apache-flume-1.6.0-bin/dataDir
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChannel.transactionCapacity = 2000000
agent1.channels.fileChannel.keep-alive = 6
--------------------------------------------------------------------------------------------
回复

使用道具 举报

easthome001 发表于 2017-5-6 17:28:59
墨小黑 发表于 2017-5-6 16:17
配置文件如下:--------------------------------------------------
agent1.sources = spooldirSource
...

agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
目录是否满了

下面两个参数调小一些
agent1.sinks.hdfsSink.hdfs.rollSize = 12800000
agent1.sinks.hdfsSink.hdfs.batchSize = 1000


回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条