分享

Twitter Storm: DRPC学习

sstutu 发表于 2014-6-14 00:01:57 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 13507
问题导读:
1.Storm为什么引入DRP?
2.是否能根据实例新建DRPC实例?






Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。

DRPC其实不能算是storm本身的一个特性, 它是通过组合storm的原语spout,bolt, topology而成的一种模式(pattern)。本来应该把DRPC单独打成一个包的, 但是DRPC实在是太有用了,所以我们我们把它和storm捆绑在一起。

DRPC例子
新建一个Bolat作为数据处理

  1. /**
  2. * ExclaimBolt.java
  3. * 版权所有(C) 2013
  4. * 创建:cuiran 2013-01-15 17:26:42
  5. */
  6. package com.stormdemo.drpc;
  7. import java.util.Map;
  8. import org.apache.commons.logging.Log;
  9. import org.apache.commons.logging.LogFactory;
  10. import com.stormdemo.demo.DemoTopology;
  11. import junit.framework.TestCase;
  12. import backtype.storm.drpc.LinearDRPCTopologyBuilder;
  13. import backtype.storm.task.TopologyContext;
  14. import backtype.storm.topology.BasicOutputCollector;
  15. import backtype.storm.topology.IBasicBolt;
  16. import backtype.storm.topology.OutputFieldsDeclarer;
  17. import backtype.storm.tuple.Fields;
  18. import backtype.storm.tuple.Tuple;
  19. import backtype.storm.tuple.Values;
  20. /**
  21. * TODO
  22. * @author cuiran
  23. * @version TODO
  24. */
  25. public class ExclaimBolt   implements IBasicBolt {
  26.         private static Log log = LogFactory.getLog(ExclaimBolt.class.getName());
  27.        
  28.         /* (non-Javadoc)
  29.          * @see backtype.storm.topology.IBasicBolt#cleanup()
  30.          */
  31.         @Override
  32.         public void cleanup() {
  33.                 // TODO Auto-generated method stub
  34.         }
  35.         /* (non-Javadoc)
  36.          * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
  37.          */
  38.         @Override
  39.         public void execute(Tuple tuple, BasicOutputCollector collector) {
  40.                 // TODO Auto-generated method stub
  41.                 log.debug("处理数据");
  42.                 String input = tuple.getString(1);      
  43.                 log.debug("接收到的数据为:"+input);
  44.                 collector.emit(new Values(tuple.getValue(0), input + "!"));
  45.         }
  46.         /* (non-Javadoc)
  47.          * @see backtype.storm.topology.IBasicBolt#prepare(java.util.Map, backtype.storm.task.TopologyContext)
  48.          */
  49.         @Override
  50.         public void prepare(Map stormConf, TopologyContext context) {
  51.                 // TODO Auto-generated method stub
  52.         }
  53.         /* (non-Javadoc)
  54.          * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
  55.          */
  56.         @Override
  57.         public void declareOutputFields(OutputFieldsDeclarer declarer) {
  58.                 // TODO Auto-generated method stub
  59.                  declarer.declare(new Fields("id", "result"));
  60.         }
  61.         /* (non-Javadoc)
  62.          * @see backtype.storm.topology.IComponent#getComponentConfiguration()
  63.          */
  64.         @Override
  65.         public Map<String, Object> getComponentConfiguration() {
  66.                 // TODO Auto-generated method stub
  67.                 return null;
  68.         }
  69.        
  70.        
  71.        
  72. }
复制代码



