分享

mac10.9下eclipse的storm入门及开发环境搭建

本帖最后由 howtodown 于 2014-8-26 00:30 编辑
问题导读:
1、什么是STORM?
2、搭建标题中的环境都需要哪些软件?
3、如何制作eclipse的storm开发环境?





  一、什么是SOTRM?
  STORM是一个开源框架,来自Twitter公司,其目标是大数据流的实时处理。STORM可以可靠地处理无限的数据流,实时处理Hadoop的批任务。
  对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:
  • 易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
  • 每条信息的处理都可以得到保证。
  • Storm集群管理简易。
  • Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。
  • 尽管通常使用Java,Storm中的topology可以用任何语言设计。

  为什么 Storm 比 Hadoop 快?
  全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。否则最近这两年也不会有s4,storm,puma这些实时计算系统冒出来啦。
  storm的网络直传、内存计算,其时延必然比hadoop的通过hdfs传输低得多;当计算模型比较适合流式时,storm的流式处理,省去了批处理的收集数据的时间;因为storm是服务型的作业,也省去了作业调度的时延。所以从时延上来看,storm要快于hadoop。  
  按照storm作者的说法,storm对于实时计算的意义类似于hadoop对于批处理的意义。我们都知道,根据google mapreduce来实现的hadoop为我们提供了map, reduce原语,使我们的批处理程序变得非常地简单和优美。我们可以看一下storm和hadoop的对比概念:
HadoopStorm
系统角色JobTrackerNimbus
TaskTrackerSupervisor
ChildWorker
应用名称JobTopology
组件接口Mapper/ReducerSpout/Bolt


  Storm术语解释
  Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的 线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络.下面进行详细介绍:
  • Topologies 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce,下面是一个Topology内部Spout和Bolt之间的数据流关系:
1.jpeg
  • Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理。在Topology定义时,需要为每个Bolt指定接收什么样的Stream作为其输入(注:Spout并不需要接收Stream,只会发射Stream)。
2.png
  • Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
  • Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作
3.jpeg
  • Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
4.jpeg
  • Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如果分配给Bolts们.
5.png

  • stream grouping分类
    • Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同.
    • Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts.
    • All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到.
    • Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task.
    • Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行.
    • Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)


  二、搭建标题中的环境都需要哪些软件?
  1)、HomeBrew:
  Homebrew is the easiest and most flexible way to install the UNIX tools Apple didn’t include with OS X. 官方的解释非常明了,Homebrew是一个包管理器,用于在Mac上安装一些OS X没有的UNIX工具(比如著名的wget)。
  Homebrew将这些工具统统安装到了 /usr/local/Cellar 目录中,并在 /usr/local/bin 中创建符号链接。

  2)、Maven:
  Maven 除了以程序构建能力为特色之外,还提供 Ant 所缺少的高级项目管理工具。由于 Maven 的缺省构建规则有较高的可重用性,所以常常用两三行 Maven 构建脚本就可以构建简单的项目,而使用 Ant 则需要十几行。事实上,由于 Maven 的面向项目的方法,许多 Apache Jakarta 项目发文时使用 Maven,而且公司项目采用 Maven 的比例在持续增长。

  三、如何制作eclipse的storm开发环境
  1)、安装Homebrew和Maven
  Home的安装非常简单,在官方主页http://brew.sh最下面有安装方法,只需要执行以下命令:
  1. liondeMacBook-Pro:~ lion$ ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)"
复制代码

安装完brew后,只需要下面一句命令就可以轻松的完成Maven安装,安装完成后,执行mvn -v可以查看版本
  1. liondeMacBook-Pro:~ lion$ sudo brew install maven
复制代码

  2)、storm-starter的编译,过程会需要下载一些内容,有些慢

  1. liondeMacBook-Pro:java lion$ pwd   
  2. /Users/lion/Documents/_my_project/java  
  3. liondeMacBook-Pro:java lion$ git clone https://github.com/nathanmarz/storm-starter.git   
  4. liondeMacBook-Pro:java lion$ cd storm-starter/   
  5. liondeMacBook-Pro:storm-starter lion$ mvn -f m2-pom.xml package
