分享

storm开发经验分享

52Pig 发表于 2014-10-18 17:27:20 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 9 21026
阅读导读:
1.storm日志的结构是怎样的?
2.如何使用自定义配置文件?
3.一个“topology”在storm集群中有什么作用?




有一些基本的概念,官方wiki没有专门说明,但是比较重要。这里首先介绍下,以便后文引用。
一个“task”指的是负责运行spout和bolt代码逻辑的单独一个线程;
一个“worker”指的是负责向多个task投递消息的网络监听程序(独立的进程),每“worker”代理的“task”由其自身在初始化时候创建;
一个“topology”在storm集群中会创建多个“worker”(分布在多个节点上)帮助其完成其定义的计算逻辑;

序列化中间结果
在storm中,任何由spout和bolt发射出去的元组都是动态类型,即可以是任何自定义类型。Storm将元组视为动态类型,而非类似hadoop的静态类型,有深层考虑,具体请参看此处。
发射序列化后的自定义类型对象与发射ASCII字符串相比,有很多优势:
I 支持复杂类型的中间结果(比如含有容器类的中间结果);
II 不再需要在发射和接收元组时进行繁冗的字符串操作;
III 代码可读性大大增强;
storm内置使用kryo来序列化自定义类型的对象。它比jdk自身的序列化高效30倍以上,其带来的序列化开销非常地小。以这样微小的代价,换来以上三个好处,非常值得。尝试合并中间结果storm不能单独作为一个个体提供实时的计算服务。向前,它需要一个消息队列的支持,向后则需要一个key-value存储系统的支持(广义上说任何存储系统都可以理解为key-value模式,比如对关系数据库来说,key是主键,value是记录;对文件来说,key是文件名,value是文件内容等等)。此外,由于storm是增量计算,每一条从MQ中取出的消息都是独立的。它基本可以理解为由两部分组成,“身份”信息和值信息。即:{ {身份} {值} }。
其中“身份”信息可映射到一组存储系统中需要更新的key,而值信息则是做具体更新操作时需要的数据样本。
其实,不仅是原始消息,storm任何bolt吐出的中间结果都由这两部分组成。
理解了以上内容,我们来看一个典型的storm程序需要做哪些事情:
1、 从消息队列中取数据
2、 对原始消息处理得到中间结果A1
对中间结果A1进行处理得到中间结果A2
       ……
对中间结果An-1进行处理得到An
3、 根据An的“身份”信息和值信息更新存储系统中的计算结果
从中我们可以看出程序读写存储系统的次数依赖于An的消息数量,而An的数量则依赖于An-1的数量,如果能够合并消息数量,后端bolt处理的压力就会极大的减小,这是一个“金字塔”型的收缩结构。能够合并的中间并需具备相同的“身份”信息。然而,并不是所有的中间结果都是能合并的。
一个中间结果An是否能够合并,决定于下一个task在对An的处理过程中,无论An是否是合并过的结果,都不影响此task处理An过程的正确性。
在“荧光”项目的开发中,我们设计出了一种CombiningBolt可以对开发者透明地进行中间结果合并,只要开发者自己的中间结果类继承了如下的接口:

