立即注册 登录
About云-梭伦科技 返回首页

muyannian的个人空间 https://www.aboutyun.com/?440 [收藏] [复制] [分享] [RSS]

日志

如何使用directGrouping

已有 2189 次阅读2014-12-28 17:25

如何使用directGrouping
  1. 申明流是direct Stream
    spout/bolt的declareOutputFields中
    outputFieldsDeclarer.declare(/direct/true, new Fields("word"));
  2. 使用emitDirect来发送数据
    spout/bolt的nextTuple/execute中
    collector.emitDirect(/taskId/getWordCountIndex(word),new Values(word));
    注意:因为第一个参数是taskid,首先需要在open/prepare里面获取下游bolt的taskid列表
    topologyContext.getComponentTasks("word-counter")
  3. topology定义中指定连接方式为directGrouping
    builder.setBolt("word-counter", new WordCounter(), 3).directGrouping("spout");
如何利用Storm提供的可靠性保证
  1. 在spout中,用SpoutOutputCollector调用emit的时候,必须指定messageId。
    一个messageId对应一个tuple树。
    这样,在某个tuple树全部被处理完后,spout的回调函数ack会被调用,ack的参数是那个tuple树对应的messageId。
  2. 在每一个bolt中,用OutputCollector调用emit的时候,需要指定源tuple,并且在emit之后要调用ack。
    如果该bolt继承自BaseBasicBolt,则自动完成上述操作。
问题集锦Problem1

问题
Spout:emit出去一个map,下游Bolt:有时候读到空的map,有时候读到有内容的map
原因
emit出去一个map,就调用pruneData函数把map给clear了。
因为本地模式在同一个虚拟机下,storm并没有把这个map深拷贝到tuple中保存,Bolt读到的map和上游Spout发送的map是来自同一块内存。
这样,就出现了一个竞争条件:如果Spout线程先clear,下游Bolt才接到的话,读到的就是空map;如果先接到,Spout再clear,读到的就是有内容的map。
解决
pruneData不clear这个map了,而是new一个新的map。让垃圾回收器去把不用的map回收。

Problem2

问题
Trident中,使用each和partitionAggregate函数,下游接收到空的tuple
原因
这和官方Documentation不符,原因不详
解决
不管是each,还是partitionAggregate函数,必须指定第一个参数(即Fields),表明输入的值列表。

Probelm3

问题
Trident中,一旦加上partitionPersist函数,就会报错

java.io.NotSerializableException: com.aliyun.aep.storm.binpacking.BinpackingTrident

解决
让BinpackTrident实现Serializable。


路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条