分享

about云日志分析项目准备11-2:spark 实现业务

本帖最后由 PeersLee 于 2017-5-11 12:28 编辑
问题导读:

1. 如何启动集群?
2. 工程结构是什么样?
3. 处理结果如何储存?




解决方案:


1. 集群启动

1.1 spark

QQ截图20170427123455.jpg

QQ截图20170427123405.jpg

1.2 zookeeper

QQ截图20170427124201.jpg

QQ截图20170427123927.jpg

1.3 kafka

QQ截图20170427124406.jpg

1.4 flume

  1. flume-ng agent --conf-file ~/opt/flume/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
复制代码


2. 工程结构

QQ截图20170427125224.jpg

2.1 delivery

传入一个StreamingContext 和 topics String,返回一个DStream

  1. class KafkaMsg {
  2.   def Processing(ssc: StreamingContext, topics : String, groupID : String): DStream[String] ={
  3.     // kafka集群中的一台或多台服务器统称为broker
  4.     val brokers = "localhost:9092,localhost:9093,localhost:9094"
  5.     // kafka
  6.     val topicsSet = topics.split(",").toSet
  7.     // 参数
  8.     val kafkaParams = Map[String, String](
  9.       "metadata.broker.list" -> brokers,
  10.       "group.id" -> groupID,
  11.       // 说明每次程序启动,从kafka中最开始的第一条消息开始读取
  12.       "auto.offset.reset" -> OffsetRequest.SmallestTimeString
  13.     )
  14.     // 返回一个DStream
  15.     KafkaUtils
  16.       .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  17.       .map(_._2)
  18.   }
复制代码


2.2 mongo utils

传入,schema String用来定义mongodb 中字段的 key,rowRdd 对应key, collection是对应集合名字

  1.   def put(schemaStr : String, rowRdd : RDD[Row], collection : String): Unit = {

  2.     val sqlContext = SQLContext.getOrCreate(sc)

  3.     val schema = StructType(schemaStr.split(",")
  4.       .map(column => StructField(column, StringType, true)))
  5.     val df = sqlContext.createDataFrame(rowRdd, schema)
  6.     MongoSpark.save(df.write.option("collection", collection).mode("append"))
  7.   }
复制代码


2.3 processing

2.3.1 Batch

从本地读取数据, 利用Rdd最基本的Api实现业务,然后封装Row传入MongoUtils入库

QQ截图20170427130913.jpg
  • 统计访问about云访问地区包括国内外及所占比率


  1. def IpLocation(sc : SparkContext, mongoSQL : MongoSQL) : Unit = {

  2.     val ipRuelsRdd = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line =>
  3.       // map RDD 的Transformation 操作,用 f 处理一个Rdd 的所有元素,将结果输出到另一个Rdd
  4.     {
  5.       val fields = line.trim().split("\t")
  6.       val start_num = ip2num(fields(0).trim())
  7.       val end_num = ip2num(fields(1).trim())
  8.       val province = fields(2).trim()
  9.       (start_num, end_num, province)
  10.     })
  11.     // 将Rdd 转成Scala数组,并返回
  12.     val ipRulesArray = ipRuelsRdd.collect()
  13.     // 广播变量:保持一个缓存在每台机器上的只读变量
  14.     val ipRulesBroadcast= sc.broadcast(ipRulesArray)
  15.     // ip
  16.     val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")
  17.     // ip -> 访问次数 -> 本地数据
  18.     val result = sc.textFile("/home/peerslee/spark_data/ex17032606.log")
  19.       .map(line => { ipPat.findFirstIn(line.toString()).mkString("")})
  20.       .map(ip => {
  21.         var info : Any = None
  22.         if(!ip.isEmpty) {
  23.           val ipNum = ip2num(ip)
  24.           val index = binarySearch(ipRulesBroadcast.value, ipNum)
  25.           info = ipRulesBroadcast.value(index)
  26.         }
  27.         (info, 1L)})
  28.       .reduceByKey(_+_)
  29.     // total
  30.     val total = result.reduce((x, y) => ("t", x._2+y._2))
  31.     // 次数
  32.     val rowRdd = result.map(x => {
  33.       val r = x._2.toFloat/total._2
  34.       (x._1, r)
  35.     }).map(line => Row(line._1.toString, line._2.toString))
  36.     val schemaStr = "loc,rate"
  37.     mongoSQL.put(schemaStr, rowRdd, "IP_rate_2")
  38.   }
