分享

Flume的Kafka Channel

zstu 发表于 2017-4-12 17:26:03 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 6575
我第一层用kafka收集数据,然后通过一台接口服务器flume收集第一层kafka的数据,并将数据传输到CDH集群的kafka中。我的flume的channel是用KafkaChannel,配置如下:[mw_shl_code=java,true]tier1.sources  =  src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka

# sources src_kafka
tier1.sources.src_kafka.channels=ch_kafka
tier1.sources.src_kafka.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.src_kafka.zookeeperConnect = 10.25.26.136:2181
tier1.sources.src_kafka.groupId = flume
tier1.sources.src_kafka.topic = event
tier1.sources.src_kafka.kafka.consumer.timeout.ms = 100

# channels channel_kafka
tier1.channels.ch_kafka.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka.brokerList=10.25.26.182:9092,10.25.26.183:9092,10.25.26.184:9092
tier1.channels.ch_kafka.topic=event
tier1.channels.ch_kafka.zookeeperConnect=10.25.26.176:2181[/mw_shl_code]
当我在第一层kafka发送数据的时候,在cdh集群的kafka接收的数据包含其他字符:
如 发送aaaaaa
ccccccccccccccccccccccccccccccccc

cdh的kafka接收:
topic
eventtimestamp1491974576638$aaaaaa

topic
eventtimestamp1491975454178Bccccccccccccccccccccccccccccccccc


想问一下,怎么只接收发送的数据,不要topic和eventtimestamp等字符。

已有(3)人评论

跳转到指定楼层
2017 发表于 2017-4-12 18:28:23
tier1.sources  =  src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka

# sources src_kafka
tier1.sources.src_kafka.channels=ch_kafka
tier1.sources.src_kafka.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.src_kafka.zookeeperConnect = 10.25.26.136:2181
tier1.sources.src_kafka.groupId = flume
tier1.sources.src_kafka.topic = test1,test2
tier1.sources.src_kafka.kafka.consumer.timeout.ms = 100

# channels channel_kafka
tier1.channels.ch_kafka.type=org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.ch_kafka.brokerList=10.25.26.182:9092,10.25.26.183:9092,10.25.26.184:9092
tier1.channels.ch_kafka.topic=channels
tier1.channels.ch_kafka.zookeeperConnect=10.25.26.176:2181
楼主改成上面看看什么效果

回复

使用道具 举报

zstu 发表于 2017-4-12 18:38:29
2017 发表于 2017-4-12 18:28
tier1.sources  =  src_kafka
tier1.channels = ch_kafka
tier1.sinks = sink_kafka

改成那样CDH上的kafka就接收不到数据了
回复

使用道具 举报

starrycheng 发表于 2017-4-12 19:34:54
zstu 发表于 2017-4-12 18:38
改成那样CDH上的kafka就接收不到数据了

数据由kafkasource进入flume,添加了header信息
这个只能修改源码,可参考下面内容
flume写kafka topic带有时间戳问题




回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条