复制代码

       3)、打开eclipse,新建一个maven项目,并修改pom.xml,pom.xml文件内容如下:
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3.     <modelVersion>4.0.0</modelVersion>
  4.   
  5.     <groupId>storm.starter</groupId>
  6.     <artifactId>storm-starter</artifactId>
  7.     <version>0.0.1-SNAPSHOT</version>
  8.     <packaging>jar</packaging>
  9.   
  10.     <properties>
  11.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12.     </properties>
  13.   
  14.     <repositories>
  15.         <repository>
  16.             <id>github-releases</id>
  17.             <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
  18.         </repository>
  19.         <repository>
  20.             <id>clojars.org</id>
  21.             <url>http://clojars.org/repo</url>
  22.         </repository>
  23.     </repositories>
  24.   
  25.     <dependencies>
  26.         <dependency>
  27.             <groupId>junit</groupId>
  28.             <artifactId>junit</artifactId>
  29.             <version>4.11</version>
  30.             <scope>test</scope>
  31.         </dependency>
  32.         <dependency>
  33.             <groupId>storm</groupId>
  34.             <artifactId>storm</artifactId>
  35.             <version>0.9.0.1</version>
  36.             <!-- keep storm out of the jar-with-dependencies -->
  37.             <scope>provided</scope>
  38.         </dependency>
  39.         <dependency>
  40.             <groupId>commons-collections</groupId>
  41.             <artifactId>commons-collections</artifactId>
  42.             <version>3.2.1</version>
  43.         </dependency>
  44.     </dependencies>
  45. </project>
复制代码

 4)、在Eclipse中编写测试程序,刚才我们编译storm-start,会在storm-start里产生一个文件夹 target,里面有一个storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar,这个时候可以导入到项目中。

6.png


  WordCountTopology.java

  1. package storm.starter;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4.   
  5. import storm.starter.RandomSentenceSpout;
  6. import backtype.storm.Config;
  7. import backtype.storm.LocalCluster;
  8. import backtype.storm.StormSubmitter;
  9. import backtype.storm.topology.BasicOutputCollector;
  10. import backtype.storm.topology.OutputFieldsDeclarer;
  11. import backtype.storm.topology.TopologyBuilder;
  12. import backtype.storm.topology.base.BaseBasicBolt;
  13. import backtype.storm.tuple.Fields;
  14. import backtype.storm.tuple.Tuple;
  15. import backtype.storm.tuple.Values;
  16. /**
  17. * This topology demonstrates Storm's stream groupings and multilang
  18. * capabilities.
  19. */
  20. public class WordCountTopology {
  21.     public static class SplitSentence extends BaseBasicBolt {
  22.          
  23.         public void execute(Tuple input, BasicOutputCollector collector) {
  24.             try {
  25.                 String msg = input.getString(0);
  26.                 System.out.println(msg + "-------------------");
  27.                 if (msg != null) {
  28.                     String[] s = msg.split(" ");
  29.                     for (String string : s) {
  30.                         collector.emit(new Values(string));
  31.                     }
  32.                 }
  33.             } catch (Exception e) {
  34.                 e.printStackTrace();
  35.             }
  36.         }
  37.   
  38.         
  39.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  40.             declarer.declare(new Fields("word"));
  41.         }
  42.     }
  43.   
  44.     public static class WordCount extends BaseBasicBolt {
  45.         Map<String, Integer> counts = new HashMap<String, Integer>();
  46.   
  47.         
  48.         public void execute(Tuple tuple, BasicOutputCollector collector) {
  49.             String word = tuple.getString(0);
  50.             Integer count = counts.get(word);
  51.             if (count == null)
  52.                 count = 0;
  53.             count++;
  54.             counts.put(word, count);
  55.             collector.emit(new Values(word, count));
  56.         }
  57.   
  58.       
  59.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  60.             declarer.declare(new Fields("word", "count"));
  61.         }
  62.     }
  63.   
  64.     public static void main(String[] args) throws Exception {
  65.   
  66.         TopologyBuilder builder = new TopologyBuilder();
  67.   
  68.         builder.setSpout("spout", new RandomSentenceSpout(), 5);
  69.   
  70.         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
  71.         builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split",newFields("word"));
  72.   
  73.         Config conf = new Config();
  74.         conf.setDebug(true);
  75.   
  76.         if (args != null && args.length > 0) {
  77.              /*设置该topology在storm集群中要抢占的资源slot数,一个slot对应这supervisor节点上的以个worker进程
  78.             如果你分配的spot数超过了你的物理节点所拥有的worker数目的话,有可能提交不成功,加入你的集群上面已经有了
  79.             一些topology而现在还剩下2个worker资源,如果你在代码里分配4个给你的topology的话,那么这个topology可以提交
  80.             但是提交以后你会发现并没有运行。 而当你kill掉一些topology后释放了一些slot后你的这个topology就会恢复正常运行。
  81.            */
  82.             conf.setNumWorkers(3);
  83.   
  84.             StormSubmitter.submitTopology(args[0], conf,
  85.                     builder.createTopology());
  86.         } else {
  87.             conf.setMaxTaskParallelism(3);
  88.             //指定为本地模式运行
  89.             LocalCluster cluster = new LocalCluster();
  90.             cluster.submitTopology("word-count", conf, builder.createTopology());
  91.   
  92.             Thread.sleep(10000);
  93.   
  94.             cluster.shutdown();
  95.         }
  96.     }
  97. }
