分享

storm编程--实战入门

desehawk 发表于 2014-8-24 23:30:31 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 8 18368
问题导读
SimpleSpout类继承哪个类,这个类的作用是什么?
SimpleBolt类继承哪个类,完成什么任务?
Storm程序执行的入口点是哪个类?




本节探讨一下storm具体怎么使用,明白怎么在windows下开发storm程序。
功能描述:实时随机输出一字符串。
在开发前记得导入storm需要的jar包。
1、SimpleSpout类继承BaseRichSpout类,用来产生数据并且向topology里面发出消息:tuple。

  1. package com.ljq.helloword;
  2. import java.util.Map;
  3. import java.util.Random;
  4. import backtype.storm.spout.SpoutOutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.OutputFieldsDeclarer;
  7. import backtype.storm.topology.base.BaseRichSpout;
  8. import backtype.storm.tuple.Fields;
  9. import backtype.storm.tuple.Values;
  10. /**
  11. * Spout起到和外界沟通的作用,他可以从一个数据库中按照某种规则取数据,也可以从分布式队列中取任务
  12. *
  13. * @author Administrator
  14. *
  15. */
  16. @SuppressWarnings("serial")
  17. public class SimpleSpout extends BaseRichSpout{
  18.     //用来发射数据的工具类
  19.     private SpoutOutputCollector collector;
  20.     private static String[] info = new String[]{
  21.         "comaple\t,12424,44w46,654,12424,44w46,654,",
  22.         "lisi\t,435435,6537,12424,44w46,654,",
  23.         "lipeng\t,45735,6757,12424,44w46,654,",
  24.         "hujintao\t,45735,6757,12424,44w46,654,",
  25.         "jiangmin\t,23545,6457,2455,7576,qr44453",
  26.         "beijing\t,435435,6537,12424,44w46,654,",
  27.         "xiaoming\t,46654,8579,w3675,85877,077998,",
  28.         "xiaozhang\t,9789,788,97978,656,345235,09889,",
  29.         "ceo\t,46654,8579,w3675,85877,077998,",
  30.         "cto\t,46654,8579,w3675,85877,077998,",
  31.         "zhansan\t,46654,8579,w3675,85877,077998,"};
  32.    
  33.     Random random=new Random();
  34.    
  35.     /**
  36.      * 初始化collector
  37.      */
  38.     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  39.         this.collector = collector;
  40.     }
  41.    
  42.     /**
  43.      * 在SpoutTracker类中被调用,每调用一次就可以向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用
  44.      */
  45.     @Override
  46.     public void nextTuple() {
  47.         try {
  48.             String msg = info[random.nextInt(11)];
  49.             // 调用发射方法
  50.             collector.emit(new Values(msg));
  51.             // 模拟等待100ms
  52.             Thread.sleep(100);
  53.         } catch (InterruptedException e) {
  54.             e.printStackTrace();
  55.         }
  56.     }
  57.     /**
  58.      * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
  59.      * 该declarer变量有很大作用,我们还可以调用declarer.declareStream();来定义stramId,该id可以用来定义更加复杂的流拓扑结构
  60.      */
  61.     @Override
  62.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  63.         declarer.declare(new Fields("source")); //collector.emit(new Values(msg));参数要对应
  64.     }
  65. }
复制代码

2、SimpleBolt类继承BaseBasicBolt类,处理一个输入tuple。

  1. package com.ljq.helloword;
  2. import backtype.storm.topology.BasicOutputCollector;
  3. import backtype.storm.topology.OutputFieldsDeclarer;
  4. import backtype.storm.topology.base.BaseBasicBolt;
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. /**
  9. * 接收喷发节点(Spout)发送的数据进行简单的处理后,发射出去。
  10. *
  11. * @author Administrator
  12. *
  13. */
  14. @SuppressWarnings("serial")
  15. public class SimpleBolt extends BaseBasicBolt {
  16.     public void execute(Tuple input, BasicOutputCollector collector) {
  17.         try {
  18.             String msg = input.getString(0);
  19.             if (msg != null){
  20.                 //System.out.println("msg="+msg);
  21.                 collector.emit(new Values(msg + "msg is processed!"));
  22.             }
  23.                
  24.         } catch (Exception e) {
  25.             e.printStackTrace();
  26.         }
  27.     }
  28.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  29.         declarer.declare(new Fields("info"));
  30.     }
  31. }
