分享

about云日志分析项目准备10-5:本地模式之根据ip计算地区访问论坛的比率

本帖最后由 PeersLee 于 2017-3-20 17:56 编辑
问题导读:
1. Spark 都有哪些常用的Rdd API?
2. 如何实现“地区论坛访问比率计算”Demo?




解决方案:

Spark RDD 常用API解析

map

  1.   
  2. val rdd = sc.parallelize(1 to 10)
  3. /*
  4. map:
  5. 1. 使用函数f 处理rdd 中的所有元素,产生一个新的mapRdd
  6. 2. 不会改变partition 数量
  7. */
  8. val mapRdd = rdd.map(_*2)

  9. print (mapRdd.collect().toBuffer)
  10. //ArrayBuffer(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
复制代码


reduce

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

  1.   
  2. scala> val c = sc.parallelize(1 to 10)
  3. scala> c.reduce((x, y) => x + y)
  4. res4: Int = 55
复制代码




flatMap

  1.   
  2. val rdd = sc.textFile("file:/home/peerslee/data.txt")

  3. // 进行元素(行)遍历,但是会返回多个元素(根据空格分割单词)
  4. val flatMapRdd = rdd.flatMap(line => line.split("\\s"))

  5. println(flatMapRdd.collect().toBuffer)
  6. //ArrayBuffer(hello, spark, rdd, fire, fight)
复制代码


filter

  1.   
  2. val rdd = sc.parallelize(1 to 10)
  3. /*
  4. 函数f 返回值为boolean值,将满足要求的元素合成一个新的rdd.
  5. */
  6. val filterRdd = rdd.filter(_%2 == 0)
  7. println(filterRdd.collect().toBuffer)
  8. //ArrayBuffer(2, 4, 6, 8, 10)
复制代码


mapPartitions

  1.   val rdd = sc.parallelize(1 to 10)
  2. /*
  3. 1. 同map
  4. 2. 传入的是一个分区的所有iterator集合
  5. */
  6. val mapPartitionsRdd = rdd.mapPartitions(iter => iter.filter(_%2 == 1))
  7. println(mapPartitionsRdd.collect().toBuffer)
  8. //ArrayBuffer(1, 3, 5, 7, 9)
复制代码


glom
  1.   
  2. val rdd = sc.parallelize(1 to 10)
  3. // 将每个分区转化为数组
  4. val glomRdd = rdd.glom()
  5. glomRdd.foreach(arr => {
  6.   arr.foreach(print)
  7.   println()
  8. })
复制代码


17/03/16 15:35:57 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
16872
9
10
345
17/03/16 15:35:57 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 915 bytes result sent to driver

distinct

  1.   val rdd = sc.parallelize(Array(1,1,1,2,3,3,4,4,4,4,5))
  2. /*
  3. 1. 去重
  4. 2. 改变partition数量
  5. */
  6. val distinctRdd = rdd.distinct(2)

  7. println(distinctRdd.collect().toBuffer)
  8. //ArrayBuffer(4, 2, 1, 3, 5)
复制代码


cartesian

  1.   val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
  2. val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
  3. /*
  4. 求两个Rdd 之间的笛卡尔积,并返回
  5. */
  6. val cartesianRdd = rdd1.cartesian(rdd2)
  7. println(cartesianRdd.collect().toBuffer)
  8. //ArrayBuffer((a,A), (a,B), (a,C), (b,A), (b,B), (b,C), (c,A), (c,B), (c,C))
复制代码


union

  1.   val rdd1 = sc.parallelize(Array('a', 'b', 'c'))
  2. val rdd2 = sc.parallelize(Array('A', 'B', 'C'))
  3. /*
  4. 将两个rdd 合并成一个rdd
  5. */
  6. val unionRdd = rdd1.union(rdd2)
  7. println(unionRdd.collect().toBuffer)
  8. //ArrayBuffer(a, b, c, A, B, C)
复制代码