复制代码


  • 统计每天访问量比较高的前50篇文章

  1. def Top50(sc : SparkContext, mongoSQL: MongoSQL) : Unit = {
  2.     val pat = "\\shttp://www\\.aboutyun\\.com/thread.*?\\.html.*?".r
  3.     val rddArr = sc.textFile("/home/peerslee/spark_data/ex17032606.log")
  4.       .map(lines => pat.findFirstIn(lines.toString()).mkString("").trim()).filter(!_.isEmpty)
  5.       .map(lines => (lines, 1L)).reduceByKey(_ + _).map(e => (e._2, e._1)).sortByKey(false) take 50
  6.     val rowRdd = sc.parallelize(rddArr).map(line => Row(line._1.toString, line._2.toString))
  7.     val schemaStr = "num,url"
  8.     mongoSQL.put(schemaStr, rowRdd, "Top50_3")
  9.   }
复制代码


  • 统计模块访问量并排序


  1. def ModulePV(sc : SparkContext, sqlContext : SQLContext, MongoSQL : MongoSQL) : Unit = {
  2.     val df = sqlContext.read.json("/home/peerslee/spark_data/boutyun_plate_id.json").select(
  3.       "id", "name", "rank"
  4.     )

  5.     val plateIdRuels = df.collect()
  6.     val plateIdBroadcast = sc.broadcast(plateIdRuels)

  7.     val idPat = new Regex("fid=\\d+")
  8.     // 各个模块访问次数
  9.     val rowRdd = sc.textFile("/home/peerslee/spark_data/ex17032606.log").map(lines =>
  10.       idPat.findFirstIn(lines.toString()).mkString("").replace("fid=","")).filter(!_.isEmpty).map(id => {
  11.       var res : Any = None
  12.       plateIdBroadcast.value.foreach(bc => {
  13.         if(bc(0).equals(id)) {
  14.           res = bc
  15.         }
  16.       })
  17.       (res, 1L)
  18.     }).reduceByKey(_+_).map(e => (e._2, e._1)).sortByKey().filter(_._2 != None)
  19.       .map(line => Row(line._1.toString, line._2.toString))
  20.     val schemaStr = "num,type"
  21.     MongoSQL.put(schemaStr, rowRdd, "module_pv_4")
  22.   }
复制代码


  • 统计使用搜索次数


  1. def SearchUseNum(sc : SparkContext, MongoSQL : MongoSQL) : Unit = {

  2.     val blogPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=blog\\s.*?".r
  3.     val forumPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=forum\\s.*?".r

  4.     // 各个模块访问次数
  5.     val rowRdd = sc.textFile("/home/peerslee/spark_data/ex17032606.log").map(lines => {
  6.       var res : Any = None
  7.       lines match {
  8.         case blogPat() => res = "blog"
  9.         case forumPat() => res = "forum"
  10.         case _ => res = "no"
  11.       }
  12.       res
  13.     }
  14.     ).filter(!_.equals("no")).map((_, 1L)).reduceByKey(_+_)
  15.       .map(line => Row(line._1.toString, line._2.toString))
  16.     val schemaStr = "search,num"
  17.     MongoSQL.put(schemaStr, rowRdd, "search_use_num_5")
  18.   }
复制代码


  • 统计导航的使用情况


  1. def Navi(sc : SparkContext, MongoSQL : MongoSQL) : Unit = {
  2.     // 读取log
  3.     val ddPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=guide\\s.*?".r
  4.     val bkPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=blog\\s.*?".r
  5.     val ztPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=collection\\s.*?".r
  6.     val llPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\s.*?".r
  7.     val ydPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=share\\s.*?".r
  8.     // 各个模块访问次数
  9.     val rowRdd = sc.textFile("/home/peerslee/spark_data/ex17032606.log").map(lines => {
  10.       var res : Any = None
  11.       lines match {
  12.         case ddPat() => res = "guide"
  13.         case bkPat() => res = "blog"
  14.         case ztPat() => res = "collection"
  15.         case llPat() => res = "home"
  16.         case ydPat() => res = "space"
  17.         case _ => res = "no"
  18.       }
  19.       res
  20.     }
  21.     ).filter(!_.equals("no"))
  22.       .map((_, 1L)).reduceByKey(_+_)
  23.       .map(line => Row(line._1.toString, line._2.toString))
  24.     val schemaStr = "type,num"
  25.     MongoSQL.put(schemaStr, rowRdd, "navi_6")
  26.   }
复制代码


2.3.2 Stream

从KafkaMsg中读取DStream,实现业务,foreachRdd进行封住,入库