复制代码

3、SimpleTopology类包含一个main函数,是Storm程序执行的入口点,包括一个数据喷发节点spout和一个数据处理节点bolt。
  1. package com.ljq.helloword;
  2. import backtype.storm.Config;
  3. import backtype.storm.LocalCluster;
  4. import backtype.storm.StormSubmitter;
  5. import backtype.storm.topology.TopologyBuilder;
  6. /**
  7. * 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。
  8. *
  9. * @author Administrator
  10. *
  11. */
  12. public class SimpleTopology {
  13.     public static void main(String[] args) {
  14.         try {
  15.             // 实例化TopologyBuilder类。
  16.             TopologyBuilder topologyBuilder = new TopologyBuilder();
  17.             // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
  18.             topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);
  19.             // 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。
  20.             topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");
  21.             Config config = new Config();
  22.             config.setDebug(true);
  23.             if (args != null && args.length > 0) {
  24.                 config.setNumWorkers(1);
  25.                 StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
  26.             } else {
  27.                 // 这里是本地模式下运行的启动代码。
  28.                 config.setMaxTaskParallelism(1);
  29.                 LocalCluster cluster = new LocalCluster();
  30.                 cluster.submitTopology("simple", config, topologyBuilder.createTopology());
  31.             }
  32.             
  33.         } catch (Exception e) {
  34.             e.printStackTrace();
  35.         }
  36.     }
  37. }
复制代码

运行结果效果如下:
28171041-b1832fe9917f43bd9ba473c301048a7c.png








已有(8)人评论

跳转到指定楼层
梦回三国 发表于 2014-9-28 15:20:11
我不明白楼主为什么说:“本节探讨一下storm具体怎么使用,明白怎么在windows下开发storm程序。”,我想问一下,楼主的意思是:1. 可以在windows中搭建storm环境,然后开发程序   or    2. 在linux中搭建开发环境,然后像远程连接hadoop一样,在windows中远程连接storm开发程序呢???如果是第2点的话怎么没看到有关IP的配置什么的呢?期待您的解答。
回复

使用道具 举报

sstutu 发表于 2014-9-28 15:35:01
梦回三国 发表于 2014-9-28 15:20
我不明白楼主为什么说:“本节探讨一下storm具体怎么使用,明白怎么在windows下开发storm程序。”,我想问 ...

storm提供cluster和Local两种运行模式
这里是本地模式下运行的启动代码。
回复

使用道具 举报

梦回三国 发表于 2014-9-28 15:49:57
sstutu 发表于 2014-9-28 15:35
storm提供cluster和Local两种运行模式
这里是本地模式下运行的启动代码。

这两种模式我知道,但都不是Windows下开发storm程序吧,我就是这一点不太明确。
回复

使用道具 举报

sstutu 发表于 2014-9-28 16:00:06
梦回三国 发表于 2014-9-28 15:49
这两种模式我知道,但都不是Windows下开发storm程序吧,我就是这一点不太明确。
可以参考这个:
Windows下基于eclipse的Storm应用开发与调试
http://www.aboutyun.com/thread-9338-1-1.html
回复

使用道具 举报

梦回三国 发表于 2014-9-29 16:51:38
sstutu 发表于 2014-9-28 16:00
可以参考这个:
Windows下基于eclipse的Storm应用开发与调试
http://www.aboutyun.com/thread-9338-1-1. ...

真是太感谢了
回复

使用道具 举报

loreting 发表于 2014-11-11 16:27:53
提示: 作者被禁止或删除 内容自动屏蔽
回复

使用道具 举报

ainubis 发表于 2015-3-29 01:42:49
好东西,支持!
回复

使用道具 举报

ainubis 发表于 2015-3-29 16:20:58
谢谢楼主分享。   
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条