分享

Storm入门指南第三章 拓扑结构

本帖最后由 xioaxu790 于 2014-7-24 09:06 编辑
问题导读
1、如何将topology部署到运行的Storm集群上?
2、Storm的工具LinearDRPCTopologyBuilder作用是什么?
3、流分组是什么?




在本章你将会看到如何在一个Storm拓扑的不同组件间传递元组,以及如何将一个topology部署到一个运行的Storm集群上。

流分组
在设计一个topology时,一件最重要的事就是定义数据在组件之间怎样交换(流怎样被bolt消费)。流分组(Stream Grouping)指定了每个bolt消费哪些流,以及这些流如何被消费。
提示:一个节点可以发送不止一条数据流,流分组允许我们选择接收哪些流。
正如第二章中见到的,流分组在topology被定义的时候就已经被设定了:
  1. builder.setBolt("word-normalizer",new WordNormalizer())
  2.     .shuffleGrouping("word-reader");
复制代码

在上面的代码块中,在topology builder上设置了一个bolt,然后设置该bolt的源使用随机分组(shuffleGrouping)。通常情况下,一个流分组携带源组件的Id作为参数,并且还有其他可选参数,这取决于流分组的种类。
提示:每个InputDeclarer可以有不止一个源,并且每个源可以用不同的流分组来分组。

随机分组
随机分组(Shuffle Grouping)是最常用的分组方式,该分组方式携带一个参数(源组件Id),源组件发送元组到一个随机选择的bolt并确保每个消费者(即bolt)会收到相等数量的元组。

随机分组对于做原子操作(例如数学运算)是很有用的。然而,如果操作不能被随机分配,就应该考虑使用其他分组,例如在第二章为单词计数的例子中。

字段分组
字段分组(Fields Grouping)允许你基于tuple中的一个或多个字段控制元组如何被发送到bolt,该分组方式确保了对于一个组合字段所确定的的值集合总是会被发送到相同的bolt。回到单词计数的例子,如果你根据“word”字段将流分组,则WordNormalizer bolt总是会将包含给定的单词的元组一起发送到相同的WordCounter bolt实例中。
  1. builder.setBolt("word-counter",new WordCounter(),2)
  2.     .fieldsGrouping("word-normalizer",new Fields("word"));
复制代码

提示:字段分组中设置的所有字段在源组件的输出字段声明中也必须存在(译者注:在源组件的declareOutputFields()方法中声明)。

全部分组
全部分组(All Grouping)会向接收bolt的所有实例发送一份每个元组的副本,这种分组方式被用于向bolts发送信号。例如,如果你需要刷新缓存,你可以向所有bolt发送一个刷新缓存信号。在第二章单词计数的例子中,可以使用所有分组方式增加清空计数器缓存的功能(WordCounter中相应代码如下)
  1. public void execute(Tuple input) {
  2.     String str =null;
  3.     try{
  4.         if(input.getSourceStreamId().equals("signals")){
  5.             str =input.getStringByField("action");
  6.             if("refreshCache".equals(str))
  7.                 counters.clear();
  8.         }
  9.     }catch(IllegalArgumentException e) {
  10.     //Do nothing
  11.     }
  12.     ...
  13. }
复制代码

上面的代码中增加了一个if检查源流的名字 (sourceStreamId) 是否为”signal”。Storm中可以声明命名的流 (named streams) ,如果你不发送元组到一个命名的流,则流的默认名字为“default”。这是一个非常好的方式来确定元组的源,正如这个例子中我们需要确定信号一样。

在topology定义中,向word-counter bolt 增加第二个流分组方式,以便来自signals-spout 流中的每个tuple能发送到bolt的所有实例中。
  1. builder.setBolt("word-counter",new WordCounter(),2)
  2.     .fieldsGrouping("word-normalizer",new Fields("word"))
  3.     .allGrouping("signals-spout","signals")
复制代码

关于signals-spout的实现可以在git库中找到。

