分享

Spark Streaming与driver的问题

Fortitude 发表于 2016-4-5 17:08:00 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 9901
SparkConf sparkConf = new SparkConf().setMaster(master).set("spark.driver.host", host);
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);

我的spark是Standalone模式,Spark Streaming接收Flume数据,设置Flume监听的host和port,本地调试没问题,可发布到集群上经常报Failed to bind to: /host:port的问题。查询日志发现在好几个节点都报这样的错误,设置spark.driver.host本意是想指定driver宿主机的地址与flume接收地址一致,可发现这个设置并不起作用,driver还是会在其他节点上运行,并且数量也不确定。我的问题是:
1、FlumeUtils.createStream(ssc, host, port);这句代码是不是在driver上执行?
2、为什么设置spark.driver.host不好使,我要怎么设置才能指定driver的宿主机?

已有(8)人评论

跳转到指定楼层
xuanxufeng 发表于 2016-4-5 18:43:47
采用下面形式
System.setProperty("spark.driver.host", "host")
val sc = new SparkContext(...)


更多参考
spark配置说明
http://www.aboutyun.com/forum.php?mod=viewthread&tid=17937

希望有所帮助

回复

使用道具 举报

Fortitude 发表于 2016-4-5 22:13:38
xuanxufeng 发表于 2016-4-5 18:43
采用下面形式
System.setProperty("spark.driver.host", "host")
val sc = new SparkContext(...)

好的,明天试试,先谢谢了
回复

使用道具 举报

Fortitude 发表于 2016-4-6 09:39:04
xuanxufeng 发表于 2016-4-5 18:43
采用下面形式
System.setProperty("spark.driver.host", "host")
val sc = new SparkContext(...)

试了下,还是在好几个worker上都尝试绑定监听端口,在非指定的driver机器上肯定不能在driver机器上绑定。我现在纳闷的是为什么会在好几个worker上都执行FlumeUtils.createStream(ssc, host, port)代码,standalone方式下默认不是只启动一个driver core吗,难道FlumeUtils.createStream(ssc, host, port)不是在driver上执行的?
回复

使用道具 举报

杰仕人生 发表于 2016-4-6 14:06:34
我也遇到了楼主相同的问题
回复

使用道具 举报

atsky123 发表于 2016-4-6 15:59:20
Fortitude 发表于 2016-4-6 09:39
试了下,还是在好几个worker上都尝试绑定监听端口,在非指定的driver机器上肯定不能在driver机器上绑定。 ...

package scala;

import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

import java.nio.ByteBuffer;

public static void JavaFlumeEventTest(String master, String host, int port) {
        Duration batchInterval = new Duration(2000);

        JavaStreamingContext ssc = new JavaStreamingContext(master,
               "FlumeEventCount", batchInterval,
                System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
        StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
        JavaDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, host, port, storageLevel);

        flumeStream.count().map(new Function<java.lang.Long, String>() {
            @Override
            public String call(java.lang.Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
}

楼主看下上面程序,ssc中是可以指定master的

回复

使用道具 举报

Fortitude 发表于 2016-4-7 08:31:26
atsky123 发表于 2016-4-6 15:59
package scala;

import org.apache.flume.source.avro.AvroFlumeEvent;

您好,我问下只有一个receiver的话,是不是数据只能在一个节点接收数据,这样是不是影响分布式计算的效率?如果我在flume上设置两个或更多的sink采用load_balance的方式推送events,spark的receiver也设置多个与之相对应,这样是不是效率会高点?
回复

使用道具 举报

atsky123 发表于 2016-4-7 11:23:08
Fortitude 发表于 2016-4-7 08:31
您好,我问下只有一个receiver的话,是不是数据只能在一个节点接收数据,这样是不是影响分布式计算的效率 ...

可以一试
回复

使用道具 举报

邓立辉 发表于 2016-4-14 08:59:54
杰仕人生 发表于 2016-4-6 14:06
我也遇到了楼主相同的问题

在论坛上看到你了哈哈 qq大辉
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条