分享

SparkListener监听使用方式及自定义的事件处理动作

问题导读

1.SparkListener类的作用事什么?
2.Spark如果想监控各个任务的事件该如何实现?
3.阶段的事件监听接口类是哪个?

概述
spark 提供了一系列整个任务生命周期中各个阶段变化的事件监听机制,通过这一机制可以在任务的各个阶段做一些自定义的各种动作。SparkListener便是这些阶段的事件监听接口类 通过实现这个类中的各种方法便可实现自定义的事件处理动作。

自定义示例代码
  1. import org.apache.spark.internal.Logging
  2. import org.apache.spark.scheduler.{SparkListenerApplicationStart, SparkListenerApplicationEnd, SparkListener}
  3. /**
  4. * Created by silent on 2019/1/11.
  5. */
  6. class MySparkAppListener extends SparkListener with Logging {
  7.   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
  8.     val appId = applicationStart.appId
  9.     logInfo("***************************************************" + appId.get)
  10.   }
  11.   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
  12.     logInfo("************************ app end time ************************ " + applicationEnd.time)
  13.   }
  14. }
复制代码
主函数运行示例

  1. object Main extends App {
  2.      val spark = SparkSession.builder()
  3.                  .appName("main")
  4.                  .master("local[2]")
  5.                  .config("spark.extraListeners","com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
  6.                  .getOrCreate()
  7.      //spark.sparkContext.addSparkListener(new MySparkAppListener)
  8.      spark.stop()
  9. }
复制代码
说明:

自定义监听sparListener后的注册方式有两种:

方法1:conf配置中指定

  1. //spark2.0以下
  2. val sparkConf=new SparkConf()
  3. sparkConf.set("spark.extraListeners","org.apache.spark.MySparkAppListener")
  4. // spark2.0+
  5. val spark = SparkSession.builder()
  6.                  .appName("main")
  7.                  .master("local[2]")
  8.                  .config("spark.extraListeners","com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
  9.                  .getOrCreate()
复制代码


方法2:sparkContext 类中指定
  1. //spark2.0前
  2. val sc = new SparkContext(sparkConf)
  3. sc.addSparkListener(new MySparkAppListener)
  4. //spark2.0+
  5. spark.sparkContext.addSparkListener(new MySparkAppListener)
复制代码



sparkListerner 代码记录

  1. //SparkListener 下各个事件对应的函数名非常直白,即如字面所表达意思。
  2. //想对哪个阶段的事件做一些自定义的动作,变继承SparkListener实现对应的函数即可
  3. abstract class SparkListener extends SparkListenerInterface {
  4.   //阶段完成时触发的事件
  5.   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
  6.   //阶段提交时触发的事件
  7.   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
  8.   //任务启动时触发的事件
  9.   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
  10.   //下载任务结果的事件
  11.   override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
  12.   //任务结束的事件
  13.   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
  14.   //job启动的事件
  15.   override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
  16.   //job结束的事件
  17.   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
  18.   //环境变量被更新的事件
  19.   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
  20.   //块管理被添加的事件
  21.   override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
  22.   override def onBlockManagerRemoved(
  23.       blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
  24.   //取消rdd缓存的事件
  25.   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
  26.   //app启动的事件
  27.   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
  28.   //app结束的事件 [以下各事件也如同函数名所表达各个阶段被触发的事件不在一一标注]
  29.   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
  30.   override def onExecutorMetricsUpdate(
  31.       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
  32.   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
  33.   override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
  34.   override def onExecutorBlacklisted(
  35.       executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }
  36.   override def onExecutorUnblacklisted(
  37.       executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }
  38.   override def onNodeBlacklisted(
  39.       nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
  40.   override def onNodeUnblacklisted(
  41.       nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
  42.   override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
  43.   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
  44. }
复制代码


原文连接:
https://www.cnblogs.com/yyy-blog/p/10253830.html

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条