
kafka+storm error

bahe posted on 2016-12-4 00:17:08
7532 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections
7532 [Thread-16-spout-executor[3 3]-EventThread] INFO  o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
7535 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=test-topic, partitionMap={0=127.0.0.1:9094}}
7536 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=127.0.0.1:9094, topic=test-topic, partition=0}]
7536 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: []
7536 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=127.0.0.1:9094, topic=test-topic, partition=0}]
7630 [Thread-16-spout-executor[3 3]] INFO  o.a.s.k.PartitionManager - Read partition information from: /test-topic/04680174-656f-41ad-ad6f-2976d28b2d24/partition_0  --> null
7663 [Thread-16-spout-executor[3 3]] INFO  k.c.SimpleConsumer - Reconnect due to error:
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) [kafka_2.11-0.10.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) [kafka_2.11-0.10.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) [storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7672 [Thread-16-spout-executor[3 3]] ERROR o.a.s.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.10.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7673 [Thread-16-spout-executor[3 3]] ERROR o.a.s.d.executor -
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.10.0.1.jar:?]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.10.0.1.jar:?]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:103) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) ~[storm-kafka-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) ~[storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]
7694 [Thread-16-spout-executor[3 3]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.worker$fn__8659$fn__8660.invoke(worker.clj:761) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
        at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Unknown Source) [?:1.8.0_111]

Could someone explain what is causing this error?

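einhep posted on 2016-12-4 08:39:02
org.apache.kafka.common.network.NetworkSend is a class in the Kafka client library (kafka-clients), and releases before Kafka 0.9 initialize it differently. A NoSuchMethodError on its constructor usually means the kafka-clients jar that was actually loaded is not the version kafka_2.11 was compiled against. Are you sure there is only one version of the kafka-clients jar on the classpath?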
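
One way to check this from inside the JVM is to ask where a class was actually loaded from. The sketch below is illustrative only: the class name `WhichJar` and the helper `locationOf` are made up for this example, and it relies solely on the standard JDK call chain `getProtectionDomain().getCodeSource()`. Run it with the same classpath as the topology and pass `org.apache.kafka.common.network.NetworkSend` as the argument; if the printed jar is not the 0.10.0.1 kafka-clients jar, an older copy is probably shadowing it.

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the jar or directory a class was loaded from, or a marker
    // string when the JVM reports no code source (typical for core JDK classes).
    static String locationOf(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(no code source reported)";
        }
        return src.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // Defaults to a JDK class so the sketch runs anywhere; inside the
        // topology, pass org.apache.kafka.common.network.NetworkSend instead.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " -> " + locationOf(Class.forName(name)));
    }
}
```

Running `mvn dependency:tree` on the project is a complementary check, since it shows which kafka-clients version each dependency pulls in.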

bahe posted on 2016-12-4 09:14:08
Kafka 2.11-0.10.0.1 is deployed on my local machine. The Maven POM is:
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
My local Maven repository does contain other versions such as 0.8.2.1, 0.9.0.0 and 0.9.0.1, but they should not be getting loaded!


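bahe posted on 2016-12-4 09:19:27
Also, writing data to Kafka through Trident has been debugged and works fine.
But reading data from Kafka fails with the error above, whether I use the core API or Trident.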

bahe posted on 2016-12-4 11:24:43
The problem above is solved. I modified the POM and added:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
    <scope>provided</scope>
</dependency>
But now the error message is:
7799 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x158c7d40a250012 type:create cxid:0x3 zxid:0x3f txntype:-1 reqpath:n/a Error Path:/TestK Error:KeeperErrorCode = NoNode for /TestK
Code:
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-tpc", "", "TestK");

Could the experts help with this one too? Thanks.

easthome001 posted on 2016-12-4 12:44:11
bahe posted on 2016-12-4 11:24
The problem above is solved. I modified the POM and added:
org.apache.kafka
kafka-clients

SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-tpc", "", "TestK");
The TestK path should be written as a full path.
For example, if you previously created /kafka, then the path should be /kafka/TestK:
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-tpc", "", "/kafka/TestK");
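
For reference, the four arguments of the storm-kafka 1.0.2 SpoutConfig constructor are, to my knowledge, (hosts, topic, zkRoot, id), so in the original call "" is the zkRoot and "TestK" is the id. Judging from the log line earlier in the thread (/test-topic/<uuid>/partition_0), offsets appear to be stored under <zkRoot>/<id>/partition_<n>. The sketch below only mimics that string layout to show which node each call would touch; `OffsetPathSketch` and `offsetPath` are hypothetical names, not storm-kafka code.

```java
public class OffsetPathSketch {
    // Hypothetical helper: reproduces the path shape seen in the log
    // (<zkRoot>/<id>/partition_<n>). This is an assumption drawn from the
    // log output, not a copy of storm-kafka's implementation.
    static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Empty zkRoot, as in the original call: node sits at the ZooKeeper root.
        System.out.println(offsetPath("", "TestK", 0));       // /TestK/partition_0
        // Root passed as the third argument instead.
        System.out.println(offsetPath("/kafka", "TestK", 0)); // /kafka/TestK/partition_0
    }
}
```

If that layout holds, an empty zkRoot places the node directly under the ZooKeeper root as /TestK, so passing the root as the third argument, for example new SpoutConfig(hosts, "my-tpc", "/kafka", "TestK"), may be worth trying as well.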

