分享

谈谈分布式计算的算子层及常见算子层对比(Trident、RDD等对比)

导读
下面的思想比较新颖,给大家扩展一些思路。










什么是算子层,这里做一下简单的介绍
下载.png

例如storm的Trident,spark的RDD
明白了所指,我们继续






本文是我对分布式计算的算子这层的一些认识和想法。因为最近自己的开发任务也是这方面相关的,公司内部有自研的类流式计算框架需要做一层算子层。我主要分析的是流式系统上实现算子这一点入手,对比现有计算框架和业界正在开展的项目,分析分析这件事的表面和背后深层的含义,以及可想象空间。

趋势
Yahoo! 的Pig on Storm项目,让Pig-latin能够执行在Storm这种流式引擎上,最终使得Pig-latin能够混用于流式计算和批量计算场景下。应该说,无论是Spark,Summingbird,还是Pig,都在尝试做同一件事情:借助自己的DSL或原语在流式和批量两套引擎上表达(近)实时和离线数据处理能力。

Spark本身依赖RDD,实现了Spark Streaming这种小批流计算,其DStream就是RDD,所以在Spark上写批量作业和流式作业API自然是统一的。

Summingbird在API层面统一了Storm上和Hadoop上的作业,对于Hadoop上任务的编写借助的是Cascading,属性上看更多的是一种适配的角色,虽然Summingbird也称为Lambda Architecture的一种解决方案。

总结:表面上看,DSL需要支持不同的计算引擎,以达到算子层面的混用,这是趋势。那么实现上的难度在哪呢?

挑战

在流式系统上实现pig-latin这种本身就诞生于批量计算场景里的DSL,对某些关系型操作会有语义层面的不清晰性,具体可以看Pig on Storm初步讨论。对于filter,foreach,union,甚至稍微复杂点的需要借助state的distinct,limit,在批量和流式场景下都是没有歧义的,实现起来不会有太大的区别或难度。但是像两流做sql语义里的join,或者多流做pig语义里的group,cross的时候,流式上的实现就不一致了,而且这个原语的定义也不同了。

在流式系统上实现DSL或者一套FlumeJava,关键在能把UDAF给实现了。而要实现UDAF,就涉及到了跨批的事情。这件事情本质上需要引擎的支持,比如Trident有SpoutCoordinator作流控,还具备一定的事务性,那么在你要做跨批之间的UDAF的时候呢,可以借助Trident的State,也就是辅助存储,调用persistAggregate这样的操作来完成。如果引擎不支持的话,比如原生Storm的接口,就没办法做流式DSL。

那么像Spark那样又不同,因为Spark本身不是流式系统,他的Spark Streaming上可以实现DSL,甚至可以和Spark SQL结合起来跑Streaming形式的SQL,原因是Spark是批量计算框架,所以他可以做类流式DSL。

总结:实现上看,流式系统上实现DSL难点在UDAF,本质上是跨批计算。那么流式上的跨批可以抽象为一种怎样的模式呢?

增量计算

增量计算,理论上可以包含批量计算,流式计算,也包括了迭代计算。怎么理解呢。增量计算可以表达为 newValue = function ( currentValue, oldValue ),而newValue被保存为oldValue与之后新来的currentValue继续产生关系,而这个不断传承下去的oldValue就是增量计算结果。

增量计算和前面提到的流式系统上实现算子有什么关系?这个增量的模型就是跨批计算的一种形式。function可以理解为一个算子,currentValue可以理解为本批计算结果,oldValue可以理解为UDAF的计算结果。

这个模型只有流式系统能实现吗?不是的,批量计算框架也可以做,大不了newValue每次都落盘嘛。如果Hadoop MR来做这件事情,其实是把每一次MR的数据当作一批,跨批的结果是额外保存的。如果RDD来做这件事情,那就不同了,上述这种模型很适合RDD来做,因为迭代计算可以看成是增量计算的一种,而RDD很擅长构建DAG来完成迭代计算,只是每次计算出来的都是immutable的新RDD。

流式系统怎么实现这种增量计算模型呢?这就是我们组之前老大和同事智慧的结晶了,具体不方便说。其实实现它不是难点,难点是计算框架内需要对oldValue进行容错。RDD不用担心容错,因为有lineage来记录,大不了可以重算,而且是可以并行的。Storm和Trident也不用担心容错,因为他把fail逻辑都交给用户了!而我们组目前的增量计算引擎完成了这件事情,并且一直在checkpoint的优化上做着努力。

总结:计算模型上,在流式系统上实现增量计算引擎,是实现丰富算子层,做流式SQL的一个必要条件。流式上实现的增量计算模型,有什么本质缺陷吗?

深入RDD

