分享

Storm0.8.1的(mapreduce) Spout/Blot编程实例实例详解

hyj 发表于 2014-6-11 15:04:22 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 11 79092
问题导读:
1.storm中Topology是什么?
2.storm中Topology与Spout、Bolot的关系是什么?
3.storm的编程大致包含几个步骤?
4.storm通过哪个类读取hdfs数据?
5.Bolt类的作用是什么?







1.编程模型
我们知道hadoop有mapreduce编程模型,那么与之对应的storm的编程模型是什么那,下面我们详细介绍一下。

在storm的编程模型中,主要有三个组件,一个是Topology,这个Topology是一个由多个计算节点构成的拓扑图。结算节点分为两种,一种是Spout,一种是Blot。这些Spout和Bolt构成下图所示的一个多节点的有向图。(数据从左向右流动)。整个图是一个Topology,其中有2个Spout和4个Bolot组成。

20140113142119859.jpg




Spout是Storm的Topoloy的入口,数据流都是从Spout进入topology来进行处理。他负责从外部接收数据,处理,然后发射出去。每个Topology至少要有一个Spout

Bolt是数据处理单元,他可以从一个或者多个其他的计算节点接收数据,处理并发射出去。他可以从Spout接收数据,也可以从其他的Bolt接收数据。Bolt数量可以为0。

Spout的数据发射给谁,Bolt从谁接收数据这些将不会再Spout和Bolt类中声明,在组装Topology的时候说明。

2.编程步骤
我们要编写一个完整的storm的程序,需要经过四步

编写一个或者多个Spout
编写一个或者多个Bolt
编写一个带Main函数的然后将其组装成为一个Topology
最后提交给storm集群运行

3.详细实现

4.1需求
假设我们会不断的将一些移动用户上网日志记录文件存储到HDFS的某个目录下,要求我们能够快速的将新到达的文件解析成我们需要的结构,存储到HBase数据库的上网日志表中,并且对每个用户上网记录的次数做统计,超出门限值后存储到Hbase的超门限信息表中。

4.2具体实现
通过分析,我们采用storm来实现该需求,首先我们设计出我们的处理架构图如下所示

storm.png


由以上设计,我们需要实现一个Spout类(com.ygc.mobilenet.MobileNetLogAnalyseSpout)负责从HDFS读取数据,初步处理然后发射(emit)出去。我们要实现三个Bolt类,一个Bolt类(com.ygc.mobilenet.MobileNetLogSaveBolt)负责获取每条记录,并存储到HBase数据库中。一个Bolt类(com.inspur.mobilenet.MobileNetLogStatisticsBolt)负责按手机号在时间窗口内统计上网记录数。一个Bolt类(com.inspur.mobilenet.MobileNetLogThresholdBolt)负责检查每个手机号时间窗口内上网记录数是否超门限,如果超门限则保存到Hbase数据库中。
在实现之前,我们要导入依赖包。我是用maven来管理依赖包的。所以我的maven依赖配置如下所示

  1. <dependencies>
  2.         <dependency>
  3.             <groupId>storm</groupId>
  4.             <artifactId>storm</artifactId>
  5.             <version>0.8.1</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>com.ygc.hadoop</groupId>
  9.             <artifactId>hadoop2client</artifactId>
  10.             <version>1.0-SNAPSHOT</version>
  11.         </dependency>
  12.     </dependencies>
复制代码


我对HBase和HDFS的访问都封装到hadoop2client这个jar包中了,而这个jar包依赖的hadoop和hbase的包在他自己的pom.xml文件中声明,不需要我们自己显式的声明。这是maven的好处,只用声明直接依赖即可。
当然,我们一定要导入storm本身的包才行

以下章节分别描述每个部分具体的实现。