自定义分组
通过实现CustomStreamGrouping接口,你也可以创建自定义流分组,这让你有权决定每个元组将被哪个(些)bolt接收。
下面我们修改单词计数的例子,将元组分组以便相同字母开头的单词能被相同的bolt接收。
  1. public class ModuleGrouping implements CustomStreamGrouping,Serializable{
  2.     int numTasks=0;
  3.     @Override
  4.     public List chooseTasks(List</pre>
复制代码

直接分组
这是一个特殊的分组方式,由源组件决定哪个组件将接收元组。同前面的例子类似,源组件将基于单词中的第一个字母决定哪个bolt接收这个tuple,为了使用直接分组,需要在WordNormalizer bolt中使用emitDirect()方法代替emit方法。
  1. public void execute(Tuple input) {
  2.     ...
  3.     for(String word:words){
  4.         if(!word.isEmpty()){
  5.             ...
  6.             collector.emitDirect(getWordCountIndex(word),new Values(word));
  7.         }
  8.     }
  9.     // Acknowledge the tuple
  10.     collector.ack(input);
  11. }
  12. public Integer getWordCountIndex(String word) {
  13.     word =word.trim().toUpperCase();
  14.     if(word.isEmpty())
  15.         return 0;
  16.     else
  17.         return word.charAt(0) % numCounterTasks;
  18. }
复制代码

在prepare()方法中算出目标任务的数目:
  1. public void prepare(Map stormConf,TopologyContext context,
  2.     OutputCollector collector) {
  3.     this.collector=collector;
  4.     this.numCounterTasks=context.getComponentTasks("word-counter");
  5. }
复制代码


在topology的定义中,指定流将被直接分组:
  1. builder.setBolt("word-counter",new WordCounter(),2)
  2.     .directGrouping("word-normalizer");
复制代码

全局分组
全局分组(Global Grouping)将源组件的所有实例产生的元组发送到一个目标组件的实例()中(具体地说,是bolt中Id最小的那个任务)。

无分组
在Storm0.7.1时,使用这种分组和使用随机分组一样,即不关注流怎样被分组。

LocalCluster vs. StormSubmitter
到现在为止,你都在使用一个叫LocalCluster的工具在本地计算机上运行topology。在自己的计算机上运行Storm基础结构可以使你方便地运行和调试不同的topology。但是当你想将你的topology提交到一个运行的Storm集群上时该怎么做呢?Storm的一大特点就是可以很方便地将你的topology发送到一个真实的集群上运行,此时你需要将LocalCluster改成StormSubmitter并且实现其中的submitTopology()方法,该方法负责将topology发送到集群上。

代码改变如下:
  1. //LocalCluster cluster = new LocalCluster();
  2. //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache",conf,
  3.     builder.createTopology());
  4. StormSubmitter.submitTopology("Count-Word-Topology-With-Refresh-Cache",conf,
  5.     builder.createTopology());
  6. //Thread.sleep(1000);
  7. //cluster.shutdown();
复制代码

提示:当使用StormSubmitter时,不可以在代码中控制集群,但是使用LocalCluster可以。

接着,将源码打包成一个jar文件,它将在你运行Storm客户端命令提交topology时被发送。因为使用的是Maven,所以你只需进入源文件夹下运行命令:mvn package

生成jar文件后,使用 storm jar 命令提交topology(如何安装Storm客户端参见附录A),这个命令的语法是:
  1. storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3
复制代码


本例中,在topologies的源项目文件夹运行:
  1. storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt
复制代码


通过这些命令,你就已经将topology提交到集群上了。要想停止或者杀死该topology,运行:
  1. storm kill Count-word-Topology-With-Refresh-Cache
复制代码

提示:Topology的名字必须唯一。

DRPC拓扑结构
有一种被称为DRPC(分布式远程过程调用,Distributed Remote Procedure Call)的特殊的topology类型,它使用Storm分布式的能力来执行远程过程调用 (RPC)。如图3-1所示,Storm提供了一些工具让你使用DRPC,第一个工具是DRPC服务器,它的作用是充当DRPC 客户端和topology之间的连接器 ,以及topology spouts的源。DRPC服务器接收一个函数和它的参数来执行,然后对于函数操作的每个数据片,服务器都分配一个在整个topology中使用的请求ID来识别RPC请求。当topology执行最后一个bolt时,它必须发送标识RPC的请求ID和结果,使得DRPC服务器可以返回结果至正确的客户端。
1.jpg

图3-1.DRPC topology图解

提示:一个DRPC服务器可以执行很多函数,每个函数都被一个唯一的ID标识。

Storm提供的第二个工具是LinearDRPCTopologyBuilder——一个来帮助构建DRPC topologies的抽象。构建的topology创建DRPCSpouts(它连接DRPC服务器,并且发送数据到topology的剩余部分)、包装bolt(以便结果从最后一个bolt返回)。所有添加到LinearDRPCTopologyBuilder的bolt被顺序执行。

作为这种类型topology的一个例子,我们将创建一个累加数的程序。这个例子很简单,但是这种理念可以被扩展到执行复杂的分布式数学运算。

bolt有如下输出声明:
  1. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  2.     declarer.declare(new Fields("id","result"));
  3. }
复制代码

由于这是topology中的唯一bolt,所以必须发送标识RPC的请求ID和结果。

execute()方法负责执行累加操作:
  1. public void execute(Tuple input) {
  2.     String[] numbers= input.getString(1).split("\\+");
  3.     Integer added =0;
  4.     if(numbers.length<2){
  5.         throw new InvalidParameterException("Shouldbe at least 2 numbers");
  6.     }
  7.     for(String num:numbers){
  8.         added +=Integer.parseInt(num);
  9.     }
  10.     collector.emit(new Values(input.getValue(0),added));
  11. }
复制代码


包含做累加的bolt的topology的定义如下:
  1. public static void main(String[] args) {
  2.     LocalDRPC drpc =new LocalDRPC();
  3.     LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add");
  4.     builder.addBolt(new AdderBolt(),2);
  5.     Config conf =new Config();
  6.     conf.setDebug(true);
  7.     LocalCluster cluster =new LocalCluster();
  8.     cluster.submitTopology("drpc-adder-topology",conf,
  9.         builder.createLocalTopology(drpc));
  10.     String result =drpc.execute("add","1+-1");
  11.     checkResult(result,0);
  12.     result =drpc.execute("add","1+1+5+10");
  13.     checkResult(result,17);
  14.     cluster.shutdown();
  15.     drpc.shutdown();
  16. }
复制代码


创建LocalDRPC对象来在本地运行DRPC。接着,创建topology builder来添加bolt到topology中。为了测试这个topology,在DRPC对象上使用execute方法。

提示:使用DRPCClient类连接到远程DRPC服务器。DRPC服务器提供一个可被多种语言使用的Trift API,并且在本地或者远程运行DRPC服务器使用同样的API。  为了将topology提交到Storm集群,使用builder对象中的createRemoteTopology()方法代替createLocalTopology()方法,该方法使用storm配置中的DRPC配置。




最后,感谢原作者的分享:东风化宇

已有(1)人评论

跳转到指定楼层
only 发表于 2014-7-25 09:16:22
楼主v5,论坛不错
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条