分享

Spark 高级分析:第二章第4节 开始学习Spark Shell和SparkContext

本帖最后由 feilong 于 2017-10-13 11:57 编辑
问题导读

1.什么是Spark Shell,SparkContext,REPL?

2.什么是RDD?
3.如何创建RDD?
4.var 和 val有什么区别?




上一篇:Spark 高级分析:第二章第2,3节
http://www.aboutyun.com/forum.ph ... 53&page=1#pid241808



       我们将使用来自加州大学欧文分校机器学习库的示例数据集,这对研究和教育来说是一个有趣的(和免费的)数据集的极好来源。我们要分析的数据集是从一个记录联动策划研究,是在2010个德国医院进行的,它包含了几个亿对病人的记录是按照不同的标准相匹配,如病人的姓名(姓),地址,和他们的生日。每个匹配字段是根据字符串的相似程度从0到1分配了一个数值,然后对数据进行手工标记,以确定哪些对代表同一个人,哪些没有。用于创建数据集的字段本身的基本值被删除,以保护患者的隐私,而数字标识符、字段的匹配分数和每对(匹配与非匹配)的标签被公布用于记录链接研究。[mw_shl_code=shell,false]      
       $ mkdir linkage     
       $ cd linkage/
       $ curl -o donation.zip https://archive.ics.uci.edu/ml/m ... /00210/donation.zip
       $ unzip donation.zip
       $ unzip 'block*.zip'[/mw_shl_code]
      如果你手边有Hadoop集群,可以在HDFS上创建目录,并将数据集拷贝到这里:
       [mw_shl_code=shell,false]
       $ hadoop fs -mkdir linkage
       $ hadoop fs -put block*csv linkage[/mw_shl_code]
现在我们准备使用类似于scala语言的即时交互编程环境REPL(读取-求值-打印-循环),同时他有spark扩展展板Spark-Shell。如果你之前没有用过REPL,可以认为它类似于R的编程环境:这是一个能让你用Scala定义函数和操作数据的地方。
如果你有支持YARN的Hadoop集群,你可以以yarn-client方式向Spark master提交Spark作业:
[mw_shl_code=shell,false]
$ spark-shell --master yarn-client[/mw_shl_code]
如果你在你自己电脑上运行实例,你可以以local[N]的方式向本地Spark集群提交,N代表同时运行线程的数量,或者是用*匹配你的机器的核数。例如对运行了8个线程在一台八核的机器上的本地集群来说:
[mw_shl_code=shell,false]
$ spark-shell --master local[/mw_shl_code]

实例在本地同样可以运行。你只用传递本地文件路径,而不是HDFS上以hdfs://开头的路径。
本书剩余的示例不会显示--master参数,但是你通常需要指定这个参数适合你的环境。
你需要声明其他的参数以使Spark Shell充分利用你的资源。例如,在本地运行,你可以设置--drivermemory 2g让单个进程使用2G的内存。YARN内存配置更复杂一些,相关参数--executor -memory 会在Spark on YARN中 阐述。
运行这些命令中的一个之后,你会看到它初始化时的大量日志消息,但你还应该看到一点ASCII艺术,接着是一些附加的日志消息和提示符:
       a.png 如果你是第一次使用Spark Shell(或者是任何其他类似Scala REPL的应用),可以使用help命令列出可用命令。history和h?命令可以帮你列出你在会话中想写但一下又找不到的变量或函数名称。paste命令可以帮你从剪贴板正确插入代码--随书以及随附的源代码。