4.2.1Spout实现
4.2.1.1com.ygc.mobilenet.MobileNetLogVal 类
这个是一个辅助类,用来描述读取的数据文件结构的,不需要解释
  1. package com.ygc.mobilenet;
  2. import backtype.storm.tuple.Fields;
  3. /**
  4. * 用来描述要解析的日志内容的辅助类
  5. */
  6. public class MobileNetLogVal {
  7.     //上网开始时间
  8.     public static String START_TIME_FIELD = "START_TIME";
  9.     //上网响应时间
  10.     public static String RESPONSE_TIME_FIELD = "RESPONSE_TIME";
  11.     //响应结束时间
  12.     public static String END_TIME_FIELD = "END_TIME";
  13.     //源设备IP
  14.     public static String SOURCE_DEV_IP_FIELD = "SOURCE_DEV_IP";
  15.     //源用户IP
  16.     public static String SOURCE_USER_IP_FIELD = "SOURCE_USER_IP";
  17.     //源端口
  18.     public static String SOURCE_PORT_FIELD = "SOURCE_PORT";
  19.     //目标设备IP
  20.     public static String DEST_DEV_IP_FIELD = "DEST_DEV_IP";
  21.     //目标用户IP
  22.     public static String DEST_USER_IP_FIELD = "DEST_USER_IP";
  23.     //目标端口
  24.     public static String DEST_PORT_FIELD = "DEST_PORT";
  25.     //IMSI标识设备
  26.     public static String IMSI_FIELD = "IMSI";
  27.     //MSISDN标识手机号码
  28.     public static String MSISDN_FIELD = "MSISDN";
  29.     //移动上网接入点
  30.     public static String APN_FIELD = "APN";
  31.     //用户操作系统。一般是浏览器型号或者程序名
  32.     public static String USER_AGENT_FIELD = "USER_AGENT";
  33.     //目标URL
  34.     public static String URL_FIELD = "URL";
  35.     //目标主机,包含在URL中
  36.     public static String HOST_FIELD = "HOST";
  37.     /**
  38.      * 创建一个Spout要发射的数据结构。
  39.      *
  40.      * @return 返回Spout要发射的数据结构。
  41.      */
  42.     public Fields createFields() {
  43.         return new Fields(
  44.                 START_TIME_FIELD,
  45.                 RESPONSE_TIME_FIELD,
  46.                 END_TIME_FIELD,
  47.                 SOURCE_DEV_IP_FIELD,
  48.                 SOURCE_USER_IP_FIELD,
  49.                 SOURCE_PORT_FIELD,
  50.                 DEST_DEV_IP_FIELD,
  51.                 DEST_USER_IP_FIELD,
  52.                 DEST_PORT_FIELD,
  53.                 IMSI_FIELD,
  54.                 MSISDN_FIELD,
  55.                 APN_FIELD,
  56.                 USER_AGENT_FIELD,
  57.                 URL_FIELD,
  58.                 HOST_FIELD);
  59.     }
  60. }
复制代码

