分享

Kafka与 Flume 的区别记录分享

desehawk 2017-4-18 19:22:22 发表于 总结型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 16 15789
Kafka 与 Flume 很多功能确实是重复的。以下是评估两个系统的一些建议:
Kafka 是一个通用型系统。你可以有许多的生产者和消费者分享多个主题。相反地,Flume 被设计成特定用途的工作,特定地向 HDFS 和 HBase 发送出去。Flume 为了更好地为 HDFS 服务而做了特定的优化,并且与 Hadoop 的安全体系整合在了一起。基于这样的结论,Hadoop 开发商 Cloudera 推荐如果数据需要被多个应用程序消费的话,推荐使用 Kafka,如果数据只是面向 Hadoop 的,可以使用 Flume。

Flume 拥有许多配置的来源 (sources) 和存储池 (sinks)。然后,Kafka 拥有的是非常小的生产者和消费者环境体系,Kafka 社区并不是非常支持这样。如果你的数据来源已经确定,不需要额外的编码,那你可以使用 Flume 提供的 sources 和 sinks,反之,如果你需要准备自己的生产者和消费者,那你需要使用 Kafka。

Flume 可以在拦截器里面实时处理数据。这个特性对于过滤数据非常有用。Kafka 需要一个外部系统帮助处理数据。

无论是 Kafka 或是 Flume,两个系统都可以保证不丢失数据。然后,Flume 不会复制事件。相应地,即使我们正在使用一个可以信赖的文件通道,如果 Flume agent 所在的这个节点宕机了,你会失去所有的事件访问能力直到你修复这个受损的节点。使用 Kafka 的管道特性不会有这样的问题。
Flume 和 Kafka 可以一起工作的。如果你需要把流式数据从 Kafka 转移到 Hadoop,可以使用 Flume 代理 (agent),将 kafka 当作一个来源 (source),这样可以从 Kafka 读取数据到 Hadoop。你不需要去开发自己的消费者,你可以使用 Flume 与 Hadoop、HBase 相结合的特性,使用 Cloudera Manager 平台监控消费者,并且通过增加过滤器的方式处理数据。


已有(16)人评论

跳转到指定楼层
墨小黑 发表于 2017-5-24 11:08:01
easthome001 发表于 2017-5-6 17:42
重启删除,这样找不到问题呀。当挂掉的时候,你看看是不是里面满了。
还有另外一个可能性,就是jvm的参 ...

checkpointDir满了?如何查看是否满了?
调整conf/flume-env.sh的配置?如何调整?
回复

使用道具 举报

langke93 发表于 2017-5-20 21:34:45
wanghiu 发表于 2017-5-18 17:04
首先多谢您的回复,两个贴子我都看了,虽然没有我要的答案,但还是要感谢您。

1、首先,我这边通过tel ...

topic怎么创建的,怎么配置的,你这什么都没有,也帮不上你啊
回复

使用道具 举报

wanghiu 发表于 2017-5-18 17:04:09
langke93 发表于 2017-5-15 21:04
根配置有关系。可对比下面两个:是关于kafka与flume的整合及测试

about云日志分析项目准备8:Kafka集 ...

首先多谢您的回复,两个贴子我都看了,虽然没有我要的答案,但还是要感谢您。

1、首先,我这边通过telnet窗口,向flume监控的端口发送的数据,用程序和cousumer.sh都能正确接收;

2、通过HttUrlConnection,想flume端口发送的数据,kafka的consumer程序,只能接收到请求的头部,接收不到请求的正文部分。

这一点正是困扰我的地方,完全不知道该从哪里修改配置文件。

回复

使用道具 举报

langke93 发表于 2017-5-15 21:04:13
wanghiu 发表于 2017-5-15 18:24
楼主请指教,我用的是flume+kafka,就是把flume接收到的数据发送给kakfa,然后通过kafka的consumer程序打印 ...

根配置有关系。可对比下面两个:是关于kafka与flume的整合及测试

about云日志分析项目准备8:Kafka集群安装

about云日志分析项目准备9:Flume安装和使用



回复

使用道具 举报