1.png
并且以类似如下的方式定义自己的topology(将CombiningBolt放在两个计算逻辑bolt之间,并且以中间结果的“身份”信息分组,其中CombiningBolt构造函数可以接受一个int型参数,指明合并多少个中间结果后发送合并后的结果):
2.png
下图显示的是“荧光”中使用CombiningBolt合并中间结果的效果,可以看到CombiningBolt与最后的StoreResultBolt ack的消息数量正好1:10。这将redis压力降低为未合并前的1/10。
3.png
CombiningBolt代码如下所示:
  1. package com.tencent.admonitor.storm;
  2. import java.io.Serializable;
  3. import java.util.ArrayList;
  4. import java.util.Collections;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.concurrent.ConcurrentLinkedQueue;
  9. import org.apache.log4j.Logger;
  10. import com.tencent.admonitor.Util.Combinable;
  11. import backtype.storm.task.TopologyContext;
  12. import backtype.storm.topology.BasicOutputCollector;
  13. import backtype.storm.topology.OutputFieldsDeclarer;
  14. import backtype.storm.topology.base.BaseBasicBolt;
  15. import backtype.storm.tuple.Fields;
  16. import backtype.storm.tuple.Tuple;
  17. import backtype.storm.tuple.Values;
  18. import backtype.storm.utils.TimeCacheMap;
  19. import backtype.storm.utils.TimeCacheMap.ExpiredCallback;
  20. /**
  21. * CombiningBolt is used for combining intermediate objects which implement the Combinable
  22. * interface.
  23. *
  24. *
  25. */
  26. public class CombiningBolt extends BaseBasicBolt{
  27.         private static final Logger LOG = Logger.getLogger(CombiningBolt.class);
  28.         private TimeCacheMap<String,Combinable> combiningMap ;
  29.         private ConcurrentLinkedQueue<Object> expiredList;
  30.         private static int DEFAULT_COMBINING_WINDOW = 10;
  31.         private static int DEFAULT_NUM_KEYS = 10000;
  32.         private static int DEFAULT_EXPIRED_SECONDS= 5;
  33.         private static int NUM_BUCKETS = 30;
  34.         
  35.         private int combiningWindow;
  36.         private int numKeys;
  37.         private int expiredSeconds;
  38.         public CombiningBolt(){
  39.                 this(DEFAULT_COMBINING_WINDOW,DEFAULT_NUM_KEYS,DEFAULT_EXPIRED_SECONDS);
  40.         }
  41.         
  42.         public CombiningBolt(int CombiningWindow, int numKeys, int expiredSeconds){
  43.                 this.combiningWindow = CombiningWindow;
  44.                 this.numKeys = numKeys;
  45.                 this.expiredSeconds = expiredSeconds;
  46.                
  47.         }
  48.          @Override
  49.         public void prepare(Map stormConf, TopologyContext context) {
  50.                  expiredList = new ConcurrentLinkedQueue<Object>();
  51.                  ExpiredCallback<String,Combinable> callBack = new CallBackMove<String,Combinable>();
  52.                  combiningMap = new TimeCacheMap(expiredSeconds,NUM_BUCKETS,callBack);
  53.         }
  54.          
  55.         @Override
  56.         public void execute(Tuple input, BasicOutputCollector collector) {
  57.                 String ident = input.getString(0);
  58.                 Combinable combinable = (Combinable)input.getValue(1);
  59.                
  60.                 if(!combiningMap.containsKey(ident)){
  61.                         if(combiningMap.size() >= numKeys){
  62.                                 collector.emit(new Values(ident,combinable));
  63.                         }else{
  64.                                 combiningMap.put(ident, combinable);
  65.                         }
  66.                 }else{
  67.                         Combinable old = combiningMap.get(ident);
  68.                         Combinable new_combinable = old.combine(combinable);
  69.                         if(new_combinable.getCombinedCount() >= combiningWindow){
  70.                                 collector.emit(new Values(ident,new_combinable));
  71.                                 combiningMap.remove(ident);
  72.                         }else{
  73.                                 combiningMap.put(ident, new_combinable);
  74.                         }
  75.                 }
  76.                 int length = expiredList.size();
  77.                 while(length > 0){
  78.                         Combinable e = (Combinable)expiredList.poll();
  79.                         if(e != null)
  80.                                 collector.emit(new Values(e.ident(),e));
  81.                         length--;
  82.                 }
  83.                
  84.         }
  85.         @Override
  86.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  87.                 declarer.declare(new Fields("ident","combinable"));
  88.                
  89.         }
  90.         
  91.         private class CallBackMove<K,V> implements ExpiredCallback<K,V>,Serializable{
  92.         
  93.                 private static final long serialVersionUID = 1L;
  94.                 @Override
  95.                 public void expire(K key, V val) {
  96.                         expiredList.offer(val);
  97.                 }        
  98.         }
  99. }
复制代码
如何使用自定义配置文件?

开发storm应用时可能需要用到自身的配置文件。storm.yaml只能提供给storm自身的配置系统使用。针对后台最广泛使用的ini配置文件,推荐使用ini4j。它是一个轻量的ini配置文件读取器、具有简单易懂的api。
请在spout和bolt的prepare方法中读取配置文件。任何资源类的初始化都需要放到prepare方法中(比如数据库连接,和读取文件索引类等)。

