分享

Storm应用系列之——最基本的例子

hiqj 发表于 2014-12-28 22:10:31 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 17042
本帖最后由 nettman 于 2014-12-28 22:52 编辑
问题导读

1.collector.emit()方法与tuple什么关系?
2.Tuple中以List 的作用是什么?
3.如何建立Topology?



1. 建立Maven项目
我们用Maven来管理项目,方便lib依赖的引用和版本控制。

建立最基本的pom.xml如下:
  1.     <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3.     <modelVersion>4.0.0</modelVersion>  
  4.     <groupId>com.edi.storm</groupId>  
  5.     <artifactId>storm-samples</artifactId>  
  6.     <version>0.0.1-SNAPSHOT</version>  
  7.     <packaging>jar</packaging>  
  8.       
  9.     <properties>  
  10.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  11.     </properties>  
  12.       
  13.     <repositories>  
  14.     <repository>  
  15.     <id>clojars.org</id>  
  16.     <url>http://clojars.org/repo</url>  
  17.     </repository>  
  18.     </repositories>  
  19.       
  20.     <build>  
  21.     <finalName>storm-samples</finalName>  
  22.     <plugins>  
  23.     <plugin>  
  24.     <groupId>org.apache.maven.plugins</groupId>  
  25.     <artifactId>maven-compiler-plugin</artifactId>  
  26.     <version>3.1</version>  
  27.     <configuration>  
  28.     <source>1.7</source>  
  29.     <target>1.7</target>  
  30.     <encoding>${project.build.sourceEncoding}</encoding>  
  31.     </configuration>  
  32.     </plugin>  
  33.       
  34.     <plugin>  
  35.     <artifactId>maven-assembly-plugin</artifactId>  
  36.     <configuration>  
  37.     <descriptorRefs>  
  38.     <descriptorRef>jar-with-dependencies</descriptorRef>  
  39.     </descriptorRefs>  
  40.     </configuration>  
  41.     <executions>  
  42.     <execution>  
  43.     <id>make-assembly</id>  
  44.     <phase>package</phase>  
  45.     <goals>  
  46.     <goal>single</goal>  
  47.     </goals>  
  48.     </execution>  
  49.     </executions>  
  50.     </plugin>  
  51.     </plugins>  
  52.     </build>  
  53.       
  54.     <dependencies>  
  55.     <dependency>  
  56.     <groupId>storm</groupId>  
  57.     <artifactId>storm</artifactId>  
  58.     <version>0.9.0-rc2</version>  
  59.     <scope>provided</scope>  
  60.     </dependency>  
  61.     </dependencies>  
  62.     </project>  
复制代码

这里我额外添加了两个build 插件:
maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的.

maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。

2. 建立Spout
前文提到过,Storm中的spout负责发射数据。我们来实现这样一个spout:
它会随机发射一系列的句子,句子的格式是 谁:说的话
代码如下:
  1.     public class RandomSpout extends BaseRichSpout {  
  2.       
  3.         private SpoutOutputCollector collector;  
  4.       
  5.         private Random rand;  
  6.          
  7.         private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};  
  8.          
  9.         @Override  
  10.         public void open(Map conf, TopologyContext context,  
  11.                 SpoutOutputCollector collector) {  
  12.             this.collector = collector;  
  13.             this.rand = new Random();  
  14.         }  
  15.       
  16.         @Override  
  17.         public void nextTuple() {  
  18.             String toSay = sentences[rand.nextInt(sentences.length)];  
  19.             this.collector.emit(new Values(toSay));  
  20.         }  
  21.       
  22.         @Override  
  23.         public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  24.             declarer.declare(new Fields("sentence"));  
  25.         }  
  26.       
  27.     }
复制代码
这里要先理解Tuple的概念。
Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。

该Spout代码里面最核心的部分有两个:
a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] }
b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。

3. 建立Bolt
既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。

两个功能,两个Bolt。
先看添加"!"的Bolt
  1.     public class ExclaimBasicBolt extends BaseBasicBolt {  
  2.       
  3.         @Override  
  4.         public void execute(Tuple tuple, BasicOutputCollector collector) {  
  5.             //String sentence = tuple.getString(0);  
  6.             String sentence = (String) tuple.getValue(0);  
  7.             String out = sentence + "!";  
  8.             collector.emit(new Values(out));  
  9.         }  
  10.       
  11.         @Override  
  12.         public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  13.             declarer.declare(new Fields("excl_sentence"));  
  14.         }  
  15.       
  16.     }
复制代码
在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。
Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。
取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"

