分享

storm笔记及示例代码二

pergrand 发表于 2016-8-17 17:05:07 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 6523
storm组件复杂组合
        TopologyBuilder topologyBuilder = new TopologyBuilder();
                topologyBuilder.setSpout("spout1", new MySpout());
               
                //多个组件接收同一个组件的输出
                topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1");
                topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("spout1");
                topologyBuilder.setBolt("bolt3", new MyBolt3()).shuffleGrouping("spout1");
               
                //一个组件接收多个组件的输出数据
                topologyBuilder.setBolt("bolt4", new MyBolt4()).shuffleGrouping("bolt1").shuffleGrouping("bolt2").shuffleGrouping("bolt3");
               
----------------------------------------------------------------------       

hadoop rpc介绍及代码实现
          其他文档

storm drpc分布式远程过程调用
        可以实现跨主机夸进程访问
        本地模式
        远程模式
原理:DRPC服务器与topology在同一主机上,client请求访问链接DRPC服务器,
           然后使用系统内置的DRPCSpout传输数据,然后用户自定义bolt,最后系统内置“ReturnResults”的bolt连接到DRPC服务器,
            将结果返回给client
client->DRPCServer->DRPCSpout->自定义bolt->ReturnResultBolt->DRPCServe->client
         客户端发送功能名称及功能所需参数到DRPC服务器去执行。
          拓扑它使用DRPCSpout从DRPC服务器接收功能调用流。
          每个功能调用通过DRPC服务器使用唯一ID标记,随后拓扑计算结果,
          在拓扑的最后,一个称之为“ReturnResults”的bolt连接到DRPC服务器,
           把结果交给这个功能调用(根据功能调用ID),
           DRPC服务器根据ID找到等待中的客户端,
为等待中的客户端消除阻塞,并发送结果给客户端。
drpc代码实现
/**
* drpc原理:client->DRPCServer->DRPCSpout->自定义bolt->ReturnResultBolt->DRPCServe->client
* 单词拼接的功能,功能名称fun_name
* 用户发送过来一个字符串,返回一个拼接了hello的字符串
*
*
*/
public class LocalDrpcTopology {
        public static class MyBolt extends BaseRichBolt{

                private OutputCollector collector;
                @Override
                public void prepare(Map stormConf, TopologyContext context,
                                OutputCollector collector) {
                        this.collector = collector;
                }

                /**
                 * DRPCSpout为client传来的数据做了封装发射出的输出流是[id, result]格式
                 * tuple中封装了两个参数
                 * 第一个:表示参数的id
                 * 第二个:表示具体的参数内容
                 */
                @Override
                public void execute(Tuple input) {
                        String value = input.getString(1);
                        value = "hello "+value;
                        this.collector.emit(new Values(input.getValue(0),value));//tuple封装了两个所发射出去两个
                }

                @Override
                public void declareOutputFields(OutputFieldsDeclarer declarer) {
                        declarer.declare(new Fields("id","value"));//字段随便命名,ReturnResultBolt根据下标处理
                }               
        }
               
       
        public static void main(String[] args) {
                LinearDRPCTopologyBuilder topologyBuilder = new LinearDRPCTopologyBuilder("fun_name");//drpc对外提供一个功能,名称是fun_name
                topologyBuilder.addBolt(new MyBolt());
                //定义本地集群
                LocalCluster localCluster = new LocalCluster();
                String simpleName = LocalDrpcTopology.class.getSimpleName();
                //创建本地drpc服务端
                ILocalDRPC drpc = new LocalDRPC();
                localCluster.submitTopology(simpleName, new Config(), topologyBuilder.createLocalTopology(drpc));
                                               
                //模拟客户端访问,由于是模拟本地体现不出rpc的跨主机特点;。
                String result = drpc.execute("fun_name", " storm");//第一个参数是drpc的函数名
                System.err.println(result);                                                       
        }

}
                               
               
当创建LinearDRPCTopologyBuilder时,你把这个拓扑的DRPC功能名称告诉storm。
一个DRPC服务器可以协调许多功能,功能名称用于区别不同的功能,
首先声明的bolt将接收一个输入的2-tuples,第一个字段是请求ID,第二个字段是请求参数。
LinearDRPCTopologyBuilder认为最后的bolt会发射一个输出流,该输出流包含[id, result]格式的2-tuples。
最后,所有拓扑中间过程产生的元组(tuple)都包含请求id作为其第一个字段。
       