关于help除此之外要注意,Spark日志信息表明:Spark context 就是sc。这是SparkContext的引用,它来协调Spark作业在集群上的执行。在命令行输入sc:
[mw_shl_code=shell,false]
sc
...
res0: org.apache.spark.SparkContext =
org.apache.spark.SparkContext@DEADBEEF[/mw_shl_code]
REPL将打印对象的字符串形式,对SparkContext对象来说,这就是它的名字加在内存中的对象的十六进制地址(deadbeef是占位符;每次运行你看到这个精确值可能会有所不同。)
sc变量存在的确很好,但是我们又能用它来做什么呢?SparkContext是一个对象,作为对象,它具有与它相关的方法。我们可以看到在Scala取代那些方法有哪些输入变量的名字,后跟一个句号,其次是tab:
[mw_shl_code=shell,false]
sc.[\t]
...
accumulable accumulableCollection
accumulator addFile
addJar addSparkListener
appName asInstanceOf
broadcast cancelAllJobs
cancelJobGroup clearCallSite
clearFiles clearJars
clearJobGroup defaultMinPartitions
defaultMinSplits defaultParallelism
emptyRDD files
getAllPools getCheckpointDir
getConf getExecutorMemoryStatus
getExecutorStorageStatus getLocalProperty
getPersistentRDDs getPoolForName
getRDDStorageInfo getSchedulingMode
hadoopConfiguration hadoopFile
hadoopRDD initLocalProperties
isInstanceOf isLocal
jars makeRDD
master newAPIHadoopFile
newAPIHadoopRDD objectFile
parallelize runApproximateJob
runJob sequenceFile
setCallSite setCheckpointDir
setJobDescription setJobGroup
startTime stop
submitJob tachyonFolderName
textFile toString
union version
wholeTextFiles
[/mw_shl_code]
SparkContext有这么多方法,但是我们用一些常用的方法创建弹性分布式数据集,或者称为RDD。RDD是Spark的一个基本抽象,用来表示可以分布在集群不同机器上对象的集合。有两种方法创建RDD:
  • 利用SparkContext从外部数据源创建一个RDD,外部数据源可以是HDFS文件,通过JDBC读取的数据库,或者是在Spark Shell创建的本地对象集合。  
  • 从一个或多个已存在的RDD转换而来,例如通过相同的键值过滤、聚合记录,或者合并多个RDD

RDD是一种将数据作为小的序列,独立的步骤计算更便利的描述方式。

Resilient Distributed Datasets 分布式数据集

RDD设计为集群上跨机器分区集合,每个分区都是数据的子集。分区定义了Spark中并行的单元。框架上运行的对象对于单个分区来说是有序的,多分区是并发执行的。创建RDD最简单的方法是使用本地对象集合调用parrallelize方法:
第一个参数是并行对象集合。第二个是分区数。当在一个分区中对一个对象进行计算时,Spark会从driver进程拉取一个子集。
[mw_shl_code=scala,false]val rdd2 = sc.textFile("hdfs:///some/path.txt")
...
rdd2: org.apache.spark.rdd.RDD[String] = ...[/mw_shl_code]
当在本地模式下运行Spark时,textFile可以访问驻留在本地文件系统上的路径。 如果Spark被赋予一个目录而不是一个单独的文件,那么它将把该目录中的所有文件视为给定RDD的一部分。 最后,请注意,在我们的客户端计算机或集群上,Spark还没有读取实际的数据或加载到内存中。 当计算分区内的对象时,Spark会读取输入文件的一个部分(也称为split),然后应用于我们通过其他RDD定义的任何后续转换(过滤,聚合等)。

     我们的记录链接数据存储在文本文件中,每行都有一个观察结果。我们会使用textFile方法得到一个数据的引用作为RDD:
     这个过程中有几件事情需要找一下。 首先,我们是声明一个名为rawblock的新变量。 从shell可以看到,rawblock变量为RDD [String]类型,即使我们从未在我们的变量声明中指定了类型信息。这是Scala编程语言的一个特点称之为类型推断,当我们使用语言时,它可以节省大量的键盘输入。只要有可能,Scala就会根据其上下文推测出一个变量的类型。 在这种情况下,Scala会从SparkContext对象的textFile函数中查找返回类型,看到它返回一个RDD [String],并将该类型分配给rawblocks变量。

The REPL and Compilation  REPL和编译

除了交互式shell外,Spark还支持和编译应用程序。我们通常建议使用Maven来编译和管理依赖项。本书附带的代码示例在simplesparkproject /目录下保存一个独立的Maven项目设置,以帮助您你开始使用。

使用shell和编译作为选项,在测试和构建数据管道时应该使用哪个选项?在REPL中完全开展工作通常是有用的。这样可以实现快速原型设计,迭代速度更快,想法和实现之间的延迟时间更短。然而,随着程序的大小,维护一个单一的代码文件变得更加繁重,而Scala解释耗费了更多的时间。在处理大量数据时这一情况会恶化,尝试操作而导致Spark应用程序崩溃或以其他方式导致SparkContext不可用。这样意味着到目前为止输入的任何工作和代码都会丢失。在这一点上,采取混合方法通常是有用的。保持REPL中的开发前端,并且随着代码的加强,将它们转移到编译库中。编译的jar可以通过将其传递给--jars属性使其可用于spark-shell。操作完成后,编译的jar不需要经常重建,而REPL允许在仍然需要修改的代码和方法上进行快速迭代。



本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条