之前在杭州Spark meetup,分享Spark SQL的时候,我提到过Spark RDD最重要的两层意义:原语的丰富和数据表示能力。前者使得Spark编程很easy,后者使得计算结果做到了reuse,适应了MR模型、迭代计算模型、BSP模型。基于这两点,Spark Core上可以轻松衍生出SQL产品、机器学习产品、图计算产品、流计算产品。
反观流式系统,比如Storm,原语要简单丰富易用不是难事,问题是你数据能reuse吗?!reuse有什么优点?拿RDD来说,节省内存空间以及并发的计算能力。RDD在设计之初就是immutable的,而且在计算内部消化掉了MapReduce,而暴露出丰富的Transformation和Action。在论文中,RDD与DSM(Distributed Shared Memory)也进行了多维度的对比。应该说,Matei在设计RDD之前的参与Hadoop MapReduce源码的开发经验,加上当时其他系统内DSM的差异设计,以及Google FlumeJava,微软DryadLINQ在API层面的理念,最终揉合成了RDD这套东西。现在只有Spark现在实现了它。
最近我在增量计算引擎上实现的算子层,也是参考了FlumeJava,Trident,RDD设计出来的,还在测试中。就像我开头说的,Pig on Storm这件事情,换引擎是表面。背后意义是算子层面的混用,最终的想象空间是一层统一的DAG,上面承接Pig、Hive、SQL等DSL,下面对接不同的计算系统。实现起来是不困难的,困难点可能不是技术问题。

总结:RDD两个致命优点,easy to use和数据的reuse,是其他系统难达到的,特别是第二点,也是RDD的精髓所在。

对比Storm

marz做了Storm,ElephentDB之后,按照他的理解在how to beat CAP里提出了一种解决方案。在他提出的lambda achitecture里,Storm的定位在流式处理,而做类似ad-hoc的service layer是HBase。如果换做是我们目前的增量计算框架的愿景的话,我认为,流式和ad-hoc这层有望被增量计算引擎统一。为什么?

Query = Function(All Data)

Data静,Query动,是ad-hoc计算;Data动,Query静,是流式计算;Data动,Query动,是持续计算。Storm处于第二者,增量计算框架可以做到第三者。Storm的拓扑提交是个严重问题,等Nimbus拉起bolt和spout的时候,黄花菜都凉了。它的确适合流式计算,为什么呢,因为流式的本质就是消息。Storm抽象的那层拓扑,bolt之间的消息通道,ack机制都很不错,这层抽象满足了流式计算,但是work这层以及调度这层远远不满足Query不断变化而仍需要流式计算的场景。我们现在做的框架将来会满足这件事情,从此统一了流式、批量、迭代,超越现在的流式计算,不仅仅是StreamSQL,Stream上的DSL都是可以通过算子层来实现的。

总结:Data动,Query动的场景如何统一解决?增量计算想象空间巨大,算子层重要性突显。


已有(2)人评论

跳转到指定楼层
desehawk 发表于 2014-11-9 23:29:02
补充: 常见计算框架算子层对比

Pig-latin
Hadoop MR上的DSL,面向过程,适用于large-scale的数据分析。语法很美,可惜只适合CLI 。

  1. A = load 'xx' AS (c1:int, c2:chararray, c3:float)
  2. B = GROUP A BY c1
  3. C = FOREACH B GENERATE group, COUNT(A)
  4. C = FOREACH B GENERATE $0. $1.c2
  5. X = COGROUP A by a1, B BY b1
  6. Y = JOIN A by a1 (LEFT|FULL|LEFT OUTER), B BY b1
复制代码

Cascading
Hadoop MR上的封装,Twitter Summingbird正是基于Cascading的。 每个算子都是new出来的,Pipe实例被"迭代式"地传入新的算子里 。



  1. // define source and sink Taps.
  2. Scheme sourceScheme = new TextLine( new Fields( "line" ) );
  3. Tap source = new Hfs( sourceScheme, inputPath );
  4. Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
  5. Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );
  6. // the 'head' of the pipe assembly
  7. Pipe assembly = new Pipe( "wordcount" );
  8. // For each input Tuple
  9. // parse out each word into a new Tuple with the field name "word"
  10. // regular expressions are optional in Cascading
  11. String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
  12. Function function = new RegexGenerator( new Fields( "word" ), regex );
  13. assembly = new Each( assembly, new Fields( "line" ), function );
  14. // group the Tuple stream by the "word" value
  15. assembly = new GroupBy( assembly, new Fields( "word" ) );
  16. // For every Tuple group
  17. // count the number of occurrences of "word" and store result in
  18. // a field named "count"
  19. Aggregator count = new Count( new Fields( "count" ) );
  20. assembly = new Every( assembly, count );
  21. // initialize app properties, tell Hadoop which jar file to use
  22. Properties properties = new Properties();
  23. AppProps.setApplicationJarClass( properties, Main.class );
  24. // plan a new Flow from the assembly using the source and sink Taps
  25. // with the above properties
  26. FlowConnector flowConnector = new HadoopFlowConnector( properties );
  27. Flow flow = flowConnector.connect( "word-count", source, sink, assembly );
  28. // execute the flow, block until complete
  29. flow.complete();