打印Bolt
  1.     public class PrintBolt extends BaseBasicBolt {  
  2.       
  3.         @Override  
  4.         public void execute(Tuple tuple, BasicOutputCollector collector) {  
  5.             String rec = tuple.getString(0);  
  6.             System.err.println("String recieved: " + rec);  
  7.         }  
  8.       
  9.         @Override  
  10.         public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  11.             // do nothing  
  12.         }  
  13.       
  14.     }  
复制代码
仍然是取第一个,因为我们并没有定义过第二个value

4. 建立Topology
现在我们建立拓扑结构的主要组件都有了,可以创建topology了。
  1.     public class ExclaimBasicTopo {  
  2.       
  3.         public static void main(String[] args) throws Exception {  
  4.             TopologyBuilder builder = new TopologyBuilder();  
  5.               
  6.             builder.setSpout("spout", new RandomSpout());  
  7.             builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");  
  8.             builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");  
  9.       
  10.             Config conf = new Config();  
  11.             conf.setDebug(false);  
  12.       
  13.             if (args != null && args.length > 0) {  
  14.                 conf.setNumWorkers(3);  
  15.       
  16.                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
  17.             } else {  
  18.       
  19.                 LocalCluster cluster = new LocalCluster();  
  20.                 cluster.submitTopology("test", conf, builder.createTopology());  
  21.                 Utils.sleep(100000);  
  22.                 cluster.killTopology("test");  
  23.                 cluster.shutdown();  
  24.             }  
  25.         }  
  26.     }
复制代码
很简单,对吧。
其中,
  1. builder.setSpout("spout", new RandomSpout());  
复制代码
定义一个spout,id为"spout"
  1. builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");   
复制代码
定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple
  1. builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");  
复制代码
定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple

  • .shuffleGrouping  

是指明Storm按照何种策略将tuple分配到后续的bolt去。

可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。
把项目用M2E插件导入Eclipse直接运行试试
  1.     String recieved: marry:I'm angry!  
  2.     String recieved: edi:I'm happy!  
  3.     String recieved: john:I'm sad!  
  4.     String recieved: edi:I'm happy!  
  5.     String recieved: ted:I'm excited!  
  6.     String recieved: laden:I'm dangerous!  
  7.     String recieved: edi:I'm happy!  
  8.     String recieved: edi:I'm happy!  
复制代码
这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。我们修改下代码,指定并行数
  1.     builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");  
  2.     builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");  
复制代码
由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。为了方便体现确实是并行,我们修改PrintBolt代码如下:
  1.     public class PrintBolt extends BaseBasicBolt {  
  2.       
  3.         private int indexId;  
  4.          
  5.         @Override  
  6.         public void prepare(Map stormConf, TopologyContext context) {  
  7.             this.indexId = context.getThisTaskIndex();  
  8.         }  
  9.       
  10.         @Override  
  11.         public void execute(Tuple tuple, BasicOutputCollector collector) {  
  12.             String rec = tuple.getString(0);  
  13.             System.err.println(String.format("Bolt[%d] String recieved: %s",this.indexId, rec));  
  14.         }  
  15.       
  16.         @Override  
  17.         public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  18.             // do nothing  
  19.         }  
  20.       
  21.     }  
复制代码
这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。运行下看看:
  1.     Bolt[0] String recieved: marry:I'm angry!  
  2.     Bolt[2] String recieved: john:I'm sad!  
  3.     Bolt[2] String recieved: ted:I'm excited!  
  4.     Bolt[2] String recieved: john:I'm sad!  
  5.     Bolt[2] String recieved: john:I'm sad!  
复制代码
证实确实是并发了。本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行
  • ./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test  
    在StormUI里面,点进 test

20131230193755609.jpg
看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。




截止到这里,一个简单的Storm的topology已经完成了。
但是,这里依然有些问题:
1). 什么是acker?
2). Bolt为什么有两个继承类和接口?
3.) Topology的提交方式到底有几种?
4). 除了随机分组,还有哪些分组策略?
5). Storm是如何保证tuple不被丢失的?
6). 我看到spout发送数据比bolt处理的速度快太多了,我能不能在spout里面sleep?
7). 并发数要如何指定呢?


已有(3)人评论

跳转到指定楼层
355815741 发表于 2014-12-29 09:19:22
学习了,谢谢分享~
回复

使用道具 举报

remzhang 发表于 2015-1-23 08:00:24
版主V5,浅显易懂。
回复

使用道具 举报

ainubis 发表于 2015-3-29 18:07:43
好资料,谢谢分享。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条