4.2.1.2com.ygc.mobilenet.MobileNetLogAnalyseSpout类
  1. package com.ygc.mobilenet;
  2. import backtype.storm.spout.SpoutOutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.IRichSpout;
  5. import backtype.storm.topology.OutputFieldsDeclarer;
  6. import backtype.storm.tuple.Values;
  7. import backtype.storm.utils.Utils;
  8. import com.ygc.hadoop.hdfs.HDFSClient;
  9. import org.apache.commons.logging.Log;
  10. import org.apache.commons.logging.LogFactory;
  11. import java.io.BufferedReader;
  12. import java.io.File;
  13. import java.util.Map;
  14. /**
  15. * @author inspur research
  16. * @since 2014-01-09
  17. * 接收移动用户上网记录并发射
  18. */
  19. public class MobileNetLogAnalyseSpout implements IRichSpout {
  20.     private SpoutOutputCollector outputCollector;
  21.     private HDFSClient hdfsClient;
  22.     private Log log = LogFactory.getLog(MobileNetLogAnalyseSpout.class);
  23.     //要监控的HDFS文件目录
  24.     private String MONITOR_PATH = "/storm/mobilelog";
  25.     @Override
  26.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  27.         //每个Spout或者Blot要emit数据,必须指定数据的结构。
  28.         outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
  29.     }
  30.     /**
  31.      * 解析一行数据,转换为后续要处理的数据结构,这里从数据中读取了15列数据
  32.      * @param line 从数据源接收的一行数据
  33.      * @return 格式化好的Values对象。
  34.      */
  35.     public Values createValues(String line) {
  36.         String[] cols = line.split("[|]");
  37.         return new Values(
  38.                 cols[0],
  39.                 cols[1],
  40.                 cols[2],
  41.                 cols[4],
  42.                 cols[6],
  43.                 cols[12],
  44.                 cols[5],
  45.                 cols[7],
  46.                 cols[13],
  47.                 cols[28],
  48.                 cols[29],
  49.                 cols[32],
  50.                 cols[33],
  51.                 cols[30],
  52.                 cols[31]
  53.         );
  54.     }
  55.     @Override
  56.     public Map<String, Object> getComponentConfiguration() {
  57.         return null;
  58.     }
  59.     @Override
  60.     public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  61.         //需要通过SpoutOutputCollector对象来emit数据流和
  62.         this.outputCollector = spoutOutputCollector;
  63.         try {            
  64.             //读取HDFS文件的客户端,自己实现
  65.             hdfsClient = new HDFSClient();
  66.         } catch (Exception e) {
  67.             e.printStackTrace();
  68.         }
  69.     }
  70.     @Override
  71.     public void close() {
  72.     }
  73.     @Override
  74.     public void activate() {
  75.     }
  76.     @Override
  77.     public void deactivate() {
  78.     }
  79.     @Override
  80.     public void nextTuple() {
  81.         Utils.sleep(10000);
  82.         try {
  83.             //该方法从指定的目录中中读取符合条件的文件列表,并随机从中选择一个将其独占并返回文件名
  84.             String lockFileName = hdfsClient.lockFileName(MONITOR_PATH, ".TXT");
  85.             if (lockFileName != null) {
  86.                 //从HDFS中打开已经被独占的文本文件
  87.                 BufferedReader read = hdfsClient.getBufferedReader(lockFileName);
  88.                 String line;
  89.                 int count = 0;
  90.                 while ((line = read.readLine()) != null) {
  91.                     if (line.trim().length() > 0) {
  92.                         count++;                        
  93.                         this.outputCollector.emit(createValues(line));
  94.                     }
  95.                 }
  96.                 log.info("deal file " + lockFileName + " " + count + " rows!");
  97.             }           
  98.         } catch (Exception e) {
  99.             log.error(e);
  100.         }
  101.     }
  102.     @Override
  103.     public void ack(Object o) {
  104.     }
  105.     @Override
  106.     public void fail(Object o) {
  107.     }
  108. }
复制代码



4.2.1.3重点解释

要实现一个Spout类,一般的做法是实现backtype.storm.topology.IRichSpout接口,重点的是要实现下面的几个接口4.2.1.3.1public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 接口在这个接口里,我们要声明本Spout要发射(emit)的数据的结构,及一个backtype.storm.tuple.Fields对象。这个对象和public void nextTuple()接口中emit的backtype.storm.tuple.Values共同组成了一个元组对象(backtype.storm.tuple.Tuple)供后面接收该数据的Blot使用

4.2.1.3.2public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector)接口
该接口是初始化接口。在这里要将SpoutOutputCollector spoutOutputCollector对象保存下来,供后面的public void nextTuple()接口使用,还可以执行一些其他的操作。例如这里将HDFSClient对象初始化了。

4.2.1.3.3public void nextTuple() 接口该接口实现具体的读取数据源的方法。本例中是从HDFS中读取一个数据文件,并逐行解析,将获取的数据emit出去。emit的对象是通过public Values createValues(String line)方法生成的backtype.storm.tuple.Values对象。该方法从数据源的一行数据中,选取的15个目标值组成一个backtype.storm.tuple.Values对象。这个对象中可以存储不同类型的对象,例如你可以同时将String对象,Long对象存取在一个backtype.storm.tuple.Values中emit出去。实际上只要实现了Storm要求的序列化接口的对象都可以存储在里面。emit该值得时候需要注意,他的内容要和declareOutputFields中声明的backtype.storm.tuple.ields对象相匹配,必须一一对应。他们被共同组成一个backtype.storm.tuple.Tuple元组对象,被后面接收该数据流的对象使用。