mapValues

  1.   val rdd = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
  2. /*
  3. 对kv形式的rdd中的value进行操作,返回一个Rdd
  4. */
  5. val mapValuesRdd = rdd.mapValues(num => num *2)
  6. println(mapValuesRdd.collect().toBuffer)
  7. //ArrayBuffer((A,2), (B,4), (C,6))
复制代码


subtract
  1.   
  2. val rdd1 = sc.parallelize(Array("A", "B", "C", "a", "b", "c"))
  3. val rdd2 = sc.parallelize(Array("A", "B", "C", "D"))

  4. /*
  5. 找到rdd1 中有,rdd2 没有的元素
  6. */
  7. val subtractRdd1 = rdd1.subtract(rdd2)
  8. println(subtractRdd1.collect().toBuffer)
  9. // ArrayBuffer(a, b, c)
  10. val subtractRdd2 = rdd2.subtract(rdd1)
  11. println(subtractRdd2.collect().toBuffer)
  12. // ArrayBuffer(D)
复制代码


sample
  1.   
  2. val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
  3. /*
  4. 随机抽取元素,
  5. 百分比,随机种子
  6. 返回rdd
  7. */
  8. val sampleRdd = rdd.sample(true, 0.6, 2)
  9. println(sampleRdd.collect().toBuffer)
  10. // ArrayBuffer(B, C, b)
复制代码


takeSample

  1.   
  2. val rdd = sc.parallelize(Array("A", "B", "C", "a", "b"))
  3. /*
  4. 同上,
  5. 可以指定个数,
  6. 返回array
  7. */
  8. val takeSampleRdd = rdd.takeSample(true, 2, 1)
  9. println(takeSampleRdd.toBuffer)
  10. // ArrayBuffer(a, b)
复制代码

groupBy

  1.   val rdd = sc.parallelize(Array("A1", "B1", "C1", "A2", "B2", "A3"))
  2. /*
  3. 根据,f产生的key进行分组
  4. */
  5. val groupByRdd = rdd.groupBy(_.substring(0, 1))
  6. println(groupByRdd.collect().toBuffer)
  7. //ArrayBuffer((A,CompactBuffer(A1, A2, A3)), (B,CompactBuffer(B1, B2)), (C,CompactBuffer(C1)))
复制代码


partitionBy

适用key-value对
对RDD重新分区
如果相同,返回本身

cogroup

  1.   val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b',2), ('c', 1)))
  2. val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b',22), ('c', 11)))
  3. /*
  4. 1. 对于kv
  5. 2. 将一个两个Rdd 中相同key,整合为新的kv,返回新的rdd
  6. */
  7. val cogroupRdd = rdd1.cogroup(rdd2)
  8. println(cogroupRdd.collect().toBuffer)
  9. // ArrayBuffer((a,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (b,(CompactBuffer(1, 2),CompactBuffer(11, 22))), (c,(CompactBuffer(1),CompactBuffer(11))))
复制代码


combineByKey
  1.   
  2. val rdd = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 3), ('b',4), ('c', 5)))
  3. /*
  4. 将每个分区的元素按照,key,合并
  5. */
  6. val combineByKeyRdd = rdd.combineByKey((v : Int) => List(v),
  7.   (c : List[Int], v : Int) => v::c,
  8.   (c1 : List[Int], c2 : List[Int]) => c1:::c2)

  9. println(combineByKeyRdd.collect().toBuffer)
  10. // ArrayBuffer((a,List(1, 2)), (b,List(3, 4)), (c,List(5)))
复制代码


reduceByKey
  1.   // 相同key,用f处理value
  2. val rdd = sc.parallelize(Array(('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b',1), ('c', 1)))
  3. val reduceByKeyRdd = rdd.reduceByKey((v1 : Int, v2 : Int) => v1+v2)

  4. println(reduceByKeyRdd.collect().toBuffer)
  5. // ArrayBuffer((a,3), (b,2), (c,1))
复制代码


