分享

apache tez(2)——DAG API

helianthus 发表于 2015-12-14 00:23:01 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 10315
本帖最后由 helianthus 于 2015-12-17 23:42 编辑
问题导读:
1.tez DAG是怎么创建的?
2.Tez DAG中如何确定数据流向?




1.TEZ中的常用术语:
(1)DAG:有向无环图,代表一个数据流处理工作流,数据按照边的方向流动
(2)Vertex:数据处理中的一个逻辑步骤,由应用程序代码使用,比如过滤/修改数据等等
(3)Logical DAG:由一组Vertex组成,其中每个vertex代表一个具体的计算步骤
(4)Task:代表一个vertex中的一个工作单位。每个task都是vertex的实例化,处理该节点vertex上的一部分数据
(5)Physical DAG:由逻辑DAG在运行时扩展为一组tasks产生
(6)Edge:代表上游生产节点和下游消费节点之间的数据迁移

logical DAG to Physical DAG

logical DAG to Physical DAG


2.通过一个OrderedWordCount实例体现DAG APIs:
(1)设置DAG数据源即最终结果类型:
[mw_shl_code=java,true]DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
        TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build();

    DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
        TextOutputFormat.class, outputPath).build();[/mw_shl_code]
(2)创建第一个处理逻辑对应的节点:
[mw_shl_code=java,true]Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
        TokenProcessor.class.getName()));
    tokenizerVertex.addDataSource(INPUT, dataSource);[/mw_shl_code]
(3)为第一个定点和第二个顶点间的边设置i数据传输格式:
[mw_shl_code=java,true]OrderedPartitionedKVEdgeConfig summationEdgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();[/mw_shl_code]
(4)创建第二个顶点(这是一个处理中间结果的节点)
[mw_shl_code=applescript,true] Vertex summationVertex = Vertex.create(SUMMATION, ProcessorDescriptor.create(
        SumProcessor.class.getName()), numPartitions);[/mw_shl_code]
(5)为第二个顶点和第三个顶点之间设置数据传输格式:
[mw_shl_code=applescript,true] OrderedPartitionedKVEdgeConfig sorterEdgeConf = OrderedPartitionedKVEdgeConfig
        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
            HashPartitioner.class.getName())
        .setFromConfiguration(tezConf)
        .build();[/mw_shl_code]
(6)创建第三个顶点:
[mw_shl_code=java,true] Vertex sorterVertex = Vertex.create(SORTER, ProcessorDescriptor.create(
        NoOpSorter.class.getName()), 1);
    sorterVertex.addDataSink(OUTPUT, dataSink);[/mw_shl_code]
(7)组合出DAG
[mw_shl_code=java,true]    DAG dag = DAG.create(dagName);
    dag.addVertex(tokenizerVertex)
        .addVertex(summationVertex)
        .addVertex(sorterVertex)
        .addEdge(
            Edge.create(tokenizerVertex, summationVertex,
                summationEdgeConf.createDefaultEdgeProperty()))
        .addEdge(
            Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));[/mw_shl_code]
3.创建DAG时边的一些重要属性
(1)数据迁移:定义不同tasks之间的数据路由
①one-to-one:即一对一,上游一个节点一个task结果只能发送给下游某个节点的一个task
②broadcast:广播形式
③scatter-gether:相当于MR中的shuffle
(2)调度属性:
①顺序调度
②并发执行
(3)数据源属性:
持久的:当前task退出后,该数据集还可用
持久可靠地:数据会可靠存储(磁盘)一直有效
暂时的:当前task使用之后便丢弃

data movement

data movement


已有(1)人评论

跳转到指定楼层
小南3707 发表于 2015-12-15 09:40:31
赞~                  
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条