4.2.2Bolt实现
4.2.2.1com.ygc.mobilenet.MobileNetLogSaveBolt类
  1. package com.ygc.mobilenet;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.IRichBolt;
  5. import backtype.storm.topology.OutputFieldsDeclarer;
  6. import backtype.storm.tuple.Tuple;
  7. import com.inspur.hadoop.hbase.HTableClient;
  8. import org.apache.commons.logging.Log;
  9. import org.apache.commons.logging.LogFactory;
  10. import java.util.Map;
  11. /**
  12. * 负责将所有上网记录保存到HBase数据库中
  13. */
  14. public class MobileNetLogSaveBolt implements IRichBolt {
  15.     private OutputCollector outputCollector;
  16.     private HTableClient tableClient;
  17.     private Log log = LogFactory.getLog(MobileNetLogSaveBolt.class);
  18.     @Override
  19.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  20.         this.outputCollector = outputCollector;
  21.         try {
  22.             //初始化HBase数据库
  23.             tableClient = new HTableClient("192.168.1.230");
  24.         } catch (Exception e) {
  25.             log.error(e);
  26.         }
  27.     }
  28.     @Override
  29.     public void execute(Tuple tuple) {
  30.         try {
  31.             log.info("access tuple " + tuple.getValues()+"files info = " +tuple.getFields().toString());
  32.             saveTupleToHBase(tuple);
  33.             this.outputCollector.emit(tuple, tuple.getValues());
  34.         } catch (Exception e) {
  35.             log.error(e);
  36.         } finally {
  37.             outputCollector.ack(tuple);
  38.         }
  39.     }
  40.     /**
  41.      * 将接收的元组保存到数据库
  42.      * @param tuple  从其他计算节点接收的数据流,这里是从com.ygc.mobilenet.MobileNetLogAnalyseSpout中接收的数据流。注意和该类中声明的backtype.storm.tuple.Fields对象和emit的backtype.storm.tuple.Values对象对应
  43.      * @return
  44.      */
  45.     private boolean saveTupleToHBase(Tuple tuple) {
  46.         try {
  47.             String rowKey = createRowKeyByeTuple(tuple);
  48.             long ts = System.currentTimeMillis();
  49.             tableClient.insertRow("t_mobilenet_log", rowKey, "TIME",
  50.                     MobileNetLogVal.START_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD));
  51.             tableClient.insertRow("t_mobilenet_log", rowKey, "TIME",
  52.                     MobileNetLogVal.RESPONSE_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.RESPONSE_TIME_FIELD));
  53.             tableClient.insertRow("t_mobilenet_log", rowKey, "TIME",
  54.                     MobileNetLogVal.END_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.END_TIME_FIELD));
  55.             tableClient.insertRow("t_mobilenet_log", rowKey, "SOURCE_INFO",
  56.                     MobileNetLogVal.SOURCE_DEV_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_DEV_IP_FIELD));
  57.             tableClient.insertRow("t_mobilenet_log", rowKey, "SOURCE_INFO",
  58.                     MobileNetLogVal.SOURCE_USER_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_USER_IP_FIELD));
  59.             tableClient.insertRow("t_mobilenet_log", rowKey, "SOURCE_INFO",
  60.                     MobileNetLogVal.SOURCE_PORT_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_PORT_FIELD));
  61.             tableClient.insertRow("t_mobilenet_log", rowKey, "DEST_INFO",
  62.                     MobileNetLogVal.DEST_DEV_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_DEV_IP_FIELD));
  63.             tableClient.insertRow("t_mobilenet_log", rowKey, "DEST_INFO",
  64.                     MobileNetLogVal.DEST_USER_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_USER_IP_FIELD));
  65.             tableClient.insertRow("t_mobilenet_log", rowKey, "DEST_INFO",
  66.                     MobileNetLogVal.DEST_PORT_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_PORT_FIELD));
  67.             tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
  68.                     MobileNetLogVal.APN_FIELD, ts, tuple.getStringByField(MobileNetLogVal.APN_FIELD));
  69.             tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
  70.                     MobileNetLogVal.IMSI_FIELD, ts, tuple.getStringByField(MobileNetLogVal.IMSI_FIELD));
  71.             tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
  72.                     MobileNetLogVal.MSISDN_FIELD, ts, tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD));
  73.             tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
  74.                     MobileNetLogVal.URL_FIELD, ts, tuple.getStringByField(MobileNetLogVal.URL_FIELD));
  75.             tableClient.insertRow("t_mobilenet_log", rowKey, "BASIC_INFO",
  76.                     MobileNetLogVal.HOST_FIELD, ts, tuple.getStringByField(MobileNetLogVal.HOST_FIELD));
  77.         } catch (Exception e) {
  78.             log.error(e);
  79.             return false;
  80.         }
  81.         return true;
  82.     }
  83.     private String createRowKeyByeTuple(Tuple tuple) {
  84.         return tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD)
  85.                 + tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD);
  86.     }
  87.     @Override
  88.     public void cleanup() {
  89.     }
  90.     @Override
  91.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  92.         outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
  93.     }
  94.     @Override
  95.     public Map<String, Object> getComponentConfiguration() {
  96.         return null;
  97.     }
  98. }