参考http://blog.chinaunix.net/uid-233938-id-3198826.html

storm优化
并行度:
        一般worker和线程executor的比列1:10~15
worker:
        根据cpu核数和处理的数据,来调整worker数量
        一般一个topology使用12个worker比较合理
        如果多增加,线程之间的内存通信变成进程之间的网络通信
acker:
        如果不需要可靠性可以不跟踪tuple树。
        代码设置
雪崩问题:
        如果spout发射速度大于接受速度,数据累积,消耗内存,最终崩溃
        解决办法:
                增加bolt的处理能力
                代码设置config.setMaxSpoutPending(num);当bolt还有设置的num个tuple没处理时
        spout停止发射,当bolt消耗了之后tuple个数少于num,spout继续发射数据,
        需要开启acker消息确认机制。




storm脚本启动关闭
        1:一键启动集群中所有节点的进程
        提前定义一个文件,文件名叫 supervisorhost 这个文件中保存所有从节点的ip信息,这个文件在storm的bin目录下
        start-all.sh:这个脚本需要在主节点执行,脚本放在storm的bin目录下
        vi supervisorhost
        192.168.1.100
        192.168.1.101

        vi start-all.sh
        #!/bin/bash
        source /etc/profile
        echo "start nimbus and ui on localhost"
        nohup /usr/local/storm/bin/storm nimbus >/dev/null 2>&1 &
        nohup /usr/local/storm/bin/storm ui >/dev/null 2>&1 &
        cat supervisorhost | while read ip
        do
        echo "start supervisor and logviewer on $ip"
        ssh $ip nohup /usr/local/storm/bin/storm supervisor >/dev/null 2>&1 &
        ssh $ip nohup /usr/local/storm/bin/storm logviewer >/dev/null 2>&1 &
        done


        2:实现一键停止脚本
                在主节点写一个stop-all.sh 在storm的bin目录下               
        vi stop-all.sh
        #!/bin/bash
        source /etc/profile
        echo "stop nimbus and ui on localhost"
        kill -9 `ps -ef|grep backtype.storm.daemon.nimbus | awk '{print $2}' | head -1` &
        kill -9 `ps -ef|grep backtype.storm.ui.core | awk '{print $2}' | head -1` &
        cat supervisorhost | while read ip
        do
        echo "stop supervisor and logviewer on $ip"
        ssh $ip /usr/local/storm/bin/stop-supervisor.sh >/dev/null 2>&1 &
        done

        vi stop-supervisor.sh
        #!/bin/bash
        kill -9 `ps -ef|grep backtype.storm.daemon.supervisor | awk '{print $2}' | head -1` &
        kill -9 `ps -ef|grep backtype.storm.daemon.logviewer | awk '{print $2}' | head -1` &
               
        必须配置免密码登录了,通过ssh 启动其他服务器的脚本(no-login方式),如果这个脚本中用到了类似java这种环境变量的话,需要修改对应服务器的 ~/.bashrc文件,
        在这个文件中 添加一行  source /etc/profile 否则会找不到java命令


storm日志
logback和log4j作为两套slf4j-api日志框架的实现
不能共同使用,不然会报日志冲突的错误
解决办法:
如果引入的第三方jar包中包含了log4j相关的包,需要在pom中排除掉
排除之后在本地运行的话可能会没有日志输出,可以单独引用log4j的相关依赖,把scope指定为provided

拓展:
echarts分布图 将数据在前端展现
jstorm 阿里使用Java编写的storm
trident  :对storm的封装
        在storm的基础上进行高层抽象,提供了过滤聚合等适合流式计算的函数
        函数(function)
        过滤器(filter)
        连接(meger)
        流分组(group by)
        聚合(aggregate)


