Problem integrating Spark Streaming with Flume [urgent, waiting online!!!]

Version information:
Spark 2.0.2
Flume 1.7
sbt dependency (excerpt): libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2"

Pull-mode code and a simple output statement:
val flumeStream = FlumeUtils.createPollingStream(ssc,host,port,StorageLevel.MEMORY_ONLY_SER_2)
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()

The dependencies have already been added on every node.
A simple Flume configuration:
# Name the Agent's components

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Specify the Flume source (the directory to watch)

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/weixf_kafka/testflume

# Specify the Flume sink

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel =c1
a1.sinks.k1.hostname=172.28.41.196
a1.sinks.k1.port = 19999

# Specify the Flume channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

# Bind the source and the sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

After starting Flume and then the Spark Streaming program, I get the following log output (excerpt):
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610), which has no missing parents
17/09/15 17:44:53 INFO scheduler.ReceiverTracker: Receiver 0 started
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 70.6 KB, free 413.8 MB)
17/09/15 17:44:53 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 25.1 KB, free 413.8 MB)
17/09/15 17:44:53 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.193:41571 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:53 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1012
17/09/15 17:44:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (Receiver 0 ParallelCollectionRDD[3] at makeRDD at ReceiverTracker.scala:610)
17/09/15 17:44:53 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
17/09/15 17:44:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 70, 172.28.41.196, partition 0, PROCESS_LOCAL, 6736 bytes)
17/09/15 17:44:54 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 70 on executor id: 0 hostname: 172.28.41.196.
17/09/15 17:44:54 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.28.41.196:33364 (size: 25.1 KB, free: 413.9 MB)
17/09/15 17:44:54 INFO util.RecurringTimer: Started timer for JobGenerator at time 1505468700000
17/09/15 17:44:54 INFO scheduler.JobGenerator: Started JobGenerator at 1505468700000 ms
17/09/15 17:44:54 INFO scheduler.JobScheduler: Started JobScheduler
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@534e58b6{/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1b495d4{/streaming/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@12fe1f28{/streaming/batch,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@26fb4d06{/streaming/batch/json,null,AVAILABLE}
17/09/15 17:44:54 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2d38edfd{/static/streaming,null,AVAILABLE}
17/09/15 17:44:54 INFO streaming.StreamingContext: StreamingContext started
17/09/15 17:44:55 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 172.28.41.196:45983
17/09/15 17:45:01 INFO scheduler.JobScheduler: Added jobs for time 1505468700000 ms
17/09/15 17:45:01 INFO scheduler.JobScheduler: Starting job streaming job 1505468700000 ms.0 from job set of time 1505468700000 ms
17/09/15 17:45:01 INFO spark.SparkContext: Starting job: print at FlumeLogPull.scala:44
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.196:33364 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Registering RDD 7 (union at DStream.scala:605)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Got job 2 (print at FlumeLogPull.scala:44) with 1 output partitions
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (print at FlumeLogPull.scala:44)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 3)
17/09/15 17:45:01 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605), which has no missing parents
17/09/15 17:45:01 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 172.28.41.193:41571 in memory (size: 1969.0 B, free: 413.9 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.3 KB, free 413.8 MB)
17/09/15 17:45:02 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.0 KB, free 413.8 MB)
17/09/15 17:45:02 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.28.41.193:41571 (size: 2.0 KB, free: 413.9 MB)
17/09/15 17:45:02 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1012
17/09/15 17:45:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 3 (UnionRDD[7] at union at DStream.scala:605)
17/09/15 17:45:02 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
17/09/15 17:46:00 INFO scheduler.JobScheduler: Added jobs for time 1505468760000 ms
17/09/15 17:46:30 INFO scheduler.JobScheduler: Added jobs for time 1505468790000 ms
17/09/15 17:47:00 INFO scheduler.JobScheduler: Added jobs for time 1505468820000 ms
17/09/15 17:47:30 INFO scheduler.JobScheduler: Added jobs for time 1505468850000 ms
17/09/15 17:48:00 INFO scheduler.JobScheduler: Added jobs for time 1505468880000 ms
17/09/15 17:48:30 INFO scheduler.JobScheduler: Added jobs for time 1505468910000 ms
17/09/15 17:49:00 INFO scheduler.JobScheduler: Added jobs for time 1505468940000 ms
17/09/15 17:49:30 INFO scheduler.JobScheduler: Added jobs for time 1505468970000 ms
17/09/15 17:50:00 INFO scheduler.JobScheduler: Added jobs for time 1505469000000 ms
17/09/15 17:50:30 INFO scheduler.JobScheduler: Added jobs for time 1505469030000 ms
17/09/15 17:51:00 INFO scheduler.JobScheduler: Added jobs for time 1505469060000 ms
17/09/15 17:51:30 INFO scheduler.JobScheduler: Added jobs for time 1505469090000 ms
17/09/15 17:52:00 INFO scheduler.JobScheduler: Added jobs for time 1505469120000 ms
17/09/15 17:52:30 INFO scheduler.JobScheduler: Added jobs for time 1505469150000 ms
17/09/15 17:53:00 INFO scheduler.JobScheduler: Added jobs for time 1505469180000 ms
17/09/15 17:53:30 INFO scheduler.JobScheduler: Added jobs for time 1505469210000 ms
17/09/15 17:54:00 INFO scheduler.JobScheduler: Added jobs for time 1505469240000 ms
17/09/15 17:54:30 INFO scheduler.JobScheduler: Added jobs for time 1505469270000 ms
17/09/15 17:55:00 INFO scheduler.JobScheduler: Added jobs for time 1505469300000 ms
17/09/15 17:55:30 INFO scheduler.JobScheduler: Added jobs for time 1505469330000 ms
17/09/15 17:56:00 INFO scheduler.JobScheduler: Added jobs for time 1505469360000 ms
17/09/15 17:56:30 INFO scheduler.JobScheduler: Added jobs for time 1505469390000 ms
17/09/15 17:57:00 INFO scheduler.JobScheduler: Added jobs for time 1505469420000 ms
17/09/15 17:57:30 INFO scheduler.JobScheduler: Added jobs for time 1505469450000 ms
17/09/15 17:58:00 INFO scheduler.JobScheduler: Added jobs for time 1505469480000 ms
17/09/15 17:58:30 INFO scheduler.JobScheduler: Added jobs for time 1505469510000 ms
17/09/15 17:59:00 INFO scheduler.JobScheduler: Added jobs for time 1505469540000 ms
17/09/15 17:59:30 INFO scheduler.JobScheduler: Added jobs for time 1505469570000 ms
17/09/15 18:00:00 INFO scheduler.JobScheduler: Added jobs for time 1505469600000 ms
17/09/15 18:00:30 INFO scheduler.JobScheduler: Added jobs for time 1505469630000 ms
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)
17/09/15 18:01:00 INFO scheduler.JobScheduler: Added jobs for time 1505469660000 ms
17/09/15 18:01:00 INFO storage.BlockManagerInfo: Added input-0-1505469659800 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.9 MB)
17/09/15 18:01:03 INFO storage.BlockManagerInfo: Added input-0-1505469662800 in memory on 172.28.41.196:33364 (size: 7.3 KB, free: 413.9 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469684800 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.8 MB)
17/09/15 18:01:25 INFO storage.BlockManagerInfo: Added input-0-1505469685000 in memory on 172.28.41.196:33364 (size: 15.3 KB, free: 413.8 MB)

The output I want never appears. Instead, lines like this one keep repeating:
17/09/15 17:45:30 INFO scheduler.JobScheduler: Added jobs for time 1505468730000 ms
If I copy a file into the monitored directory, I get output like this:
17/09/15 18:00:59 INFO storage.BlockManagerInfo: Added input-0-1505469659600 in memory on 172.28.41.196:33364 (size: 15.7 KB, free: 413.9 MB)

What I want is normal output like this:

Time: 1505468700000 ms

Received .. flume events.


I really cannot figure out the cause. Any help would be greatly appreciated.


yuwenge posted on 2017-9-18 10:38:48
a1.sinks.k1.hostname=172.28.41.196
Change this to the hostname.
Then follow the code below:
[mw_shl_code=scala,true]/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam

/**
*  Produces a count of events received from Flume.
*
*  This should be used in conjunction with the Spark Sink running in a Flume agent. See
*  the Spark Streaming programming guide for more details.
*
*  Usage: FlumePollingEventCount <host> <port>
*    `host` is the host on which the Spark Sink is running.
*    `port` is the port at which the Spark Sink is listening.
*
*  To run this example:
*    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
*/
object FlumePollingEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream that polls the Spark Sink running in a Flume agent
    val stream = FlumeUtils.createPollingStream(ssc, host, port)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println[/mw_shl_code]


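fengfengda posted on 2017-9-18 12:34:33
yuwenge posted on 2017-9-18 10:38
a1.sinks.k1.hostname=172.28.41.196
Change this to the hostname.
Then follow the code below:

Changing it to localhost still does not work; the output is the same as before.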

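yuwenge posted on 2017-9-18 13:59:03
fengfengda posted on 2017-9-18 12:34
Changing it to localhost still does not work; the output is the same as before.

Then the problem is not in that setting. Look for other causes.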

fengfengda posted on 2017-9-18 17:17:58
It finally worked with this setting: val conf = new SparkConf().setAppName(appName).setMaster("local[2]")
Previously it was: val conf = new SparkConf().setAppName(appName).setMaster("spark://*.*.*.*:7077")
But I do not understand why specifying the master's IP directly does not work.
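
A possible explanation, not confirmed anywhere in this thread: createPollingStream is receiver-based (the log shows "Receiver 0 started"), and each receiver permanently occupies one core. The Spark Streaming programming guide states that an application needs more cores than receivers, otherwise data is received but never processed. That would match the log above, where task set 3.0 is added but no task from it is ever started, and local[2] supplies exactly the second core. The sketch below only illustrates that arithmetic; ReceiverCoreCheck and its methods are hypothetical names, not Spark API.

```scala
object ReceiverCoreCheck {
  // Hypothetical helper: cores left for batch processing once every
  // receiver has taken one core for itself.
  def coresLeftForProcessing(totalCores: Int, receivers: Int): Int =
    totalCores - receivers

  // Batches can only be processed if at least one core remains free.
  def canProcessBatches(totalCores: Int, receivers: Int): Boolean =
    coresLeftForProcessing(totalCores, receivers) > 0

  def main(args: Array[String]): Unit = {
    // local[2] with one Flume polling receiver: one core is left over.
    println(canProcessBatches(totalCores = 2, receivers = 1))
    // A single core shared with one receiver: nothing is left over.
    println(canProcessBatches(totalCores = 1, receivers = 1))
  }
}
```

If this is the cause, giving the application at least two cores on the standalone cluster should have the same effect as local[2].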

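tntzbzc posted on 2017-9-18 20:05:01
fengfengda posted on 2017-9-18 17:17
It finally worked with this setting: val conf = new SparkConf().setAppName(appName).setMaster("local[2]")
Previously it was ...

This has to do with how the program works: in some places the two forms behave the same, and in others the value is not recognized. Also, read the official documentation more often. Here is the official explanation: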
[mw_shl_code=scala,true]def setMaster(master: String): SparkConf
The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.[/mw_shl_code]



http://spark.apache.org/docs/lat ... che.spark.SparkConf

xiaobaiyang posted on 2017-9-19 09:45:09
fengfengda posted on 2017-9-18 17:17
It finally worked with this setting: val conf = new SparkConf().setAppName(appName).setMaster("local[2]")
Previously it was ...

Hello. Setting val conf = new SparkConf().setAppName(appName).setMaster("local[2]") means the job runs locally, so it cannot be used for a deployment. You can try the following instead:
val sparkConf = new SparkConf().setAppName("SparkStreamingPullFlume")
Then run it with the following command:
spark-submit --master spark://master:7077 --class cn.chinahadoop.flume.SparkStreamingPullFlume --jars lib/spark-streaming-flume_2.11-2.1.0.jar,lib/spark-streaming-flume-sink_2.11-2.1.0.jar,lib/avro-1.7.7.jar,lib/avro-ipc-1.7.7.jar,lib/avro-ipc-1.7.7-tests.jar,lib/flume-ng-sdk-1.7.0.jar chinahadoop.jar xxx.xxx.xx.xx 11111 50000

xxx.xxx.xx.xx is your actual IP address
master is the hostname

Execution result:

[screenshot: result display]



xiaobaiyang posted on 2017-9-19 09:52:36
fengfengda posted on 2017-9-18 17:17
It finally worked with this setting: val conf = new SparkConf().setAppName(appName).setMaster("local[2]")
Previously it was ...

val conf = new SparkConf().setAppName(appName).setMaster("local[2]") means local execution and cannot be used for a deployment.
You can try running it as follows:


val sparkConf = new SparkConf().setAppName("SparkStreamingPullFlume")
Command to run:
spark-submit --master spark://hostname:7077 --class cn.chinahadoop.flume.SparkStreamingPullFlume --jars lib/spark-streaming-flume_2.11-2.1.0.jar,lib/spark-streaming-flume-sink_2.11-2.1.0.jar,lib/avro-1.7.7.jar,lib/avro-ipc-1.7.7.jar,lib/avro-ipc-1.7.7-tests.jar,lib/flume-ng-sdk-1.7.0.jar chinahadoop.jar ip 11111 50000


Execution result:
[screenshot: QQ拼音截图20170919092757.png]


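fengfengda posted on 2017-9-19 14:47:21
xiaobaiyang posted on 2017-9-19 09:52
val conf = new SparkConf().setAppName(appName).setMaster("local[2]") means local execution and cannot be used for a deployment ...

Yes. In fact there is no need to set the master in the code; specifying it when launching the program is enough.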
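
The conclusion above can be sketched as follows, as one possible way to keep the master out of the code while still being able to run locally. It assumes that spark-submit passes --master to the application as the spark.master property, which new SparkConf() picks up. MasterFallbackSketch and buildConf are hypothetical names, and the local[2] fallback is an assumption chosen so that one thread is left for processing next to the receiver.

```scala
import org.apache.spark.SparkConf

object MasterFallbackSketch {
  // Hypothetical helper, not from the thread: builds a SparkConf that
  // respects a master supplied at launch time and only falls back to
  // local mode when none was given.
  def buildConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // setIfMissing leaves "spark.master" untouched when it has
      // already been set, for example by spark-submit --master.
      .setIfMissing("spark.master", "local[2]")

  def main(args: Array[String]): Unit = {
    val conf = buildConf("SparkStreamingPullFlume")
    println("Effective master: " + conf.get("spark.master"))
  }
}
```

Started from an IDE this would run with local[2]; started through spark-submit --master spark://hostname:7077 it should keep the cluster master.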