分享

kafka source 反序列化问题

kafka的序列化
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

配置中我有设置反序列化EVENT   hdfsAgent.sources.hdfsKafkaSource.kafka.useFlumeEventFormat = true
但是拿到的还是序列化的结果
求解?


agent的配置文件如下:

#自定义sources的名字
hdfsAgent.sources = hdfsKafkaSource
#自定义channels的名字
hdfsAgent.channels = testHdfsChannel
#自定义sinks的名字
hdfsAgent.sinks = hdfsSink


# 指定source使用的channel名字
hdfsAgent.sources.hdfsKafkaSource.channels = testHdfsChannel
# 指定sink需要使用的channel的名字,注意这里是channel
hdfsAgent.sinks.hdfsSink.channel = testHdfsChannel


#-------- hdfsKafkaSource相关配置-----------------
# 定义消息源类型
hdfsAgent.sources.hdfsKafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
#
# 这里特别注意: 是kafka的zookeeper的地址
#
#hdfsAgent.sources.hdfsKafkaSource.kafka.zookeeperConnect = nn1.hadoop:2181,nn2.hadoop:2181,s1.hadoop:2181
hdfsAgent.sources.hdfsKafkaSource.kafka.zookeeperConnect = m1.test:2181,m2.test:2181,s1.test:2181
#kafka 的brokers
hdfsAgent.sources.hdfsKafkaSource.kafka.bootstrap.servers = m1.test:9092,m2.test:9092,s1.test:9092
# 配置消费的kafka topic  多个逗号分隔
hdfsAgent.sources.hdfsKafkaSource.kafka.topics = test
#序列化
hdfsAgent.sources.hdfsKafkaSource.kafka.useFlumeEventFormat = true

# 配置消费者组的id
hdfsAgent.sources.hdfsKafkaSource.kafka.consumer.group.id = test
hdfsAgent.sources.hdfsKafkaSource.kafka.batchSize = 1000
hdfsAgent.sources.hdfsKafkaSource.batchDurationMillis = 2000


#------- fileChannel相关配置-------------------------
# channel类型
hdfsAgent.channels.testHdfsChannel.type = memory
hdfsAgent.channels.testHdfsChannel.capacity = 10000
hdfsAgent.channels.testHdfsChannel.transactionCapacity = 1000


#---------Sink 相关配置------------------
hdfsAgent.sinks.hdfsSink.type = hdfs
# 注意, 我们输出到下面一个子文件夹datax中
hdfsAgent.sinks.hdfsSink.hdfs.path = hdfs:///data/original/%{topic}/%Y%m%d

#设置hdfs文件的副本数量
hdfsAgent.sinks.hdfsSink.hdfs.minBlockReplicas = 1
# 当临时文件达到多少bytes生成新的文件,设置0标识不根据大小来分割文件,这是压缩前的文件大小
hdfsAgent.sinks.hdfsSink.hdfs.rollSize = 8435456
hdfsAgent.sinks.hdfsSink.hdfs.rollCount = 0
hdfsAgent.sinks.hdfsSink.hdfs.rollInterval = 0
hdfsAgent.sinks.hdfsSink.hdfs.roundValue = 5
hdfsAgent.sinks.hdfsSink.hdfs.roundUnit = minute
hdfsAgent.sinks.hdfsSink.hdfs.round = true



#配置前缀和后缀
hdfsAgent.sinks.hdfsSink.hdfs.filePrefix=%{topic}_%H%M%S_%{agentHost}
hdfsAgent.sinks.hdfsSink.hdfs.fileSuffix=.log
hdfsAgent.sinks.hdfsSink.hdfs.inUserSuffix =.tmp
hdfsAgent.sinks.hdfsSink.hdfs.idleTimeout = 120

hdfsAgent.sinks.hdfsSink.hdfs.writeFormat = Text
hdfsAgent.sinks.hdfsSink.hdfs.fileType = DataStream

hdfsAgent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
hdfsAgent.sinks.hdfsSink.hdfs.maxOpenFiles = 1000
hdfsAgent.sinks.hdfsSink.hdfs.batchSize= 100

hdfsAgent.sinks.hdfsSink.hdfs.connect-timeout=80000
hdfsAgent.sinks.hdfsSink.hdfs.callTimeout=120000
#自定义拦截器
hdfsAgent.sources.hdfsKafkaSource.interceptors = i1
hdfsAgent.sources.hdfsKafkaSource.interceptors.i1.type = host
hdfsAgent.sources.hdfsKafkaSource.interceptors.i1.useIP = false
hdfsAgent.sources.hdfsKafkaSource.interceptors.i1.hostHeader = agentHost


已有(6)人评论

跳转到指定楼层
desehawk 发表于 2018-5-28 19:36:39
这个不太好测试。单从配置,看不出什么问题。下面两个方法1.换下源数据,看看能否成功
2.简化成一个反序列化的小例子,测试下。

回复

使用道具 举报

jinwensc 发表于 2018-5-28 20:17:17
我解决了,搞了一天
hdfsAgent.sources.hdfsKafkaSource.kafka.useFlumeEventFormat = true
改成
hdfsAgent.sources.hdfsKafkaSource.useFlumeEventFormat = true
回复

使用道具 举报

langke93 发表于 2018-5-29 09:11:21
jinwensc 发表于 2018-5-28 20:17
我解决了,搞了一天
hdfsAgent.sources.hdfsKafkaSource.kafka.useFlumeEventFormat = true
改成

楼主厉害,从哪找到这个配置。
官网只是简单的给出属性说明而已
1.png
回复

使用道具 举报

jinwensc 发表于 2018-5-29 22:28:23
langke93 发表于 2018-5-29 09:11
楼主厉害,从哪找到这个配置。
官网只是简单的给出属性说明而已

没办法仔细看啊,hdfs sink  gzip压缩文件损坏的问题又出来了,何解?

回复

使用道具 举报

jinwensc 发表于 2018-5-29 22:29:32
我这是flume的问题,怎么移动到kafka了
回复

使用道具 举报

hello2018 发表于 2018-5-30 07:38:19
本帖最后由 hello2018 于 2018-5-30 07:41 编辑
jinwensc 发表于 2018-5-29 22:28
没办法仔细看啊,hdfs sink  gzip压缩文件损坏的问题又出来了,何解?

楼主找到对应属性修改,gzip压缩时,需要几个配置参数要一致,例如下面

hdfsAgent.sinks.hdfsSink.hdfs.rollCount=10000
hdfsAgent.sinks.hdfsSink.hdfs.batchSize =10000
hdfsAgent.channels.testHdfsChannel.transactionCapacity =10000


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条