程序部署中的注意事项
I 打包storm程序请不要把依赖打进去,依赖需要单独部署;
II 你需要一种有效的机制将程序的依赖包分发到所有的storm节点上(可上传到nimbus,通过rsync做目录同步);
III 你需要分发的依赖包集合=你的程序所有的依赖包集合 &#8722;  storm依赖包与你的依赖包的交集(否则topology会初始化失败)
IV 在所有supervisor节点上自定义配置文件的内容和路径需要完全一致,路径不一致topology会初始化失败,内容不一致,程序的行为则是未定义的;

如何使用日志系统?如何debug?
在经过一番辛勤劳动之后,你的程序编译成功了。但是,这不代表着它会按照你设定的行为去运行。如何debug成为了一个问题。
最基本也是最原始的思路就是log。在所有你认为可能出错的地方log(尤其是在prepare方法中,这里通常做的是资源类的初始化,如果它们初始化失败,topology则会初始化失败,这样能帮助你快速定位到错误原因)。

Storm默认与log4j集成。你可以再$STORM_HOME/log4j/目录下找log4j的配置文件。通过调整它来控制log4j的行为。

不要急于在真实环境下部署你的程序。想要测试的话,还是首先在Local mode模式下观察吧。如果在local mode下测试的结果是符合预期的,而在真实环境下出错,一般应是非代码的因素造成的。你需要检查节点的资源文件、自定义配置文件、依赖包等等。此外在local mode下,所有日志打印信息是直接输出到屏幕上的,这样方便你看的更加清楚。

下面介绍下storm日志的结构:
所有的日志文件都存放在$STORM_HOME/logs/路径下。

在nimbus节点上:nimbus.log记录的是nimbus启动过程中的输出信息,包括启动时间和各个worker和task初始化过程中打印信息等等。ui.log则记录的storm监控程序启动过程中的输出信息,包括启动时间等等。

在supervisor节点上:supervisor.log记录的则是supervisor的相关启动信息。worker-XXX(一个supervisor节点通常部署了多个worker)记录的是消息传递、和任务执行过程中的输出信息(也就是你代码中的日志打印部分)。storm的设计目标之一是让任务(task)部署对用户透明。这样造成了:当你需要观察一个task的日志输出信息时,你不知道到哪个节点的哪个日志文件去找这个信息。因此,还是强烈建议在local mode下debug你的程序,然后在真实环境做好日志告警。











已有(9)人评论

跳转到指定楼层
韩克拉玛寒 发表于 2014-10-19 09:06:09
楼主辛苦了。
回复

使用道具 举报

hb1984 发表于 2014-10-22 10:57:22
谢谢楼主分享。        
回复

使用道具 举报

loreting 发表于 2014-11-7 17:43:18
提示: 作者被禁止或删除 内容自动屏蔽
回复

使用道具 举报

benwen 发表于 2015-9-17 20:43:25
楼主,您好,请问下,我写了一个wordcount的例子,放在集群上面跑,集群是5个节点,输出结果写入到HDFS上面,写过写到HDFS上面的数据是5份,而且貌似是一个节点一份,请问下楼主如何才能实现分布式呢?是不是还需要做其他的设置?
回复

使用道具 举报

邓立辉 发表于 2015-10-26 17:19:14

谢谢楼主分享。     
回复

使用道具 举报

恋枫缩影 发表于 2015-10-30 00:23:09
不错的东西,动态类型序列化确实很有用
回复

使用道具 举报

hengyi 发表于 2015-11-19 20:13:16
楼主有个问题,CombiningBolt这个bolt中实现的就是类似批处理的处理方式,为什么不直接用Trident中的批处理bolt呢
回复

使用道具 举报

jackman 发表于 2016-10-18 16:48:37
请问storm本地模式下开启多个supervisor,为什么ui上只显示一个supervisor.storm本地模式下开启多个supervisor,为什么ui上只显示一个supervisor.系统是ubuntu14.0.4   local dir已经设置为777权限。data是local dir
QQ图片20161018163449.png
222222.png
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条