分享

storm中关于kafkSpout的问题求教

caiyifeng 发表于 2015-8-26 13:47:42 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 11283
我在storm平台发布了一个Topology(例如 testTopology),通过kafkaSpout 从kafka上抓取消息进行处理;
storm会在 zookeeper的节点 /storm/testTopology/partition_0  中记录当前处理的消息的offset,例如:
{"topology":{"id":"e20a3347-f43b-4765-94fc-2aec6b8a7d25","name":"testTopology"},"offset":4305106,"partition":0,"broker":{"host":"gjzq-zz5-20","port":9092},"topic":"test-topic"}
说明当前处理到的消息的offset 是4305106

我的问题是:
假设我想指定任意的offset,让Topology从该offset开始进行处理,应该怎么做??
我曾经将/storm/testTopology/partition_0 节点中的offset设置为一个新值,但没有任何作用,反而storm无法抓取到kafka的消息了


请大神解答。。;。多谢!!!

已有(3)人评论

跳转到指定楼层
Alkaloid0515 发表于 2015-8-26 14:44:10
应该参数的问题

KafkaConfig的startOffsetTime字段。默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最早的消息开始读。也可以自己指定具体的值。
回复

使用道具 举报

hyj 发表于 2015-8-26 17:56:32




影响初始读取进度的配置项

在一个topology上线后,它从哪个offset开始读取消息呢?有一些配置项对此有影响:
  • SpoutConfig中的id字段。如果想要一个topology从另一个topology之前的处理进度继续处理,它们需要有相同的id。
  • KafkaConfig的forceFromStart字段。如果此字段设为true, 那么它一个topology上线后,它会忽略之前相同id的topology的进度,并且从Kafka中最早的消息开始处理。
  • KafkaConfig的startOffsetTime字段。默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最早的消息开始读。也可以自己指定具体的值。
  • KafkaConfig的maxOffsetBehind字段。这个字段对于KafkaSpout的多个处理流程都有影响。当提交一个新topology时,如果没有forceFromStart, 当KafkaSpout对某个partition的处理进度落后startOffsetTime对应的offset多于此值时,KafkaSpout会丢弃中间的消息,从而强制赶上目标进度.比如,如果startOffsetTime设成了lastestTime,那么如果进度落后超过maxOffsetBehind,KafkaSpout会直接从latestTime对应的offset开始处理。如果设成了froceFromStart,则在提交新任务时,始终会从EarliestTime开始读。
  • KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange字段。如果设成true,那么当fetch消息时出错,且FetchResponse显示的出错原因是OFFSET_OUT_OF_RANGE,那么就会尝试从KafkaSpout指定的startOffsetTime对应的消息开始读。例如,如果有一批消息因为超过了保存期限被Kafka删除,并且zk里记录的消息在这批被删除的消息里。如果KafkaSpout试图从zk的记录继续读,那么就会出现OFFSET_OUT_OF_RANGE的错误,从而触发这个配置。
实际上maxOffsetBehind有时候有点名不符实。当startOffsetTime为A, zk里的进度为B, A - B > maxOffsetBehind时,应该从A - maxOffsetBehind除开始读或许更好一些,而不是直接跳到startOffsetTime。此处的逻辑参见PartitionManager的实现。


更多:
KafkaSpout分析:配置
http://www.aboutyun.com/thread-14944-1-1.html





回复

使用道具 举报

caiyifeng 发表于 2015-8-26 18:14:25
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条