
Spark Streaming Performance Tuning Series: How to Obtain and Continuously Use Sufficient Cluster Computing Resources?

Guiding questions:
1. What impact do data peaks have?
2. How can Spark's data ingestion rate be limited?




I. The huge impact of data peaks
1. Data traffic is genuinely unstable; for example, access traffic can spike sharply at night.
2. Time lost during processing, for example to GC pauses, introduces delay.

II. Backpressure: the data backpressure mechanism
Basic idea: use information from the previously completed Job to estimate and decide the rate at which data is received for the next Job.
How is Spark's data ingestion rate limited?
When receiving data, Spark Streaming must finish taking in the current record before it can accept the next one, and throttling that per-record step is how the ingestion rate is bounded.
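
Before diving into the source, note that backpressure is not switched on by default. A minimal configuration sketch is shown below; the property names are standard Spark Streaming settings (the second one reappears in step 12), while the application name, rate values, and batch duration are illustrative placeholders, not tuning advice.
[mw_shl_code=scala,true]import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch: enable backpressure and cap the per-receiver ingestion rate.
val conf = new SparkConf()
  .setAppName("BackpressureDemo")                        // hypothetical application name
  .set("spark.streaming.backpressure.enabled", "true")   // let Spark adapt the ingestion rate per batch
  .set("spark.streaming.receiver.maxRate", "10000")      // hard upper bound, records/second per receiver

val ssc = new StreamingContext(conf, Seconds(5))         // e.g. a 5-second batch duration[/mw_shl_code]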

Source code walkthrough
RateController:
RateController is a listener; it extends StreamingListener.
[mw_shl_code=scala,true]/**
* A StreamingListener that receives batch completion updates, and maintains
* an estimate of the speed at which this stream should ingest messages,
* given an estimate computation from a `RateEstimator`
*/
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {[/mw_shl_code]

Now the question: when is RateController invoked?
Backpressure evaluates the ingestion rate for the next Job from information about the previously completed Job, so the controller must be wired in through the JobScheduler.

1. In JobScheduler's start method, the rate controller is obtained from each input stream.
[mw_shl_code=scala,true]// attach rate controllers of input streams to receive batch completion updates
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)[/mw_shl_code]
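
Note that inputDStream.rateController is an Option, so the loop above only registers listeners for streams that actually have a controller. The following is a rough sketch, reconstructed from memory rather than quoted verbatim, of how a receiver-based input stream exposes the controller only when backpressure is enabled.
[mw_shl_code=scala,true]// Simplified sketch of ReceiverInputDStream (not the verbatim Spark source):
// a rate controller only exists when spark.streaming.backpressure.enabled is true,
// so the for-comprehension in JobScheduler.start silently skips streams without one.
override protected[streaming] val rateController: Option[RateController] = {
  if (RateController.isBackPressureEnabled(ssc.conf)) {
    Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
  } else {
    None
  }
}[/mw_shl_code]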

2. The rate controller is then added to the listenerBus as a streaming listener. [mw_shl_code=scala,true]/** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
  * receiving system events related to streaming.
  */
def addStreamingListener(streamingListener: StreamingListener) {
  scheduler.listenerBus.addListener(streamingListener)
}[/mw_shl_code]

3. The relevant StreamingListenerBus source is as follows:
[mw_shl_code=scala,true]/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
private[spark] class StreamingListenerBus
  extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
  with Logging {

  private val logDroppedEvent = new AtomicBoolean(false)

  override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
    event match {
      case receiverStarted: StreamingListenerReceiverStarted =>
        listener.onReceiverStarted(receiverStarted)
      case receiverError: StreamingListenerReceiverError =>
        listener.onReceiverError(receiverError)
      case receiverStopped: StreamingListenerReceiverStopped =>
        listener.onReceiverStopped(receiverStopped)
      case batchSubmitted: StreamingListenerBatchSubmitted =>
        listener.onBatchSubmitted(batchSubmitted)
      case batchStarted: StreamingListenerBatchStarted =>
        listener.onBatchStarted(batchStarted)
      case batchCompleted: StreamingListenerBatchCompleted =>
        listener.onBatchCompleted(batchCompleted)
      // ... remaining cases omitted ...
    }
  }
}[/mw_shl_code]

4. RateController implements onBatchCompleted.

5. The concrete implementation of onBatchCompleted in RateController:
[mw_shl_code=scala,true]override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo

  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}[/mw_shl_code]
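
A small self-contained illustration (not Spark code) of why this is safe: the for-comprehension iterates over Options, so computeAndPublish fires only when every one of these batch fields is defined.
[mw_shl_code=scala,true]// Illustration only: a for-comprehension over Options executes its body
// only when every value is Some; a single None short-circuits it.
val processingEnd: Option[Long] = Some(1461756000000L)
val workDelay: Option[Long]     = Some(800L)
val waitDelay: Option[Long]     = None    // e.g. scheduling delay not yet recorded

for {
  end  <- processingEnd
  work <- workDelay
  wait <- waitDelay
} println(s"would publish: end=$end, workDelay=$work, waitDelay=$wait")  // never prints here[/mw_shl_code]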

6. The computeAndPublish source in RateController:
[mw_shl_code=scala,true]/**
* Compute the new rate limit and publish it asynchronously.
*/
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
  Future[Unit] {
    // Estimate a new, more appropriate rate.
    val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
    newRate.foreach { s =>
      rateLimit.set(s.toLong)
      publish(getLatestRate())
    }
  }[/mw_shl_code]
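
The actual number comes from rateEstimator.compute. Spark's default estimator is a PID-style controller (selected via spark.streaming.backpressure.rateEstimator, whose default is "pid"); the following is a simplified sketch of that idea with made-up gain values, not the real PIDRateEstimator source.
[mw_shl_code=scala,true]// Simplified PID-style sketch (illustrative only): start from the measured processing
// rate, then push it down when the previous rate overshot capacity or when records are
// already queuing up (scheduling delay).
class SimplePidEstimator(batchIntervalMillis: Long,
                         proportional: Double = 1.0,
                         integral: Double = 0.2,
                         minRate: Double = 100.0) {
  private var latestRate: Double = -1.0

  def compute(elems: Long, processingDelayMs: Long, schedulingDelayMs: Long): Option[Double] =
    if (elems > 0 && processingDelayMs > 0) {
      // records/second the last batch was actually processed at
      val processingRate = elems.toDouble / processingDelayMs * 1000
      // how far the previously published rate overshot real capacity
      val error = if (latestRate < 0) 0.0 else latestRate - processingRate
      // backlog already waiting, expressed as a rate over one batch interval
      val historicalError = schedulingDelayMs.toDouble * processingRate / batchIntervalMillis
      val newRate = math.max(processingRate - proportional * error - integral * historicalError, minRate)
      latestRate = newRate
      Some(newRate)
    } else None
}

// e.g. 10,000 records took 8s to process in a 5s batch, with 3s of scheduling delay built up
val estimator = new SimplePidEstimator(batchIntervalMillis = 5000)
println(estimator.compute(elems = 10000, processingDelayMs = 8000, schedulingDelayMs = 3000))[/mw_shl_code]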

7. The publish method itself is implemented in ReceiverRateController.

8. publish sends the new rate to the ReceiverTracker.
[mw_shl_code=scala,true]/**
* A RateController that sends the new rate to receivers, via the receiver tracker.
*/
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    // There may be many RateControllers, so the concrete stream id is passed along.
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}[/mw_shl_code]

9. The sendRateUpdate source in ReceiverTracker is as follows; at this point the endpoint is the ReceiverTrackerEndpoint.
[mw_shl_code=scala,true]/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}[/mw_shl_code]

10. The message is received in ReceiverTrackerEndpoint's receive method.
[mw_shl_code=scala,true]case UpdateReceiverRateLimit(streamUID, newRate) =>
  // Look up the tracking info in receiverTrackingInfos, then obtain the RPC handle from it.
  // Here the endpoint is the ReceiverSupervisor's RPC endpoint.
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }[/mw_shl_code]

11. ReceiverSupervisorImpl on the executor therefore receives the message sent by the ReceiverTracker.
[mw_shl_code=scala,true]/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })[/mw_shl_code]

12. The updateRate source in RateLimiter:
[mw_shl_code=scala,true]/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 * (There is an upper bound because the cluster's processing capacity is finite; if Spark
 * Streaming runs on YARN alongside other frameworks, resources are even more constrained.)
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }[/mw_shl_code]
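
The rateLimiter field here is a Guava RateLimiter under the hood. Below is a minimal sketch of how updateRate and the per-record throttling interact, using the Guava API directly rather than Spark's wrapper, with illustrative numbers.
[mw_shl_code=scala,true]import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

// Illustrative sketch: updateRate boils down to setRate on a token-bucket rate limiter,
// and each record pushed by the receiver first acquires a permit, blocking when the
// configured rate is exceeded.
object RateLimitSketch {
  val maxRateLimit: Long = 10000L                  // e.g. spark.streaming.receiver.maxRate
  val limiter = GuavaRateLimiter.create(1000.0)    // initial rate: 1000 records/second

  def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      val bounded = if (maxRateLimit > 0) newRate.min(maxRateLimit) else newRate
      limiter.setRate(bounded.toDouble)
    }

  def waitToPush(): Unit = limiter.acquire()       // blocks until one more record may be pushed

  def main(args: Array[String]): Unit = {
    updateRate(50000)                              // capped to maxRateLimit = 10000
    (1 to 5).foreach(_ => waitToPush())
    println(s"current rate: ${limiter.getRate} records/second")
  }
}[/mw_shl_code]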

The overall flow: batch-completed event → StreamingListenerBus → RateController.onBatchCompleted → computeAndPublish → ReceiverRateController.publish → ReceiverTracker.sendRateUpdate → ReceiverTrackerEndpoint → ReceiverSupervisorImpl → RateLimiter.updateRate on each registered BlockGenerator.

Summary:
Every time the Job of the previous batch duration completes, a batch-completed event carrying the job's statistics is emitted. From that information a new rate is computed and delivered to the Executor via RPC, and the Executor resets its rate limit accordingly.

Source: CSDN, snail_gesture
http://blog.csdn.net/snail_gesture/article/details/51700213