QQ截图20170427130942.jpg

  • 统计访问量,pv,每个人访问该网站的数量,[uv,一个网站被多少人访问]

  1. def PV(dStream : DStream[String], mongoSQL: MongoSQL) : Unit = {

  2.     dStream.map((line: String) => (ipPat.findFirstIn(line).mkString(""), 1L))
  3.       .reduceByKey(_ + _).foreachRDD((rdd : RDD[(String, Long)]) => {
  4.       val rowRdd = rdd.map(r => Row(r._1.toString, r._2.toString))
  5.       val schemaStr = "ip,num"
  6.       mongoSQL.put(schemaStr, rowRdd, "pv_1")
  7.       println("1...ok")
  8.     })
  9.   }
复制代码

  • 统计直接访问量及间接访问量


  1. def DirectOrInDirect(dStream : DStream[String], mongoSQL: MongoSQL) : Unit = {
  2.     dStream.map(line => {
  3.       val url = urlPat.findFirstIn(line)
  4.       var key = "direct"
  5.       if(!url.isEmpty) {
  6.         key = "inDirect"
  7.       }
  8.       (key, 1L)
  9.     }).reduceByKey(_+_).foreachRDD((rdd : RDD[(String, Long)]) => {
  10.       val rowRdd = rdd.map(r => Row(r._1.toString, r._2.toString))
  11.       val schemaStr = "type,num"
  12.       mongoSQL.put(schemaStr, rowRdd, "direct_or_indirect_7")
  13.       println("2...ok")
  14.     })
  15.   }
复制代码

  • 当出现攻击时,统计出可疑10个ip


  1. def DangerIP(rdd : DStream[String],sc : SparkContext, mongoSQL: MongoSQL) : Unit = {
  2.     rdd.filter(_ != "").map(line => {
  3.       (ipPat.findFirstIn(line).toString, 1L)
  4.     }).reduceByKey(_+_).map(i => (i._2, i._1.toString())).foreachRDD{(rdd : RDD[(Long, String)]) => {
  5.         val sort = rdd.sortByKey(false) take 10 // array
  6.         val schemaStr = "num,ip"
  7.         val rowRdd = sc.parallelize(sort).map(r => Row(r._1.toString, r._2.toString))
  8.         mongoSQL.put(schemaStr, rowRdd, "danger_IP_8")
  9.         println("3...ok")
  10.       }
  11.     }
  12.   }
复制代码

  • 统计访问about云访问地区包括国内外及所占比率

  1. def IPLocation(dStream: DStream[String], sc : SparkContext, mongoSQL: MongoSQL): Unit = {
  2.     val ipRulesBroadcast = IPRuels.getInstance(sc)
  3.     val result = dStream.map(line => { ipPat.findFirstIn(line.toString()).mkString("")})
  4.       .map(ip => {
  5.         var info : Any = None
  6.         if(!ip.isEmpty) {
  7.           val ipNum = ip2num(ip)
  8.           val index = binarySearch(ipRulesBroadcast.value, ipNum)
  9.           info = ipRulesBroadcast.value(index)
  10.         }
  11.         (info, 1L)})
  12.       .reduceByKey(_+_)

  13.     result.foreachRDD((rdd : RDD[(Any, Long)]) => {
  14.       // 将total 写在map,for里
  15.       var total = 1L
  16.       try {
  17.          total = rdd.reduce((x, y) => ("t", x._2 + y._2))._2
  18.       } catch {
  19.         case  ex : UnsupportedOperationException => {
  20.           total = 1
  21.         }
  22.       }
  23.       val rowRdd = rdd.map(x => {
  24.         val r = x._2.toFloat/total
  25.         (x._1, r)
  26.       }).map(line => Row(line._1.toString, line._2.toString))
  27.       val schemaStr = "loc,rate"
  28.       mongoSQL.put(schemaStr, rowRdd, "IP_rate_2")
  29.     })
  30.   }

复制代码


  • 统计每天访问量比较高的前50篇文章


  1. def Top50(dStream: DStream[String], sc : SparkContext, mongoSQL: MongoSQL) : Unit = {
  2.     val pat = "\\shttp://www\\.aboutyun\\.com/thread.*?\\.html.*?".r
  3.     dStream.foreachRDD(rdd => {
  4.       val rddArr = rdd.map(lines => pat.findFirstIn(lines.toString()).mkString("").trim()).filter(!_.isEmpty)
  5.         .map(lines => (lines, 1L)).reduceByKey(_ + _).map(e => (e._2, e._1)).sortByKey(false) take 50
  6.       val rowRdd = sc.parallelize(rddArr).map(line => Row(line._1.toString, line._2.toString))
  7.       val schemaStr = "num,url"
  8.       mongoSQL.put(schemaStr, rowRdd, "Top50_3")
  9.     })

  10.   }

