分享

数据流分流和合并

问题导读
1、如何理解分流?
2、怎样定义输出流格式?
3、如何识别出数据的来源?





数据流经常需要分流与合并操作,如下图所示:

1.jpg


请参考示例代码

分流
分流有2钟情况,第一种是,相同的tuple发往下一级不同的bolt, 第二种,分别发送不同的tuple到不同的下级bolt上。

发送相同tuple

其实和普通1v1 发送一模一样,就是有2个或多个bolt接收同一个spout或bolt的数据 举例来说:
  1. SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
  2.                 new SequenceSpout(), spoutParal);
  3. builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
  4.                         SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
  5. builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
  6.                         .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
复制代码



发送不同的tuple

当发送不同的tuple到不同的下级bolt时, 这个时候,就需要引入stream概念,发送方发送a 消息到接收方A'时使用stream A, 发送b 消息到接收方B'时,使用stream B

在topology提交时:
  1.           builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
  2.                         SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
  3.                 builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
  4.                         SequenceTopologyDef.SPLIT_BOLT_NAME,  // --- 发送方名字
  5.                         SequenceTopologyDef.TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple
  6.                 builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
  7.                         .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字
  8.                                 SequenceTopologyDef.CUSTOMER_STREAM_ID);      // --- 接收发送方该stream 的tuple
复制代码



在发送消息时:
  1. public void execute(Tuple tuple, BasicOutputCollector collector) {
  2.      tpsCounter.count();
  3.      Long tupleId = tuple.getLong(0);
  4.      Object obj = tuple.getValue(1);
  5.      if (obj instanceof TradeCustomer) {
  6.          TradeCustomer tradeCustomer = (TradeCustomer)obj;
  7.          Pair trade = tradeCustomer.getTrade();
  8.          Pair customer = tradeCustomer.getCustomer();
  9.             collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
  10.                     new Values(tupleId, trade));
  11.             collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
  12.                     new Values(tupleId, customer));
  13.      }else if (obj != null){
  14.          LOG.info("Unknow type " + obj.getClass().getName());
  15.      }else {
  16.          LOG.info("Nullpointer " );
  17.      }
  18. }
复制代码



定义输出流格式:
  1. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  2.   declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
  3.   declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
  4. }
复制代码



接受消息时,需要判断数据流
  1. if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) {
  2.             customer = pair;
  3.             customerTuple = input;
  4.             tradeTuple = tradeMap.get(tupleId);
  5.             if (tradeTuple == null) {
  6.                 customerMap.put(tupleId, input);
  7.                 return;
  8.             }
  9.             trade = (Pair) tradeTuple.getValue(1);
  10.         }
复制代码



数据流合并
生成topology时

在下面例子中, MergeRecord 同时接收SequenceTopologyDef.TRADE_BOLT_NAME 和SequenceTopologyDef.CUSTOMER_BOLT_NAME 的数据
  1.         builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
  2.                         SequenceTopologyDef.SPLIT_BOLT_NAME,
  3.                         SequenceTopologyDef.TRADE_STREAM_ID);
  4.                 builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
  5.                         .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,
  6.                                 SequenceTopologyDef.CUSTOMER_STREAM_ID);
  7.                 builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
  8.                         .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)
  9.                         .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
复制代码



发送方

发送的bolt和普通一样,无需特殊处理

接收方

接收方是,区分一下来源component即可识别出数据的来源
  1.         if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME) ) {
  2.             customer = pair;
  3.             customerTuple = input;
  4.             tradeTuple = tradeMap.get(tupleId);
  5.             if (tradeTuple == null) {
  6.                 customerMap.put(tupleId, input);
  7.                 return;
  8.             }
  9.             trade = (Pair) tradeTuple.getValue(1);
  10.         } else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
  11.             trade = pair;
  12.             tradeTuple = input;
  13.             customerTuple = customerMap.get(tupleId);
  14.             if (customerTuple == null) {
  15.                 tradeMap.put(tupleId, input);
  16.                 return;
  17.             }
  18.             customer = (Pair) customerTuple.getValue(1);
  19.         }
复制代码

请参考代码:代码

已有(2)人评论

跳转到指定楼层
hb1984 发表于 2014-10-22 16:27:55
谢谢楼主分享。            
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条