然后写一个测试类


  1. /**
  2. * DRPCTest.java
  3. * 版权所有(C) 2013
  4. * 创建:cuiran 2013-01-15 17:25:37
  5. */
  6. package com.stormdemo.drpc;
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9. import backtype.storm.Config;
  10. import backtype.storm.LocalCluster;
  11. import backtype.storm.LocalDRPC;
  12. import backtype.storm.drpc.LinearDRPCTopologyBuilder;
  13. import backtype.storm.topology.TopologyBuilder;
  14. import junit.framework.TestCase;
  15. /**
  16. * TODO
  17. * @author cuiran
  18. * @version TODO
  19. */
  20. public class DRPCTest extends TestCase {
  21.        
  22.         private static Log log = LogFactory.getLog(DRPCTest.class.getName());
  23.        
  24.         public void testDrpc(){
  25.                 log.debug("testDrpc开始");
  26.                 Config conf = new Config();
  27.                 conf.setDebug(true);
  28.                 conf.setNumWorkers(2);
  29.                 conf.setMaxSpoutPending(1);
  30.                 LocalDRPC drpc=new LocalDRPC();
  31.                 LocalCluster cluster=new LocalCluster();
  32.                 LinearDRPCTopologyBuilder builder  = new LinearDRPCTopologyBuilder("exclamation");
  33.                 builder.addBolt(new ExclaimBolt(), 3);
  34.                
  35.                 cluster.submitTopology("DRPCTest", conf, builder.createLocalTopology(drpc));
  36.                
  37.                 log.debug("传入参数返回的结果:"+drpc.execute("exclamation", "hello"));
  38.                
  39.                
  40.                 cluster.shutdown();
  41.                 drpc.shutdown();
  42.         }
  43. }
复制代码


运行结果如图:

1358313090_1498.png







欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

已有(3)人评论

跳转到指定楼层
Joker 发表于 2014-11-27 21:39:37
版主你好,你看下我这样理解DRPC程序对吗
1在创建new LinearDRPCTopologyBuilder("exclamation")对象,不用去实现exclamation方法,它只是作为一个DRPC方法名而已,但是我看别人要实现这个DRPC函数,也就是参数名字的函数


2.声明的第一个bolt会接收一个两维tuple指的是
  1. drpc.execute("exclamation", "hello")
复制代码

这段代码

还是
  1. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  2.                 declarer.declare(new Fields("id", "result"));
  3.         }
复制代码
这段代码

3.返回最后的结果通过这段代码?
  1. collector.emit(new Values(tuple.getValue(0), input + "!"));
复制代码


假设我有很多的bolt,那么我在topology中如下设定中间的bolt
  1. builder.addBolt(new myBolt(), 2)
  2.         .fieldsGrouping(new Fields("id")); //这个id和bolt定义的id一致
复制代码

到最后一个bolt时候才使用你这个例子中用的bolt形式吗?传递方法名和参数,方法名必须和
new LinearDRPCTopologyBuilder("exclamation")这个参数名字一致

希望版主能回复我

回复

使用道具 举报

desehawk 发表于 2014-11-27 22:35:38
Joker 发表于 2014-11-27 21:39
版主你好,你看下我这样理解DRPC程序对吗
1在创建new LinearDRPCTopologyBuilder("exclamation")对象,不 ...
declareOutputFields 方法,告诉Storm 集群Spout 发送了哪些字段

public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id", "result"));
        }


exclamation 这只是个参数,并非函数

第三个确实是返回,后面多了叹号(!)

来源http://www.aboutyun.com/thread-10316-1-1.html

楼主不要拘泥于语句
这本书你可以看看
相信对有帮助:
链接:http://pan.baidu.com/s/1hq6Aci4 密码:y21h

回复

使用道具 举报

邓立辉 发表于 2015-10-19 21:06:34
版主,我在集群放部署topology。每次调用一次topology跑到一半就报错。求指导 work里报的日志
2015-10-19T20:55:54.455+0800 o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
2015-10-19T20:55:54.556+0800 o.a.z.ClientCnxn [INFO] Opening socket connection to server 2.2.2.12/2.2.2.12:2181. Will not attempt to authenticate using SASL (unknown error)
2015-10-19T20:55:54.556+0800 o.a.z.ClientCnxn [INFO] Socket connection established to 2.2.2.12/2.2.2.12:2181, initiating session
2015-10-19T20:55:54.557+0800 o.a.z.ClientCnxn [WARN] Session 0x0 for server 2.2.2.12/2.2.2.12:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_60]
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_60]
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_60]
        at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_60]
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_60]
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68) ~[stormjar.jar:na]
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366) ~[stormjar.jar:na]
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[stormjar.jar:na]
2015-10-19T20:55:54.560+0800 o.a.z.ClientCnxn [INFO] Opening socket connection to server 2.2.2.12/2.2.2.12:2181. Will not attempt to authenticate using SASL (unknown error)
2015-10-19T20:55:54.560+0800 o.a.z.ClientCnxn [INFO] Socket connection established to 2.2.2.12/2.2.2.12:2181, initiating session
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条