分享

storm集成卡夫卡报错 求解决方法

Hentai 发表于 2016-10-24 17:37:30 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 15639
org.apache.atlas.notification.NotificationException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
        at org.apache.atlas.kafka.KafkaNotification.sendInternalToProducer(KafkaNotification.java:249) ~[atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.kafka.KafkaNotification.sendInternal(KafkaNotification.java:222) ~[atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.notification.AbstractNotification.send(AbstractNotification.java:84) ~[atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.hook.AtlasHook.notifyEntitiesInternal(AtlasHook.java:129) [atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:114) [atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:167) [atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:101) [atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.storm.hook.StormAtlasHook.notify(StormAtlasHook.java:104) [storm-bridge-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.atlas.storm.hook.StormAtlasHook.notify(StormAtlasHook.java:57) [storm-bridge-shim-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        at org.apache.storm.StormSubmitter.invokeSubmitterHook(StormSubmitter.java:284) [storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
        at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:257) [storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
        at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:310) [storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
        at org.apache.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:346) [storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
        at org.apache.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:327) [storm-core-1.0.1.2.5.0.0-1245.jar:1.0.1.2.5.0.0-1245]
        at mytopology.DistributeWordTopology.main(DistributeWordTopology.java:144) [925343ae99c911e69c4500505691b725.jar:?]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) ~[kafka-clients-0.10.0.2.5.0.0-1245.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) ~[kafka-clients-0.10.0.2.5.0.0-1245.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.2.5.0.0-1245.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.2.5.0.0-1245.jar:?]
        at org.apache.atlas.kafka.KafkaNotification.sendInternalToProducer(KafkaNotification.java:232) ~[atlas-notification-0.7.0.2.5.0.0-1245.jar:0.7.0.2.5.0.0-1245]
        ... 14 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.



已有(3)人评论

跳转到指定楼层
einhep 发表于 2016-10-24 18:02:54


出现这个问题的原因很多,楼主需要详细描述下:
比如你是否创建了topic
比如:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZK_ENDPOINT --topic ATLAS_HOOK --create --partitions 1 --replication-factor $KAFKA_REPL_FACTOR

第二: producers 和consumers的hosts是否配置,端口是否有问题



如果英文较好,可参考这个
https://community.hortonworks.co ... st-api-timeout.html
回复

使用道具 举报

Hentai 发表于 2016-10-25 09:27:38
einhep 发表于 2016-10-24 18:02
出现这个问题的原因很多,楼主需要详细描述下:
比如你是否创建了topic
比如:

我是用的ambari安装的kafka
这是我的部分代码
BrokerHosts brokerHosts = new ZkHosts(zks);
   
         GlobalPartitionInformation partitionInformation = new GlobalPartitionInformation(topic);
         partitionInformation.addPartition(0, broker);
         StaticHosts hosts = new StaticHosts(partitionInformation);*/
         SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
         spoutConf.ignoreZkOffsets = true;
         spoutConf.zkServers = Arrays.asList(new String[] {"10.0.40.3"});
         spoutConf.zkPort = 2181;


TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("kafka-reader", new KafkaSpout(spoutConf),1);
         builder.setBolt("to-upper", new KafkaWordToUpperCase(), 1).shuffleGrouping("kafka-reader");
         builder.setBolt("hdfs-bolt", hdfsBolt, 1).shuffleGrouping("to-upper");
         builder.setBolt("realtime", new RealtimeBolt(), 1).shuffleGrouping("to-upper");

回复

使用道具 举报

Hentai 发表于 2016-10-25 10:12:56
Hentai 发表于 2016-10-25 09:27
我是用的ambari安装的kafka
这是我的部分代码
BrokerHosts brokerHosts = new ZkHosts(zks);

我发现了一个奇怪的问题 我用本地模式没错 但是用集群模式才会出这个错 我觉得应该是我的kafka的配置文件有问题
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条