复制代码


需要注意几个方面:

因为这个Bolt处理完了后,将数据原样发射出去。所以这里的public void declareOutputFields方法和com.ygc.mobilenet.MobileNetLogAnalyseSpout中的方法一样。
而在private boolean saveTupleToHBase(Tuple tuple)方法中处理backtype.storm.tuple.Tuple对象的时候主要该对象的内容和前面计算单元(com.ygc.mobilenet.MobileNetLogAnalyseSpout)声明的backtype.storm.tuple.Fields对象和emit的backtype.storm.tuple.Values对象对应。


4.2.2.2com.ygc.mobilenet.MobileNetLogStatisticsBolt
  1. package com.ygc.mobilenet;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.IRichBolt;
  5. import backtype.storm.topology.OutputFieldsDeclarer;
  6. import backtype.storm.tuple.Fields;
  7. import backtype.storm.tuple.Tuple;
  8. import backtype.storm.tuple.Values;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. /**
  12. * 统计时间窗口内上网记录数。这里没有实现时间窗口。
  13. */
  14. public class MobileNetLogStatisticsBolt implements IRichBolt {
  15.     private OutputCollector outputCollector;
  16.     private Map<String, Integer> requestStatistic = new HashMap<String, Integer>();
  17.     @Override
  18.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  19.         this.outputCollector = outputCollector;
  20.     }
  21.     @Override
  22.     public void execute(Tuple tuple) {
  23.         String msisdn = tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD);
  24.         Integer count = 1;
  25.         if (requestStatistic.containsKey(msisdn)) {
  26.             count = requestStatistic.get(msisdn) + 1;
  27.         }
  28.         requestStatistic.put(msisdn, count);
  29.         this.outputCollector.emit(tuple, new Values(msisdn, count));
  30.         this.outputCollector.ack(tuple);
  31.     }
  32.     @Override
  33.     public void cleanup() {
  34.     }
  35.     @Override
  36.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  37.         outputFieldsDeclarer.declare(new Fields(MobileNetLogVal.MSISDN_FIELD,"count"));
  38.     }
  39.     @Override
  40.     public Map<String, Object> getComponentConfiguration() {
  41.         return null;
  42.     }
  43. }
复制代码



重点注意:
这里声明的backtype.storm.tuple.Fields包含两个字段MobileNetLogVal.MSISDN_FIELD,"count";
这里emit的backtype.storm.tuple.Values同样对应两个字段,而且一个是String类型一个是Integer类型
这里emit时,第一个字段为原始的acktype.storm.tuple.Tuple,第二个字段才是要emit的backtype.storm.tuple.Values。这里是为了建立源消息和派生消息直接的树形结构,让storm框架能够自动跟踪该消息是否被完整的处理。这是保证storm中的数据可靠性的一个机制。这里先不讨论。你也可以不要第一个参数。那么backtype.storm.tuple.Values被发射出去后,是否被成功处理storm就不管了。