trident 例子:单词计数


public class TridentWordCount {
        public static class MySpout implements IBatchSpout {
            Fields fields;
            HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();
            
            public MySpout(Fields fields) {
                this.fields = fields;
            }
            
            @Override
            public void open(Map conf, TopologyContext context) {
            }

            @Override
            public void emitBatch(long batchId, TridentCollector collector) {
                List<List<Object>> batch = this.batches.get(batchId);
                if(batch == null){
                    batch = new ArrayList<List<Object>>();
                    Collection<File> listFiles = FileUtils.listFiles(new File("d:\\test"), new String[]{"txt"}, true);
                    for (File file : listFiles) {
                            List<String> readLines;
                                        try {
                                                readLines = FileUtils.readLines(file);
                                                for (String line : readLines) {
                                                        batch.add(new Values(line));
                                                }
                                               
                                                FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                                        } catch (IOException e) {
                                                e.printStackTrace();
                                        }
                    }
                    if(batch.size()>0){
                            this.batches.put(batchId, batch);
                    }
                }
                for(List<Object> list : batch){
                    collector.emit(list);
                }
            }

            @Override
            public void ack(long batchId) {
                this.batches.remove(batchId);
            }

            @Override
            public void close() {
            }

            @Override
            public Map getComponentConfiguration() {
                Config conf = new Config();
                conf.setMaxTaskParallelism(1);
                return conf;
            }

            @Override
            public Fields getOutputFields() {
                return fields;
            }
            
        }
       
        public static class splitBolt extends BaseFunction{

                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                        String line = tuple.getString(0);
                        String[] words = line.split("\t");
                        for (String word : words) {
                                collector.emit(new Values(word));
                        }
                        }               
        }
       
        public static class MyAgge extends BaseAggregator<Map<String, Integer>>{               
                @Override
                public Map<String, Integer> init(Object batchId,
                                TridentCollector collector) {
                        return new HashMap<String,Integer>();
                }
                @Override
                public void aggregate(Map<String, Integer> val, TridentTuple tuple,
                                TridentCollector collector) {
                        String word = tuple.getString(0);
                        Integer integer = val.get(word);
                        if(integer==null){
                                integer=0;
                        }
                        integer++;
                        val.put(word, integer);
                }
                @Override
                public void complete(Map<String, Integer> val,
                                TridentCollector collector) {
                        collector.emit(new Values(val));
                }
               
        }       
        public static class PrintBolt extends BaseFunction{

                HashMap<String, Integer> allMap = new HashMap<String,Integer>();
                @Override
                public void execute(TridentTuple tuple, TridentCollector collector) {
                        Map<String, Integer> map = (Map<String, Integer>)tuple.getValue(0);
                        for (Entry<String, Integer> entry : map.entrySet()) {
                                String key = entry.getKey();
                                Integer value = entry.getValue();
                               
                                Integer integer = allMap.get(key);
                                if(integer==null){
                                        integer=0;
                                }
                                allMap.put(key, value+integer);
                        }
                       
                        Utils.sleep(1000);               
                        for (Entry<String, Integer> entry : allMap.entrySet()) {
                                System.out.println(entry);
                        }                       
                        }               
        }
               
        public static void main(String[] args) {
                TridentTopology tridentTopology = new TridentTopology();
                tridentTopology.newStream("spoutid", new MySpout(new Fields("sentence")))
                        .each(new Fields("sentence"), new splitBolt(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        .aggregate(new Fields("word"), new MyAgge(), new Fields("map"))
                        .each(new Fields("map"), new PrintBolt(),  new Fields(""));
               
                LocalCluster localCluster = new LocalCluster();
                String simpleName = TridentWordCount2.class.getSimpleName();
                localCluster.submitTopology(simpleName, new Config(), tridentTopology.build());
        }

}


已有(1)人评论

跳转到指定楼层
pergrand 发表于 2016-8-17 17:05:41
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条