分享

SparkSQL结合SparkStreaming,通过SQL实现实时计算数据统计


问题导读

1.本文的场景是什么?
2.SparkSQL结合SparkStreaming,通过SQL实现实时计算数据统计需要做哪些准备?
3.SparkSQL结合SparkStreaming,通过SQL实现实时计算数据统计如何通过代码实现?







Flume+Kafka+SparkStreaming已经发展为一个比较成熟的实时日志收集与计算架构,利用Kafka,即可以支持将用于离线分析的数据流到HDFS,又可以同时支撑多个消费者实时消费数据,包括SparkStreaming。然而,在SparkStreaming程序中如果有复杂业务逻辑的统计,使用scala代码实现起来比较困难,也不易于别人理解。但如果在SparkSteaming中也使用SQL来做统计分析,是不是就简单的多呢?

本文介绍将SparkSQL与SparkStreaming结合起来,使用SQL完成实时的日志数据统计。
SparkStreaming程序以yarn-cluster模式运行在YARN上,不单独部署Spark集群。


环境部署

Hadoop-2.3.0-cdh5.0.0(YARN)
spark-1.5.0-bin-hadoop2.3
kafka_2.10-0.8.2.1
另外,还编译了SparkStreaming用于读取Kafka数据的插件:
spark-streaming-kafka_2.10-1.5.0.jar
相关环境的部署本文不做介绍


实时统计需求

以60秒为间隔,统计60秒内的pv,ip数,uv
最终结果包括:
时间点:pv:ips:uv



原始日志格式

[mw_shl_code=bash,true]2015-11-11T14:59:59|~|xxx|~|202.109.201.181|~|xxx|~|xxx|~|xxx|~|B5C96DCA0003DB546E7
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|B1611D0E00003857808
2015-11-11T14:59:59|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|1555BD0100016F2E76F
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:00|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|C0EA13670E0B942E70E
2015-11-11T15:00:01|~|xxx|~|125.119.144.252|~|xxx|~|xxx|~|xxx|~|4E3512790001039FDB9[/mw_shl_code]

每条日志包含7个字段,分隔符为|~|,其中,第3列为ip,第7列为cookieid。
假设原始日志已经由Flume流到Kafka中。


SparkStreaming程序代码

程序中使用下面的SQL语句完成对一个批次的数据统计:

[mw_shl_code=bash,true]SELECT date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') AS time,
COUNT(1) AS pv,
COUNT(DISTINCT ip) AS ips,
COUNT(DISTINCT cookieid) as uv
FROM daplog[/mw_shl_code]

SparkStreaming程序代码:

[mw_shl_code=bash,true]package com.lxw.test

import scala.reflect.runtime.universe

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.KafkaUtils
/**
* auth:lxw1234
* http://lxw1234.com
*/
object DapLogStreaming {
  
  def main (args : Array[String]) {
    val sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("DapLogStreaming")
    //每60秒一个批次
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    //从Kafka中读取数据,topic为daplog,该topic包含两个分区
    val kafkaStream = KafkaUtils.createStream(
          ssc,
          "bj11-65:2181", //Kafka集群使用的zookeeper
          "group_spark_streaming", //该消费者使用的group.id
          Map[String, Int]("daplog" -> 0,"daplog" -> 1), //日志在Kafka中的topic及其分区
          StorageLevel.MEMORY_AND_DISK_SER)
      .map(x => x._2.split("\\|~\\|", -1))  //日志以|~|为分隔符
   
    kafkaStream.foreachRDD((rdd: RDD[Array[String]], time: Time) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      //构造case class: DapLog,提取日志中相应的字段
      val logDataFrame = rdd.map(w => DapLog(w(0).substring(0, 10),w(2),w(6))).toDF()
      //注册为tempTable
      logDataFrame.registerTempTable("daplog")
      //查询该批次的pv,ip数,uv
      val logCountsDataFrame =
        sqlContext.sql("select date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss') as time,count(1) as pv,count(distinct ip) as ips,count(distinct cookieid) as uv from daplog")
      //打印查询结果
      logCountsDataFrame.show()
    })
   
   
    ssc.start()
    ssc.awaitTermination()
   
  }
  
  
}

case class DapLog(day:String, ip:String, cookieid:String)

object SQLContextSingleton {
  @transient  private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}[/mw_shl_code]

示例中只是将实时统计的结果打印到标准输出,真实场景一般是将结果持久化到数据库中。

将该程序打包成DapLogStreaming.jar,上传至网关机。



运行SparkStreaming程序

进入$SPARK_HOME/bin
执行下面的命令,将SparkStreaming程序提交到YARN:

[mw_shl_code=bash,true]./spark-submit \
--class com.lxw.test.DapLogStreaming \
--master yarn-cluster \
--executor-memory 2G \
--num-executors 6 \
--jars /home/liuxiaowen/kafka-clients-0.8.2.1.jar,/home/liuxiaowen/metrics-core-2.2.0.jar,/home/liuxiaowen/zkclient-0.3.jar,/home/liuxiaowen/spark-streaming-kafka_2.10-1.5.0.jar,/home/liuxiaowen/kafka_2.10-0.8.2.1.jar \
/home/liuxiaowen/DapLogStreaming.jar[/mw_shl_code]

注意:SparkStreaming及Kafka插件运行时候需要依赖相应的jar包。

查看运行结果


进入YARN ResourceManager的WEB界面,找到该程序对应的Application,
点击ApplicationMaster的链接,进入SparkMaster界面:
1.jpg
每个批次(60秒),会生成一个Job。
点击TAB页”Streaming”,进入Streaming的监控页面:
2.jpg
在最下方,显示正在处理的批次和已经完成的批次,包括每个批次的events数量。
最后,最主要的,点击ApplicationMaster的logs链接,查看stdout标准输出:
3.jpg
4.jpg

已经按照SQL中统计的字段,打印出统计结果,每60秒一个批次打印一次。

注意事项

由于kafka_2.10-0.8.2.1是基于Scala2.10的,因此Spark、Spark的Kafka插件、SparkStreaming应用程序都需要使用Scala2.10,如果使用Scala2.11,运行时候会报出因Scala版本不一致而造成的错误,比如:
[mw_shl_code=bash,true]15/11/11 15:36:26 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:59)
        at com.lxw.test.DapLogStreaming$.main(DapLogStreaming.scala:23)
        at com.lxw.test.DapLogStreaming.main(DapLogStreaming.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)[/mw_shl_code]






原文链接:http://www.tuicool.com/articles/uIRZFv

已有(5)人评论

跳转到指定楼层
regan 发表于 2015-12-10 09:07:06
谢谢楼主分享

回复

使用道具 举报

a_zhen 发表于 2015-12-10 10:31:05
spark的东西是不是有点多啊
回复

使用道具 举报

woshichuanqi 发表于 2016-4-7 21:56:31
在sparksql中连接MySQL数据库 能否直接执行sql语句操作MySQL?
回复

使用道具 举报

iot-lee 发表于 2019-7-26 13:40:27
sparkstreaming 读取kafka日志数据,窗口处理为1分钟,日志数据自带时间戳,想根据日志时间戳进行分钟级别的数据统计,这个怎么支持实现?structed streaming 支持event Time,但是怎么让sparkstreaming 实现这种统计呢




企业微信截图_15641195459272.png
回复

使用道具 举报

琅琊榜尾 发表于 2019-7-29 01:00:30
没有flume搜集的么
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条