join

  1.   val rdd1 = sc.parallelize(Array(('a', 1), ('a', 2), ('b', 1), ('b',2), ('c', 1)))
  2.   val rdd2 = sc.parallelize(Array(('a', 11), ('a', 22), ('b', 11), ('b',22), ('c', 11)))

  3. //相同key,value做笛卡尔积
  4. val joinRdd = rdd1.join(rdd2)
  5. println(joinRdd.collect().toBuffer)
  6. //ArrayBuffer((a,(1,11)), (a,(1,22)), (a,(2,11)), (a,(2,22)), (b,(1,11)), (b,(1,22)), (b,(2,11)), (b,(2,22)), (c,(1,11)))
复制代码



地区比率计算

思路:

  • 将log文件读到spark RDD 中
  • 将ip规则表(ip.txt)也读到Spark Rdd 中,处理成为我们想要的形式(start_num, end_num, province) 形成一个新的Rdd,将这个规则做一次广播
  • 正则匹配出log中的访问者的ip,形成一个访问者ip的Rdd
  • 对访问着ip的rdd进行处理,根据ip规则找出所属于的城市,然后设置城市为key,value=1,把key相同的元素的value相加
  • 求出总共有多少个元素,再直接求比率


实现:

  1. import org.apache.spark.{SparkConf, SparkContext}

  2. import scala.util.matching.Regex

  3. /**
  4.   * Created by peerslee on 17-3-10.
  5.   */
  6. object IpLocationByTime {
  7.   // 将ip地址转换为整数
  8.   def ip2num(ip : String) : Long = {
  9.     val fragments = ip.split("\\.")
  10.     var ipNum = 0L
  11.     for (i <- 0 until fragments.length) {
  12.       // 与运算
  13.       ipNum = fragments(i).toLong | ipNum << 8L
  14.     }
  15.     ipNum
  16.   }
  17.   // 折半查找
  18.   def binarySearch(lines:Array[(Long,Long,String)],ip:Long): Int ={
  19.     var low =0
  20.     var high = lines.length-1
  21.     while (low<=high){
  22.       val  middle = (low + high)/2
  23.       if((ip>=lines(middle)._1)&&(ip<=lines(middle)._2)){
  24.         return middle
  25.       }
  26.       if(ip<lines(middle)._1){
  27.         high=middle-1
  28.       }else{
  29.         low = middle +1
  30.       }
  31.     }
  32.     -1
  33.   }
  34.   def main (args : Array[String]): Unit = {
  35.     val conf = new SparkConf().setAppName("IpLocationByTime").setMaster("local")
  36.     val sc = new SparkContext(conf)

  37.     // 加载ip属地规则
  38.     val ipRuelsRdd = sc.textFile("/home/peerslee/spark_data/ip.txt").map(line =>
  39.       // map RDD 的Transformation 操作,用 f 处理一个Rdd 的所有元素,将结果输出到另一个Rdd
  40.     {
  41.       val fields = line.trim().split("\t")
  42.       val start_num = ip2num(fields(0).trim())
  43.       val end_num = ip2num(fields(1).trim())
  44.       val province = fields(2).trim()
  45.       (start_num, end_num, province)
  46.     })
  47.     // 将Rdd 转成Scala数组,并返回
  48.     val ipRulesArray = ipRuelsRdd.collect()

  49.     // 广播变量:保持一个缓存在每台机器上的只读变量
  50.     val ipRulesBroadcast= sc.broadcast(ipRulesArray)

  51.     // ip
  52.     val ipPat = new Regex("((\\d{1,3}\\.){3}\\d{1,3})")

  53.     // 处理加载的数据
  54.     val ipsRDD = sc.textFile("/home/peerslee/spark_data/ex17020509.log").map(line =>
  55.     {
  56.       // 需要字符串
  57.       ipPat.findFirstIn(line.toString()).mkString("")
  58.     })
  59.     // ((2007496192,2007496447,山东省青岛市,北京百度网讯科技有限公司联通节点),30)
  60.     val result = ipsRDD.map(ip => {
  61.       var info : Any = None
  62.       if(!ip.isEmpty) {
  63.         val ipNum = ip2num(ip)
  64.         val index = binarySearch(ipRulesBroadcast.value, ipNum)
  65.         info = ipRulesBroadcast.value(index)
  66.       }
  67.       (info, 1L)
  68.     }).reduceByKey(_+_) // 按照key 求和
  69. //    for(r <- result.collect()) {
  70. //      println(r)
  71. //    }

  72.     // total:(Total,94)
  73.     val total = result.reduce((x, y) => {
  74.       val v = x._2 + y._2
  75.       ("Total", v)
  76.     })
  77.     // ((2007496192,2007496447,山东省青岛市,北京百度网讯科技有限公司联通节点),0.31914893)
  78.     val rate = result.map(x => {
  79.       val r = x._2.toFloat / total._2
  80.       (x._1, r)
  81.     })
  82.     for(r <- rate.collect()) {
  83.       println(r)
  84.     }

  85.     sc.stop()

  86.   }
  87. }