复制代码

 RandomSentenceSpout.java
  1. package storm.starter;
  2. import backtype.storm.spout.SpoutOutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.OutputFieldsDeclarer;
  5. import backtype.storm.topology.base.BaseRichSpout;
  6. import backtype.storm.tuple.Fields;
  7. import backtype.storm.tuple.Values;
  8. import backtype.storm.utils.Utils;
  9.   
  10. import java.util.Map;
  11. import java.util.Random;
  12. public class RandomSentenceSpout extends BaseRichSpout {
  13.       
  14.      /**
  15.      * 用来发射数据的工具类
  16.      */
  17.        SpoutOutputCollector _collector;
  18.        Random _rand;
  19.       
  20.       
  21.        /**
  22.           * 这里初始化collector
  23.           */
  24.        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  25.          _collector = collector;
  26.          _rand = new Random();
  27.        }
  28.       
  29.        /**
  30.           * 该方法会在SpoutTracker类中被调用每调用一次就可以向storm集群中发射一条数据(一个tuple元组)
  31.           * 该方法会被不停的调用
  32.           */
  33.        public void nextTuple() {
  34.             
  35.           //模拟等待100ms
  36.          Utils.sleep(100);
  37.          
  38.          //构造随机数据
  39.          String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
  40.              "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
  41.          String sentence = sentences[_rand.nextInt(sentences.length)];
  42.          //调用发射方法
  43.          _collector.emit(new Values(sentence));
  44.        }
  45.       
  46.       
  47.        public void ack(Object id) {
  48.        }
  49.       
  50.       
  51.        public void fail(Object id) {
  52.        }
  53.       
  54.        /**
  55.           * 这里定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
  56.           * 该declarer变量有很大作用,我们还可以调用  declarer.declareStream();  来定义stramId,该id可以用来定义
  57.           * 更加复杂的流拓扑结构
  58.           */
  59.        public void declareOutputFields(OutputFieldsDeclarer declarer) {
  60.          declarer.declare(new Fields("word"));
  61.        }
  62.       
  63.      }
复制代码

在Eclipse中的WordCountTopology.java右键,选择Run As->Java Application,然后可以在Console窗口看到控制台输出的信息如下,信息太多,只截取部分:
  1. an apple a day keeps the doctor away-------------------
  2. 13324 [Thread-20-count] INFO  backtype.storm.daemon.task - Emitting: count default [with, 57]
  3. 13324 [Thread-20-count] INFO  backtype.storm.daemon.executor - Processing received message source: split:5, stream: default, id: {}, [nature]
  4. 13324 [Thread-24-split] INFO  backtype.storm.daemon.task - Emitting: split default [an]
  5. 13324 [Thread-20-count] INFO  backtype.storm.daemon.task - Emitting: count default [nature, 57]
  6. 13324 [Thread-24-split] INFO  backtype.storm.daemon.task - Emitting: split default [apple]
  7. 13324 [Thread-20-count] INFO  backtype.storm.daemon.executor - Processing received message source: split:5, stream: default, id: {}, [an]
  8. 13324 [Thread-24-split] INFO  backtype.storm.daemon.task - Emitting: split default [a]
  9. 13324 [Thread-20-count] INFO  backtype.storm.daemon.task - Emitting: count default [an, 44]
复制代码

  通过这个例子你可以了解到storm的开发过程。


storm.starter.zip (16.91 KB, 下载次数: 91)

已有(4)人评论

跳转到指定楼层
hb1984 发表于 2014-8-26 15:34:06
谢谢楼主分享。               
回复

使用道具 举报

shawl84 发表于 2014-12-28 16:37:01

请教下
为什么书上的单机版是需要 zookeeper+zeromq jzmq的,
而你可以用eclipse直接用?
能解释下吗?因为你的方法看上去更简单。
回复

使用道具 举报

ainubis 发表于 2015-3-29 01:37:19
好东西,支持!
回复

使用道具 举报

ainubis 发表于 2015-3-29 16:19:44
谢谢楼主分享。   
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条