如何使用directGrouping
- 申明流是direct Stream
spout/bolt的declareOutputFields中
outputFieldsDeclarer.declare(/direct/true, new Fields("word")); - 使用emitDirect来发送数据
spout/bolt的nextTuple/execute中
collector.emitDirect(/taskId/getWordCountIndex(word),new Values(word));
注意:因为第一个参数是taskid,首先需要在open/prepare里面获取下游bolt的taskid列表
topologyContext.getComponentTasks("word-counter") - topology定义中指定连接方式为directGrouping
builder.setBolt("word-counter", new WordCounter(), 3).directGrouping("spout");
如何利用Storm提供的可靠性保证
- 在spout中,用SpoutOutputCollector调用emit的时候,必须指定messageId。
一个messageId对应一个tuple树。
这样,在某个tuple树全部被处理完后,spout的回调函数ack会被调用,ack的参数是那个tuple树对应的messageId。 - 在每一个bolt中,用OutputCollector调用emit的时候,需要指定源tuple,并且在emit之后要调用ack。
如果该bolt继承自BaseBasicBolt,则自动完成上述操作。
问题集锦Problem1
问题
Spout:emit出去一个map,下游Bolt:有时候读到空的map,有时候读到有内容的map
原因
emit出去一个map,就调用pruneData函数把map给clear了。
因为本地模式在同一个虚拟机下,storm并没有把这个map深拷贝到tuple中保存,Bolt读到的map和上游Spout发送的map是来自同一块内存。
这样,就出现了一个竞争条件:如果Spout线程先clear,下游Bolt才接到的话,读到的就是空map;如果先接到,Spout再clear,读到的就是有内容的map。
解决
pruneData不clear这个map了,而是new一个新的map。让垃圾回收器去把不用的map回收。
Problem2
问题
Trident中,使用each和partitionAggregate函数,下游接收到空的tuple
原因
这和官方Documentation不符,原因不详
解决
不管是each,还是partitionAggregate函数,必须指定第一个参数(即Fields),表明输入的值列表。
Probelm3
问题
Trident中,一旦加上partitionPersist函数,就会报错
java.io.NotSerializableException: com.aliyun.aep.storm.binpacking.BinpackingTrident
解决
让BinpackTrident实现Serializable。