Problem integrating Flume with Spark

fengfengda posted on 2017-9-15 10:12:17
I hit a problem when combining the two. Could any expert help? Waiting online.
I am pulling data from Flume with Spark Streaming in pull mode. The Flume configuration is:
# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the Flume source (the directory to watch)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/weixf_kafka/testflume


# Configure the Flume sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel =c1
a1.sinks.k1.hostname=...
a1.sinks.k1.port = 19999


# Configure the Flume channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1



The Spark Streaming test code is:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeLogPull {
  Logger.getRootLogger.setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {
    val appName = "FlumeLogPull"
    val masterUrl = "spark://...:7077"
    val host = "..."
    val port = 19999
    val batchInterval = Seconds(10) // Milliseconds(10000)

    // Create the SparkConf and set the app name and master
    val conf = new SparkConf()
      .setAppName(appName)
      .setMaster(masterUrl)

    // Create the StreamingContext
    val ssc = new StreamingContext(conf, batchInterval)

    // Create the Flume stream in pull mode
    val flumeStream = FlumeUtils.createPollingStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    flumeStream.foreachRDD(rdd => {
      rdd.foreachPartition(it => {
        it.foreach(event => {
          val sensorInfo = new String(event.event.getBody.array()) // one record per line
          println(sensorInfo) // runs on the executors, so it prints to executor stdout
        })
      })
    })
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()
    // Take the event out of each message and the body out of the event; the body is the actual payload
    flumeStream.flatMap(t => { new String(t.event.getBody.array()).split(" ") }).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
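A side note on `new String(event.event.getBody.array())` in the code above: `getBody` returns a `java.nio.ByteBuffer`, `array()` hands back the whole backing array regardless of the buffer's position and limit, and `new String(bytes)` uses the platform default charset. This is not the cause of the error reported here, but a more defensive decode might look like the sketch below. `BodyDecoder` is a hypothetical helper name, and UTF-8 is an assumption about how the spooled files are encoded.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of Spark or Flume.
object BodyDecoder {
  // Decode only the readable window of the buffer, with an explicit charset (UTF-8 assumed).
  def decode(body: ByteBuffer): String = {
    val view = body.duplicate()                  // leave the caller's position untouched
    val bytes = new Array[Byte](view.remaining())
    view.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val backing = "xxhello worldxx".getBytes(StandardCharsets.UTF_8)
    val body = ByteBuffer.wrap(backing, 2, 11)   // readable window covers "hello world"
    println(decode(body))                                      // window only
    println(new String(body.array(), StandardCharsets.UTF_8))  // entire backing array
  }
}
```

In the streaming job this would be called as `BodyDecoder.decode(event.event.getBody)` in place of the `new String(...)` expression.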
But the output is:
17/09/15 10:01:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 268.0 (TID 457, 172.28.41.196): java.lang.NoClassDefFoundError: org/spark-project/guava/util/concurrent/ThreadFactoryBuilder
        at org.apache.spark.streaming.flume.FlumePollingReceiver.channelFactoryExecutor$lzycompute(FlumePollingInputDStream.scala:69)
        at org.apache.spark.streaming.flume.FlumePollingReceiver.channelFactoryExecutor(FlumePollingInputDStream.scala:68)
        at org.apache.spark.streaming.flume.FlumePollingReceiver.channelFactory$lzycompute(FlumePollingInputDStream.scala:73)
        at org.apache.spark.streaming.flume.FlumePollingReceiver.channelFactory(FlumePollingInputDStream.scala:72)
        at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
        at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
        at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


-------------------------------------------
Time: 1505440860000 ms
-------------------------------------------
Received 0 flume events.


-------------------------------------------
Time: 1505440860000 ms
-------------------------------------------


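But I cannot find any jar that contains this class.
Environment versions:
Flume 1.7
Spark 2.0.2
The dependency I use is libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.3"
It may be a dependency problem. Any help would be much appreciated!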

11 comments

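yaojiank posted on 2017-9-15 11:49:11
fengfengda posted on 2017-9-15 10:22
Someone said guava-11.0.2.jar was missing, but after I copied it to every node it still doesn't work. This is only a warning and the program is still run ...

It may be a version compatibility problem.
Try a different version. This is the dependency given on the official site: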
[mw_shl_code=bash,true] groupId = org.apache.spark
artifactId = spark-streaming-flume_2.11
version = 2.2.0[/mw_shl_code]
http://spark.apache.org/docs/latest/streaming-flume-integration.html

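fengfengda posted on 2017-9-15 10:22:04
Someone said guava-11.0.2.jar was missing, but after I copied it to every node it still doesn't work. The message is only a warning and the program keeps running. However, when I move a file into the watched directory, the Spark Streaming program does not print the result I expect, as if nothing were being monitored. I don't know what causes this.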

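fengfengda posted on 2017-9-15 14:35:45
yaojiank posted on 2017-9-15 11:49
It may be a version compatibility problem.
Try a different version. This is the dependency given on the official site: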
[mw_shl_code=bash,true] groupId = org. ...

It was indeed a dependency problem. I am now using libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2"
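The change reported here lines the connector up with the cluster: the `_2.11` artifact at version 2.0.2 instead of `_2.10` at 1.6.3. A sketch of a build.sbt that keeps the two in step is shown below. It assumes the cluster runs Spark 2.0.2 built for Scala 2.11; the `sparkVersion` value, the `scalaVersion` setting, and the `provided` scope on spark-streaming are illustrative assumptions, not taken from this thread.

```scala
// build.sbt (sketch under the assumptions stated above)
scalaVersion := "2.11.8" // assumed; must match the Scala version the cluster's Spark was built with

val sparkVersion = "2.0.2" // assumed to equal the Spark version installed on the cluster

libraryDependencies ++= Seq(
  // Supplied by the cluster at runtime, so it is kept out of the application jar
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // %% appends the Scala binary suffix (_2.11 here), so the suffix cannot drift from scalaVersion
  "org.apache.spark" %% "spark-streaming-flume" % sparkVersion
)
```

Using `%%` with a single version value means the connector and the Spark core artifacts can no longer be bumped independently by accident.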

fengfengda posted on 2017-9-15 14:39:56
But when I drop a file into the directory, I get:

17/09/15 14:37:00 INFO scheduler.JobScheduler: Starting job streaming job 1505457420000 ms.0 from job set of time 1505457420000 ms
17/09/15 14:37:00 INFO scheduler.JobScheduler: Finished job streaming job 1505457420000 ms.0 from job set of time 1505457420000 ms
17/09/15 14:37:00 INFO scheduler.JobScheduler: Total delay: 0.091 s for time 1505457420000 ms (execution: 0.000 s)
17/09/15 14:37:00 INFO rdd.MapPartitionsRDD: Removing RDD 13 from persistence list
17/09/15 14:37:00 INFO storage.BlockManager: Removing RDD 13
17/09/15 14:37:00 INFO rdd.BlockRDD: Removing RDD 12 from persistence list
17/09/15 14:37:00 INFO storage.BlockManager: Removing RDD 12
17/09/15 14:37:00 INFO flume.FlumePollingInputDStream: Removing blocks of RDD BlockRDD[12] at createPollingStream at FlumeLogPull.scala:33 of time 1505457420000 ms
17/09/15 14:37:00 INFO scheduler.ReceivedBlockTracker: Deleting batches: 1505457400000 ms
17/09/15 14:37:00 INFO scheduler.InputInfoTracker: remove old batch metadata: 1505457400000 ms
17/09/15 14:37:10 INFO scheduler.JobScheduler: Added jobs for time 1505457430000 ms
-------------------------------------------
Time: 1505457430000 ms
-------------------------------------------

17/09/15 14:37:10 INFO scheduler.JobScheduler: Starting job streaming job 1505457430000 ms.0 from job set of time 1505457430000 ms
17/09/15 14:37:10 INFO scheduler.JobScheduler: Finished job streaming job 1505457430000 ms.0 from job set of time 1505457430000 ms
17/09/15 14:37:10 INFO scheduler.JobScheduler: Total delay: 0.021 s for time 1505457430000 ms (execution: 0.001 s)
17/09/15 14:37:10 INFO rdd.MapPartitionsRDD: Removing RDD 15 from persistence list
17/09/15 14:37:10 INFO storage.BlockManager: Removing RDD 15
17/09/15 14:37:10 INFO rdd.BlockRDD: Removing RDD 14 from persistence list
17/09/15 14:37:10 INFO storage.BlockManager: Removing RDD 14
17/09/15 14:37:10 INFO flume.FlumePollingInputDStream: Removing blocks of RDD BlockRDD[14] at createPollingStream at FlumeLogPull.scala:33 of time 1505457430000 ms
17/09/15 14:37:10 INFO scheduler.ReceivedBlockTracker: Deleting batches: 1505457410000 ms
17/09/15 14:37:10 INFO scheduler.InputInfoTracker: remove old batch metadata: 1505457410000 ms
17/09/15 14:37:20 INFO scheduler.JobScheduler: Added jobs for time 1505457440000 ms
-------------------------------------------
Time: 1505457440000 ms
-------------------------------------------

17/09/15 14:37:20 INFO scheduler.JobScheduler: Starting job streaming job 1505457440000 ms.0 from job set of time 1505457440000 ms
17/09/15 14:37:20 INFO scheduler.JobScheduler: Finished job streaming job 1505457440000 ms.0 from job set of time 1505457440000 ms
17/09/15 14:37:20 INFO scheduler.JobScheduler: Total delay: 0.030 s for time 1505457440000 ms (execution: 0.001 s)
17/09/15 14:37:20 INFO rdd.MapPartitionsRDD: Removing RDD 17 from persistence list
17/09/15 14:37:20 INFO storage.BlockManager: Removing RDD 17
17/09/15 14:37:20 INFO rdd.BlockRDD: Removing RDD 16 from persistence list
17/09/15 14:37:20 INFO storage.BlockManager: Removing RDD 16
17/09/15 14:37:20 INFO flume.FlumePollingInputDStream: Removing blocks of RDD BlockRDD[16] at createPollingStream at FlumeLogPull.scala:33 of time 1505457440000 ms
17/09/15 14:37:20 INFO scheduler.ReceivedBlockTracker: Deleting batches: 1505457420000 ms
17/09/15 14:37:20 INFO scheduler.InputInfoTracker: remove old batch metadata: 1505457420000 ms
17/09/15 14:37:26 INFO storage.BlockManagerInfo: Added input-0-1505457365928 in memory on 172.28.41.196:59597 (size: 91.0 B, free: 413.9 MB)
17/09/15 14:37:30 INFO scheduler.JobScheduler: Added jobs for time 1505457450000 ms
17/09/15 14:37:30 INFO scheduler.JobScheduler: Starting job streaming job 1505457450000 ms.0 from job set of time 1505457450000 ms
17/09/15 14:37:30 INFO spark.SparkContext: Starting job: print at FlumeLogPull.scala:46
17/09/15 14:37:30 INFO scheduler.DAGScheduler: Got job 2 (print at FlumeLogPull.scala:46) with 1 output partitions
17/09/15 14:37:30 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (print at FlumeLogPull.scala:46)
17/09/15 14:37:30 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/09/15 14:37:30 INFO scheduler.DAGScheduler: Missing parents: List()
17/09/15 14:37:30 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[21] at flatMap at FlumeLogPull.scala:46), which has no missing parents
17/09/15 14:37:30 INFO memory.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 1864.0 B, free 413.8 MB)
17/09/15 14:37:30 INFO memory.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1247.0 B, free 413.8 MB)
17/09/15 14:37:30 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.28.41.193:33204 (size: 1247.0 B, free: 413.9 MB)
17/09/15 14:37:30 INFO spark.SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1012
17/09/15 14:37:30 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[21] at flatMap at FlumeLogPull.scala:46)
17/09/15 14:37:30 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
17/09/15 14:37:40 INFO scheduler.JobScheduler: Added jobs for time 1505457460000 ms
17/09/15 14:37:50 INFO scheduler.JobScheduler: Added jobs for time 1505457470000 ms
17/09/15 14:38:00 INFO scheduler.JobScheduler: Added jobs for time 1505457480000 ms
17/09/15 14:38:10 INFO scheduler.JobScheduler: Added jobs for time 1505457490000 ms
17/09/15 14:38:15 INFO storage.BlockManagerInfo: Added input-0-1505457365929 in memory on 172.28.41.196:59597 (size: 91.0 B, free: 413.9 MB)
17/09/15 14:38:20 INFO scheduler.JobScheduler: Added jobs for time 1505457500000 ms
17/09/15 14:38:30 INFO scheduler.JobScheduler: Added jobs for time 1505457510000 ms
17/09/15 14:38:40 INFO scheduler.JobScheduler: Added jobs for time 1505457520000 ms
17/09/15 14:38:50 INFO scheduler.JobScheduler: Added jobs for time 1505457530000 ms
17/09/15 14:38:58 INFO storage.BlockManagerInfo: Added input-0-1505457365930 in memory on 172.28.41.196:59597 (size: 91.0 B, free: 413.9 MB)
17/09/15 14:39:00 INFO scheduler.JobScheduler: Added jobs for time 1505457540000 ms

This is what I get. It is still not the result I expected.
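A note on the log above: blocks keep arriving (the `Added input-0-...` lines), yet from 14:37:30 onward a task set is added and later batches are only queued ("Added jobs") without any job starting. One possible explanation, not confirmed anywhere in this thread, is a shortage of cores. A receiver-based stream such as the one created by `createPollingStream` occupies one core for its receiver, so the application needs more cores than it has receivers; otherwise data is received but never processed. On a standalone master the total is governed by settings such as `spark.cores.max`. The arithmetic can be sketched as follows, where `CoreCheck` is a hypothetical helper and not a Spark API:

```scala
// Hypothetical helper, not part of Spark: each receiver pins one core,
// so batches can only run if at least one core is left over.
object CoreCheck {
  def coresLeftForProcessing(totalCores: Int, receivers: Int): Int =
    math.max(totalCores - receivers, 0)

  def main(args: Array[String]): Unit = {
    val receivers = 1 // the code in this thread creates a single polling stream
    for (cores <- Seq(1, 2, 4)) {
      val free = coresLeftForProcessing(cores, receivers)
      val verdict = if (free > 0) "batches can run" else "batches queue up and never start"
      println(s"cores=$cores receivers=$receivers free=$free -> $verdict")
    }
  }
}
```

If this is the cause, the Spark master web UI would show the application holding a single core, and granting it at least two should let the queued batches run.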

nextuser posted on 2017-9-15 16:25:14
fengfengda posted on 2017-9-15 14:39
But when I drop a file into the directory, I get:

17/09/15 14:37:00 INFO scheduler.JobScheduler: Starting job streaming job 150545 ...

Maybe the data is not being fetched. In that case it is a problem in the code.

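fengfengda posted on 2017-9-15 16:28:42
nextuser posted on 2017-9-15 16:25
Maybe the data is not being fetched. In that case it is a problem in the code.

The code only produces output through this one line:
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()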

fengfengda posted on 2017-9-15 17:48:14
Could the dependency libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2" still be what is causing this?

fengfengda posted on 2017-9-15 18:16:32
fengfengda posted on 2017-9-15 17:48
Could the dependency libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.0.2" still ...

I switched to
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.2.0" and it still doesn't work.

yuwenge posted on 2017-9-18 10:36:09
fengfengda posted on 2017-9-15 18:16
I switched to
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.11" % "2.2.0" and it still doesn't work ...

Try this code:
[mw_shl_code=scala,true]/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam

/**
*  Produces a count of events received from Flume.
*
*  This should be used in conjunction with the Spark Sink running in a Flume agent. See
*  the Spark Streaming programming guide for more details.
*
*  Usage: FlumePollingEventCount <host> <port>
*    `host` is the host on which the Spark Sink is running.
*    `port` is the port at which the Spark Sink is listening.
*
*  To run this example:
*    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
*/
object FlumePollingEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream that polls the Spark Sink running in a Flume agent
    val stream = FlumeUtils.createPollingStream(ssc, host, port)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println[/mw_shl_code]