4.2.2.3com.ygc.mobilenet.MobileNetLogThresholdBolt


  1. package com.ygc.mobilenet;
  2. import backtype.storm.task.OutputCollector;
  3. import backtype.storm.task.TopologyContext;
  4. import backtype.storm.topology.IRichBolt;
  5. import backtype.storm.topology.OutputFieldsDeclarer;
  6. import backtype.storm.tuple.Tuple;
  7. import com.inspur.hadoop.hbase.HTableClient;
  8. import org.apache.commons.logging.Log;
  9. import org.apache.commons.logging.LogFactory;
  10. import java.util.Map;
  11. /**
  12. * 负责检查用户上网次数是否门限,如果超门限则保存到HBase数据库中
  13. */
  14. public class MobileNetLogThresholdBolt implements IRichBolt {
  15.     private OutputCollector outputCollector;
  16.     //操作HBase数据库的客户端
  17.     private HTableClient tableClient;
  18.     private Log log = LogFactory.getLog(MobileNetLogThresholdBolt.class);
  19.     @Override
  20.     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  21.         this.outputCollector = outputCollector;
  22.         try {
  23.             tableClient = new HTableClient("192.168.1.230");
  24.         } catch (Exception e) {
  25.             log.error(e);
  26.         }
  27.     }
  28.     @Override
  29.     public void execute(Tuple tuple) {
  30.         log.info("deal data " + tuple.getString(0) + "=" + tuple.getInteger(1));
  31.         if (tuple.getInteger(1) > 2) {
  32.             try {
  33.                 tableClient.insertRow("t_mobilenet_threshold", tuple.getString(0), "STAT_INFO", "COUNT", String.valueOf(tuple.getInteger(1)));
  34.             } catch (Exception e) {
  35.                 log.error(e);
  36.             }
  37.         }
  38.         this.outputCollector.emit(tuple, tuple.getValues());
  39.         this.outputCollector.ack(tuple);
  40.     }
  41.     @Override
  42.     public void cleanup() {
  43.     }
  44.     @Override
  45.     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  46.     }
  47.     @Override
  48.     public Map<String, Object> getComponentConfiguration() {
  49.         return null;
  50.     }
  51. }
复制代码


重点解释:

只要注意在从元组中取数的时候,tuple.getString(0)和tuple.getInteger(1)分别对应了com.inspur.mobilenet.MobileNetLogStatisticsBolt对象中发射的MobileNetLogVal.MSISDN_FIELD,"count"字段对应即可。在com.inspur.mobilenet.MobileNetLogStatisticsBolt中我们取元组数据的时候是通过Field的名字来取的,这里是通过序号来取的。

4.2.3组装实现
现在,我们开发的组件需要组装成Topology了这个是在一个带Main函数的类里面实现的。从上面的设计图看,结构应该是com.ygc.mobilenet.MobileNetLogAnalyseSpout作为该Topology读取数据的源,他发射的数据分别被com.ygc.mobilenet.MobileNetLogSaveBolt、com.ygc.mobilenet.MobileNetLogStatisticsBolt对象接收。而com.ygc.mobilenet.MobileNetLogStatisticsBolt对象发射的数据流被com.ygc.mobilenet.MobileNetLogThresholdBolt对象接收。所以组装Topology的代码如下所示
  1. package com.ygc.mobilenet;
  2. import backtype.storm.Config;
  3. import backtype.storm.StormSubmitter;
  4. import backtype.storm.topology.TopologyBuilder;
  5. import backtype.storm.tuple.Fields;
  6. /**
  7. * 组装storm的Topology的代码
  8. */
  9. public class MobileNetLogTopology {
  10.     public static void main(String[] args) {
  11.         if (args.length > 0) {
  12.             int worknum = Integer.parseInt(args[0]);
  13.             int execunum = Integer.parseInt(args[1]);
  14.             TopologyBuilder builder = new TopologyBuilder();
  15.             builder.setSpout("analyseMobileNetlog", new MobileNetLogAnalyseSpout(), execunum);
  16.             builder.setBolt("saveMobileNetlog", new MobileNetLogSaveBolt(), execunum).shuffleGrouping("analyseMobileNetlog");
  17.             builder.setBolt("countMobileNetlog", new MobileNetLogStatisticsBolt(), execunum).fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD));
  18.             builder.setBolt("thresholdMobileNetlog", new MobileNetLogThresholdBolt(), execunum).shuffleGrouping("countMobileNetlog");
  19.             Config conf = new Config();
  20.             conf.setNumWorkers(worknum);
  21.             conf.setMaxSpoutPending(5000);
  22.             try {
  23.                 StormSubmitter.submitTopology("mobilenetlogtopology", conf, builder.createTopology());
  24.             } catch (Exception e) {
  25.                 e.printStackTrace();
  26.             }
  27.         }
  28.     }
  29. }