复制代码
2017-03-17 18-10-43 的屏幕截图.jpg

其他:

log
  1. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 352 1057 31
  2. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 352 1058 31
  3. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 370 1057 31
  4. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 370 1054 31
  5. 2017-02-05 09:42:04 GET /uc_server/avatar.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 370 1054 31
  6. 2017-02-05 09:42:04 GET /plugin.php 58.211.2.138 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 925 1072 140
  7. 2017-02-05 09:42:04 GET /uc_server/data/avatar/000/00/55/20_avatar_middle.jpg 58.211.2.60 Mozilla/5.0+(Windows+NT+6.1;+WOW64)+AppleWebKit/537.36+(KHTML,+like+Gecko)+Chrome/55.0.2883.87+Safari/537.36 [url]http://www.aboutyun.com/thread-7746-1-1.html[/url] [url]www.aboutyun.com[/url] 0 6430 1156 109
复制代码


ip.txt(部分)
  1.    0.0.0.0          0.255.255.255        IANA,保留地址
  2.         1.0.0.0              1.0.0.255        澳大利亚, CZ88.NET
  3.         1.0.1.0              1.0.3.255        福建省,电信
  4.         1.0.4.0              1.0.7.255        澳大利亚, CZ88.NET
  5.         1.0.8.0             1.0.15.255        广东省,电信
  6.        1.0.16.0             1.0.31.255        日本,Beacon服务器
  7.        1.0.32.0             1.0.63.255        广东省,电信
  8.        1.0.64.0            1.0.127.255        日本,広島県中区大手町Energia通信公司
  9.       1.0.128.0            1.0.255.255        泰国, CZ88.NET
  10.         1.1.0.0              1.1.0.255        福建省,电信
  11.         1.1.1.0              1.1.1.255        澳大利亚,亚太互联网络信息中心
  12.         1.1.2.0              1.1.7.255        福建省,电信
  13.         1.1.8.0             1.1.63.255        广东省,电信
  14.        1.1.64.0            1.1.127.255        日本,东京都新宿区歌舞伎町i2ts公司
  15.       1.1.128.0            1.1.142.255        泰国,穆达汉
  16.       1.1.143.0            1.1.144.255        泰国, CZ88.NET
  17.       1.1.145.0            1.1.147.255        泰国,沙功那空
  18.       1.1.148.0            1.1.149.255        泰国, CZ88.NET
  19.       1.1.150.0            1.1.150.128        泰国,沙功那空
  20.     1.1.150.129            1.1.150.255        泰国,曼谷
  21.       1.1.151.0            1.1.151.255        泰国, CZ88.NET
  22.       1.1.152.0            1.1.152.127        泰国,曼谷
  23.     1.1.152.128            1.1.152.255        泰国,沙功那空
  24.       1.1.153.0            1.1.153.255        泰国,沙功那空廊曼
  25.       1.1.154.0            1.1.157.255        泰国,沙功那空
  26.       1.1.158.0            1.1.160.255        泰国,曼谷
复制代码

没找到任何评论,期待你打破沉寂

关闭

推荐上一条 /2 下一条