复制代码


  • 统计模块访问量并排序

  1. def ModulePV(dStream: DStream[String], sqlContext : SQLContext, MongoSQL : MongoSQL) : Unit = {

  2.     val plateIdBroadcast = PlateIDRuels.getInstance(sqlContext)

  3.     val idPat = new Regex("fid=\\d+")
  4.     // 各个模块访问次数
  5.     dStream
  6.       .map(lines =>
  7.       idPat.findFirstIn(lines.toString()).mkString("").replace("fid=","")).filter(!_.isEmpty).map(id => {
  8.       var res : Any = None
  9.       plateIdBroadcast.value.foreach(bc => {
  10.         if(bc(0).equals(id)) {
  11.           res = bc
  12.         }
  13.       })
  14.       (res, 1L)
  15.     }).reduceByKey(_+_).map(e => (e._2, e._1)).foreachRDD(rdd => {
  16.       val rowRdd = rdd.sortByKey().filter(_._2 != None)
  17.         .map(line => Row(line._1.toString, line._2.toString))
  18.       val schemaStr = "num,type"
  19.       MongoSQL.put(schemaStr, rowRdd, "module_pv_4")
  20.     })

  21.   }

复制代码


  • 统计使用搜索次数


  1. def SearchUseNum(dStream: DStream[String], MongoSQL : MongoSQL) : Unit = {

  2.     // 各个模块访问次数
  3.     dStream.foreachRDD(rdd => {
  4. //      rdd.collect().foreach(println)
  5.       val Rdd = rdd.map(lines => {
  6.         val blogPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=blog\\s.*?".r
  7.         val forumPat = ".*?\\shttp://www\\.aboutyun\\.com/search\\.php\\?mod=forum\\s.*?".r
  8.         var res: Any = None
  9.         lines.toString match {
  10.           case blogPat() => res = "blog"
  11.           case forumPat() => res = "forum"
  12.           case _ => res = "no"
  13.         }
  14.         res
  15.       }
  16.       ).filter(!_.equals("no"))
  17.       val rowRdd = Rdd.map((_, 1L)).reduceByKey(_ + _)
  18.         .map(line => Row(line._1.toString, line._2.toString))
  19.       val schemaStr = "search,num"
  20.       MongoSQL.put(schemaStr, rowRdd, "search_use_num_5")
  21.     }
  22.     )
  23.   }

复制代码


  • 统计导航的使用情况


  1. def Navi(dStream: DStream[String], MongoSQL : MongoSQL) : Unit = {
  2.     // 读取log
  3.     val ddPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=guide\\s.*?".r
  4.     val bkPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=blog\\s.*?".r
  5.     val ztPat = ".*?\\shttp://www\\.aboutyun\\.com/forum\\.php\\?mod=collection\\s.*?".r
  6.     val llPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\s.*?".r
  7.     val ydPat = ".*?\\shttp://www\\.aboutyun\\.com/home\\.php\\?mod=space&do=share\\s.*?".r
  8.     // 各个模块访问次数
  9.     dStream.map(lines => {
  10.       var res : Any = None
  11.       lines match {
  12.         case ddPat() => res = "guide"
  13.         case bkPat() => res = "blog"
  14.         case ztPat() => res = "collection"
  15.         case llPat() => res = "home"
  16.         case ydPat() => res = "space"
  17.         case _ => res = "no"
  18.       }
  19.       res
  20.     }
  21.     ).filter(!_.equals("no"))
  22.       .map((_, 1L)).reduceByKey(_+_)
  23.       .foreachRDD(rdd => {
  24.         println(rdd.collect().toBuffer)
  25.         val rowRdd = rdd.map(line => Row(line._1.toString, line._2.toString))
  26.         val schemaStr = "type,num"
  27.         MongoSQL.put(schemaStr, rowRdd, "navi_6")
  28.       })
  29.   }

复制代码


3. 结果展示

QQ截图20170427131450.jpg

3.1 Top50

QQ截图20170427131624.jpg

3.2 IP_rate


QQ截图20170427131743.jpg


完整代码下载: 地址
链接: https://pan.baidu.com/s/1c1BMjbq 密码: 7d6a

没找到任何评论,期待你打破沉寂

关闭

推荐上一条 /2 下一条