wanghiu 发表于 2017-5-15 18:24:13
楼主请指教,我用的是flume+kafka,就是把flume接收到的数据发送给kakfa,然后通过kafka的consumer程序打印接收到的数据,flume版本1.5,kafka版本0.10.0。现在的情况是:
1、用kafka的producer程序生产的消息,消费者可以接收到;
2、在telnet窗口,手动输入的信息,kafka消费者也能获取到;
3、通过程序,向flume端口发送的数据,kafka接收不到,比如通过HttpUrlConnect发送的内容是hello,则消费者接收的信息是:
        value=POST / HTTP/1.1
        value=Content-Type: application/x-www-form-urlencoded
        value=Cache-Control: no-cache
        value=Pragma: no-cache
        value=User-Agent: Java/1.8.0_112
        value=Host: 192.168.0.101:9093
        value=Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
        value=Connection: keep-alive
        value=Content-Length: 5
        value=

请问楼主,用程序向flume发送数据,传递给kafka,发送数据部分,应该怎么处理?kafka消费者这边,应该怎么处理?
求指教,谢谢!!!
回复

使用道具 举报

easthome001 发表于 2017-5-6 17:42:43
本帖最后由 easthome001 于 2017-5-6 17:44 编辑
墨小黑 发表于 2017-5-6 17:33
每次启动,都会把checkpointDir下面的文件删除。
把参数调小调大,都试过了!

重启删除,这样找不到问题呀。当挂掉的时候,你看看是不是里面满了。
还有另外一个可能性,就是jvm的参数配置。内存足够不代表,jvm配置是足够的。
调整下conf/flume-env.sh的配置
回复

使用道具 举报

墨小黑 发表于 2017-5-6 17:33:37
easthome001 发表于 2017-5-6 17:28
agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
目录是 ...

每次启动,都会把checkpointDir下面的文件删除。
把参数调小调大,都试过了!
回复

使用道具 举报

easthome001 发表于 2017-5-6 17:28:59
墨小黑 发表于 2017-5-6 16:17
配置文件如下:--------------------------------------------------
agent1.sources = spooldirSource
...

agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
目录是否满了

下面两个参数调小一些
agent1.sinks.hdfsSink.hdfs.rollSize = 12800000
agent1.sinks.hdfsSink.hdfs.batchSize = 1000


回复

使用道具 举报

墨小黑 发表于 2017-5-6 16:17:27
sstutu 发表于 2017-5-6 16:01
看日志,这个一定时间就挂掉,肯定那里设置不合理造成的。还有你复制的是什么文件。大概多大。猜测可能是 ...

配置文件如下:--------------------------------------------------
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

#配置sources,即被监听的源目录
agent1.sources.spooldirSource.type = spooldir
agent1.sources.spooldirSource.spoolDir = /tmp/testflumesource
agent1.sources.spooldirSource.channels = fileChannel
agent1.sources.spooldirSource.basenameHeader = true
agent1.sources.spooldirSource.basenameHeaderKey = basename
agent1.sources.spooldirSource.fileHeader = true
agent1.sources.spooldirSource.deletePolicy = immediate
# agent1.sources.spooldirSource.batchSize = 1000
#数据源编码格式
agent1.sources.spooldirSource.inputCharset = gbk
agent1.sources.spooldirSOurce.ignorePattern = ^(.)*\\.tmp$

#配置sinks,即目的目录
agent1.sinks.hdfsSink.type = hdfs
agent1.sinks.hdfsSink.hdfs.path = hdfs://node1.hde.h3c.com:8020/user/flume/%{topic}/%Y%m%d/%H%M
agent1.sinks.hdfsSink.fileType = DataStream
agent1.sinks.hdfsSink.writeFormat = Text
agent1.sinks.hdfsSink.hdfs.filePrefix = %{basename}
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true

# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 0
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 128000000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 100000
agent1.sinks.hdfsSink.channel = fileChannel

#channels,通道目录配置:把文件事件持久化到本地硬盘上
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.6.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs = /home/hadoop/apache-flume-1.6.0-bin/dataDir
agent1.channels.fileChannel.capacity = 2000000
agent1.channels.fileChannel.transactionCapacity = 2000000
agent1.channels.fileChannel.keep-alive = 6
--------------------------------------------------------------------------------------------
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条