复制代码

Trident
在Storm上提供高级的抽象原语,延续Transactional Topology的exactly-once的语义,满足事务性。 原语过于抽象,构造过程充斥重复性的字段定义。

  1. TridentState urlToTweeters =
  2.        topology.newStaticState(getUrlToTweetersState());
  3. TridentState tweetersToFollowers =
  4.        topology.newStaticState(getTweeterToFollowersState());
  5. topology.newDRPCStream("reach")
  6.        .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
  7.        .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
  8.        .shuffle()
  9.        .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
  10.        .parallelismHint(200)
  11.        .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
  12.        .groupBy(new Fields("follower"))
  13.        .aggregate(new One(), new Fields("one"))
  14.        .parallelismHint(20)
  15.        .aggregate(new Count(), new Fields("reach"));
复制代码

RDD
Spark上的分布式弹性数据集,具备丰富的原语。 RDD原语的灵活性归功于Scala语言本身的FP性质以及语法糖,而其丰富性又源自Scala语言本身API的丰富性。Java难以实现如此强大的表达能力。但RDD确实是非常有参考价值的。


  1. scala> val textFile = sc.textFile("README.md")
  2. textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
  3. scala> textFile.count() // Number of items in this RDD
  4. res0: Long = 126
  5. scala> textFile.first() // First item in this RDD
  6. res1: String = # Apache Spark
  7. scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
  8. linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
  9. scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
  10. res3: Long = 15
  11. scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
  12. res4: Long = 15
  13. scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
  14. wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
  15. scala> wordCounts.collect()
  16. res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
复制代码

SchemaRDD
Spark SQL里的"Table"型RDD,额外为SQL提供了一套DSL。 但是这套DSL只适合SQL,表达能力不够,偏"垂直"。


  1. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  2. // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
  3. import sqlContext.createSchemaRDD
  4. // Define the schema using a case class.
  5. case class Person(name: String, age: Int)
  6. // Create an RDD of Person objects and register it as a table.
  7. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
  8. people.registerAsTable("people")
  9. // SQL statements can be run by using the sql methods provided by sqlContext.
  10. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  11. // DSL: where(), select(), as(), join(), limit(), groupBy(), orderBy() etc.
  12. val teenagers = people.where('age >= 10).where('age <= 19).select('name)
  13. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码

Apache Crunch
Google FlumeJava论文的开源实现,是一个标准的算子层,现在支持Hadoop任务和Spark任务。
Crunch符合FlumeJava的设定,实现了PCollection和PTable这样的分布式、不可变数据表示集,实现了parallelDo(),groupByKey(),combineValues(),flattern()四种基本原语,且基于此原语可以衍生出count(),join(),top()。也实现了Deffered Evalution 以及 针对MSCR(MapShuffleCombineReduce) Operation的优化。
Crunch的任务编写严重依赖Hadoop,其本质是为了在批量计算框架上写MapReduce Pipeline。原语方面不够丰富,且parallelDo()不太适合流式语境。此外,其很多特性和功能是我们不需要具备的,但是抽象数据表示、接口模型、流程控制是可以参考的。

  1. public class WordCount extends Configured implements Tool, Serializable {
  2.   public int run(String[] args) throws Exception {
  3.     // Create an object to coordinate pipeline creation and execution.
  4.     Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
  5.     // Reference a given text file as a collection of Strings.
  6.     PCollection<String> lines = pipeline.readTextFile(args[0]);
  7.     PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
  8.       public void process(String line, Emitter<String> emitter) {
  9.         for (String word : line.split("\\s+")) {
  10.           emitter.emit(word);
  11.         }
  12.       }
  13.     }, Writables.strings()); // Indicates the serialization format
  14.     PTable<String, Long> counts = words.count();
  15.     // Instruct the pipeline to write the resulting counts to a text file.
  16.     pipeline.writeTextFile(counts, args[1]);
  17.     // Execute the pipeline as a MapReduce.
  18.     PipelineResult result = pipeline.done();
  19.     return result.succeeded() ? 0 : 1;
  20.   }
  21.   public static void main(String[] args) throws Exception {
  22.     int result = ToolRunner.run(new Configuration(), new WordCount(), args);
  23.     System.exit(result);
  24.   }
  25. }
复制代码

总结

最后这张图展示了Hadoop之上各种Data Pipeline项目的实现层次对比:

下载 (1).png







回复

使用道具 举报

韩克拉玛寒 发表于 2014-11-10 09:06:04
很不错的文章,先分享了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条