分享

Spark1.0.0 多语言编程之python实现

问题导读:  
1.python开发Spark Application拥有哪些优势?
2.sogou日志数据分析python如何实现的?
3.Spark1.0.0的Python程序是否可以使用spark-submit提交?








Spark公开了pyhton的编程模型-PySpark,开发者通过PySpark可以很容易开发Spark application。
但是Python API和Scala API略有不同:
  • Python是动态语言,RDD可以持有不同类型的对象
  • PySpark目前并没有支持全部的API,但核心部分已经全部支持

      在PySpark里,RDD支持scala一样的方法,只不过这些方法是Python函数来实现的,返回的也是Python的集合类型;对于RDD方法中使用的短函数可以使用Python的lambda语法实现。
      不过python开发Spark Application拥有很多优势:
  • 不需要编译,使用方便
  • 可以与许多系统集成,特别是NoSQL大部分都提供了python开发包


1:开发环境
      笔者的Spark开发环境参见Spark1.0.0 开发环境快速搭建,另外在操作系统安装的时候已经默认安装了python:


2:sogou日志数据分析python实现
A:用户在00:00:00到12:00:00之间的查询数

  1. import sys  
  2. from pyspark import SparkContext  
  3.   
  4. if __name__ == "__main__":  
  5.     if len(sys.argv) != 2:  
  6.         print >> sys.stderr, "Usage: SogouA <file>"  
  7.         exit(-1)  
  8.     sc = SparkContext(appName="SogouA")  
  9.     sgRDD = sc.textFile(sys.argv[1])  
  10.     print sgRDD.filter(lambda line : line.split('\t')[0] >= '00:00:00' and line.split('\t')[0] <= '12:00:00').count()  
  11.     sc.stop()  
复制代码

虚拟集群中任意节点运行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouA.py hdfs://hadoop1:8000/dataguru/data/mini.txt
运行结果:527300

B:搜索结果排名第1,但是点击次序排在第2的数据有多少?

  1. import sys  
  2. from pyspark import SparkContext  
  3.   
  4. if __name__ == "__main__":  
  5.     if len(sys.argv) != 2:  
  6.         print >> sys.stderr, "Usage: SogouB <file>"  
  7.         exit(-1)  
  8.     sc = SparkContext(appName="SogouB")  
  9.     sgRDD = sc.textFile(sys.argv[1])  
  10.     print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : line.split('\t')[3]).filter(lambda line : int(line.split(' ')[0])==1 and int(line.split(' ')[1])==2).count()  
  11.     sc.stop()  
复制代码

虚拟集群中任意节点运行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouB.py hdfs://hadoop1:8000/dataguru/data/mini.txt
运行结果:79765

C:一个session内查询次数最多的用户的session与相应的查询次数

  1. import sys  
  2. from pyspark import SparkContext  
  3.   
  4. if __name__ == "__main__":  
  5.     if len(sys.argv) != 2:  
  6.         print >> sys.stderr, "Usage: SogouC <file>"  
  7.         exit(-1)  
  8.     sc = SparkContext(appName="SogouC")  
  9.     sgRDD = sc.textFile(sys.argv[1])  
  10.     print sgRDD.filter(lambda line : len(line.split('\t')) == 5).map(lambda line : (line.split('\t')[1],1)).reduceByKey(lambda x , y : x + y ).map(lambda pair : (pair[1],pair[0])).sortByKey(False).map(lambda pair : (pair[1],pair[0])).take(10)  
  11.     sc.stop()  
复制代码

虚拟集群中任意节点运行命令:./bin/spark-submit --master spark://hadoop1:7077 --executor-memory 3g --driver-memory 1g SogouC.py hdfs://hadoop1:8000/dataguru/data/mini.txt
运行结果:[(u'11579135515147154', 431), (u'6383499980790535', 385), (u'7822241147182134', 370), (u'900755558064074', 335), (u'12385969593715146', 226), (u'519493440787543', 223), (u'787615177142486', 214), (u'502949445189088', 210), (u'2501320721983056', 208), (u'9165829432475153', 201)]

3:疑问
      笔者使用spark-submit从客户端提交python程序给虚拟集群运行时,出现Task错误,不知是集群和客户端Python版本
不同造成的,还是其他原因造成的。等稍空的时候研究一下源码。
20140615191708046.jpg











没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条