复制代码



  • builder.setSpout("analyseMobileNetlog", new MobileNetLogAnalyseSpout(), execunum);设置Topology的Spout是谁。可以设置多个,其中第一个参数是该计算节点的名称,在同一个Topology中必须保持唯一性。第三个参数是设置该计算单元执行者数量的参数,即有多少个线程来执行该计算逻辑。
  • builder.setBolt("saveMobileNetlog", new MobileNetLogSaveBolt(), execunum).shuffleGrouping("analyseMobileNetlog");设置一个Bolt。第一个参数还是该计算节点的名称。必须保持唯一性。第三个节点同样是设置该计算单元执行数量的参数。shuffleGrouping("analyseMobileNetlog")方法,表示该Bolt计算单元要随机从ID为"analyseMobileNetlog"的计算节点,即前面设置的Spout对象中接收数据。
  • builder.setBolt("countMobileNetlog", new MobileNetLogStatisticsBolt(), execunum).fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD));设置第二个Bolt。因为是在内存中对手机号码做上网记录的统计,所以相同手机号的数据需要发送到同一个MobileNetLogStatisticsBolt的线程来处理,所以这里使用了一个fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD))的方式来分发数据。
  • builder.setBolt("thresholdMobileNetlog", new MobileNetLogThresholdBolt(), execunum).shuffleGrouping("countMobileNetlog");这个没有新鲜内容。



在组装Topology的过程中涉及到一个Stream Grouping的概念,这个概念就是Topology的各个计算单元之间数据流分发的一个策略。不同的策略应用在不同的场景下。例如上面第三节,要求某个字段(手机号)相同的数据必须发送到后续计算节点同一个任务中。Stream Grouping默认提供如下的类别

  • shuffleGrouping:是前一个计算单元随机的将数据流发送给后一个计算单元的Task
  • fieldsGrouping:是通过某个字段来识别,将该字段值一样的数据流发送给后一个计算单元的同一个Task。即如果以手机号为fieldsGrouping的依据,前一次将手机号为138111111111的数据发送Task1,则下一次的138111111111的数据不会发给其他的Task来处理。
  • All grouping:将数据流发送给所有后一个计算节点的Task,即如果后一个计算节点有5个Task,这5个Task都将接收到所有的数据。我现在想不到这个用在什么场景下。
  • Global grouping:这个意思是说数据流将被消费他的Bolt的任务号最小的任务接收。
  • None grouping:这个意思是你不关系数据如何被分发,实际上,他等同于shuffleGrouping。
  • Direct grouping:这个是说让生成元组的对象自己决定消费这个元组的计算单元的哪个任务来接收这个元组。他在发射的时候必须用emitDirect方法来发射,并指明taskId。




4.2.4提交执行
提交这个storm的Topology任务之前,要将以上代码需要的相关jar中和storm0.8.1无关的其他jar包全部打到一个jar中,例如上述相关的就有自己实现的HDFSClient、HTableClient以及他们相关的Hadoop、Hbase依赖包都打包到同一个stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar文件中。这里可以使用maven的装配插件来完成这个打包的工作

  1.     <build>
  2.         <plugins>
  3.             <plugin>
  4.                 <artifactId>maven-assembly-plugin</artifactId>
  5.                 <configuration>
  6.                     <descriptorRefs>
  7.                         <descriptorRef>jar-with-dependencies</descriptorRef>
  8.                     </descriptorRefs>                    
  9.                 </configuration>
  10.             </plugin>
  11.         </plugins>
  12.     </build>
复制代码


这个组装插件可以将你的项目依赖的所有jar包解压和你的class一起打包到你的jar文件中。当然,里面也会有一些坑。例如多个jar包中有同一个配置文件互相覆盖导致的问题我就遇到过。

打包好了后,将该jar文件上传到storm的nimbus进程所在的服务器上,执行如下的命令
storm jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.ygc.mobilenet.MobileNetLogTopology 16 4
其中16、4是com.ygc.mobilenet.MobileNetLogTopology对象要读取的的命令行参数。分别用户用来设置执行该Topology的worker的数量和executors的数量。这两个数量分别设置了这个Topoploy要执行的进程数和每个计算单元执行的线程数。
如果成功,会显示如下的信息

  1. 0    [main] INFO  backtype.storm.StormSubmitter  - Jar not uploaded to master yet. Submitting jar...
  2. 10   [main] INFO  backtype.storm.StormSubmitter  - Uploading topology jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/storm/stormlocale/nimbus/inbox/stormjar-a3bdefb1-6ec5-40ab-8153-6baab889b043.jar
  3. 593  [main] INFO  backtype.storm.StormSubmitter  - Successfully uploaded topology jar to assigned location: /home/storm/stormlocale/nimbus/inbox/stormjar-a3bdefb1-6ec5-40ab-8153-6baab889b043.jar
  4. 593  [main] INFO  backtype.storm.StormSubmitter  - Submitting topology mobilenetlogtopology in distributed mode with conf {"topology.workers":16,"topology.max.spout.pending":5000}
  5. 719  [main] INFO  backtype.storm.StormSubmitter  - Finished submitting topology: mobilenetlogtopology
复制代码




在Storm UI界面上将看到如下的界面

stormui.jpg


4.2.5执行效果4.2.5.1查看Storm UI效果
现在我们将几个要处理的上网日志文件上传到HDFS的/storm/mobilelist文件夹中。等一会刷新storm UI界面,点击上图中的mobilenetlogtopology链接,可以看到如下的画面。里面是该Topology运行的详情

storm2.jpg


4.2.5.2查看HBase数据在HBase的HMaster上执行

hbase shell

进入HBase交互界面,执行如下的命令

scan 't_mobilenet_log'


storm3.jpg


另一个表的数据就不看了。到这里一个完整的Storm的流计算应用就算是完成了,希望对大家有用。如果有不对的地方也请指正。我也是刚刚接触。理解不到位的地方还有不少。希望一起讨论。




已有(11)人评论

跳转到指定楼层
junzi234 发表于 2014-6-11 16:01:03
好文档,收下了。
回复

使用道具 举报

hahaxixi 发表于 2014-11-12 16:34:08
非常好的文档,先收下,稍后在看!!
回复

使用道具 举报

尘世随缘 发表于 2015-3-28 21:33:48
正在学习storm,看了这个非常有收货。但是我不理解的是,你写了3个bolt,我感觉1个bolt也能实现此类的需求,为什么要3个呢?希望能答复。
回复

使用道具 举报

nettman 发表于 2015-3-28 21:48:05
尘世随缘 发表于 2015-3-28 21:33
正在学习storm,看了这个非常有收货。但是我不理解的是,你写了3个bolt,我感觉1个bolt也能实现此类的需求 ...

每一个bolt都有能实现一个功能。如果放到一个bolt也可以。
但是就如同不同的函数有不同的功能,你也可以都放到一个函数里来完成两个功能。

在单个程序中,这是不明显的,但是如果从项目的角度来看,他们就有所区别了
回复

使用道具 举报

尘世随缘 发表于 2015-3-28 22:03:34
谢谢提醒,后面看了些介绍现在明白了。其实在一个bolt里面实现是没有问题的,但是就体现不出分布式的优势了,单节点处理效率有限(如CPU,内存都是有限的),多个bolt可以将这些硬件资源共享,以并行处理来提升整体处理性能。
回复

使用道具 举报

ainubis 发表于 2015-3-29 16:22:31
谢谢楼主分享。   
回复

使用道具 举报

fisherhe 发表于 2015-6-26 09:15:15
hadoop2client的代码能分享下吗
回复

使用道具 举报

hapjin 发表于 2015-7-30 13:04:43
请问下楼主,关于依赖jar包的问题:
------------------------------------------
例如上述相关的就有自己实现的HDFSClient、HTableClient以及他们相关的Hadoop、Hbase依赖包都打包到同一个stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar文件中。
-------------------------------------------
这个包含了Hadoop、HBase依赖包的 stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar 文件里面的内容全部都是编译好的 .class 文件?还是 既有 .class 文件(Spout、Bolt 编译得到),也有 .jar 文件( Hadoop依赖的 .jar)?


回复

使用道具 举报

hapjin 发表于 2015-7-30 13:06:21
可以把 Hadoop依赖.jar(Hadoop、HBase的依赖包) 文件直接放到 storm 安装目录下的 /lib 目录下吧???
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条