分享

Storm处理流数据接收数据不完全的问题

梦回三国 发表于 2014-10-16 21:19:14 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 16 37841
本帖最后由 梦回三国 于 2014-10-16 21:52 编辑

想写个简单的Storm处理滑动窗口求和的程序,但是无奈遇到问题,还请各位大神不吝赐教。
现有数据存放在某个文件中,内容如下:
t,k,v
1,0,96
1,1,65
1,2,52
1,3,76
1,4,80
1,6,91
1,33,47
1,8,75
1,26,60
1,93,22
2,0,8
2,1,66
2,2,22
2,6,82
2,75,96
2,14,97
2,16,40
2,51,80
2,20,30
2,21,48
3,0,99
3,1,79
3,2,45
3,3,64
3,5,12
3,72,36
3,12,62
3,19,17
3,23,51
3,93,47
4,0,85
4,1,77
4,2,29
4,3,69
4,4,89
4,5,23
4,6,64
4,7,58
4,32,56
4,22,68
5,0,19
5,1,11
5,2,3
5,3,49
5,4,52
5,5,30
5,7,76
5,8,71
5,53,15
5,33,65
6,0,64
6,1,6
6,2,70
6,3,23
6,4,7
6,7,11
6,42,64
6,15,92
6,52,95
6,26,87
7,0,73
7,1,75
7,6,83
7,7,89
7,10,98
7,11,14
7,44,37
7,13,80
7,14,64
7,21,91
8,0,92
8,1,77
8,2,75
8,3,51
8,4,69
8,5,68
8,6,28
8,7,53
8,51,41
8,26,68
9,0,89
9,1,36
9,4,10
9,33,57
9,11,94
9,13,40
9,14,94
9,15,38
9,19,14
9,25,46
10,0,24
10,1,10
10,2,77
10,35,4
10,6,14
10,7,27
10,8,54
10,42,97
10,3,44
10,26,79

用spout逐行去读取,然后emit提交到bolt中,然后在bolt中进行控制:分别计算t=1,2,3的流的v之和。
现在遇到的问题是当t=2的流数据到达bolt时,t=1的数据还没有全部到达,甚至当t>3的数据到达bolt时,t=1的数据也不一定全部到达bolt。(t为其他值时一样)。我想问,这是为什么呢?Storm不是不会丢失数据吗,怎么数据还不是顺序完全到达的呢?这种情况怎么计算呢?

具体到代码里就是,当NQueueBolt代码emit  list到NPrintBolt代码时,使用List<MyEvent> list = (List<MyEvent>) tuple.getValue(0);得到的list并不是emit提交的全部list的值(我测试时只有一半的值过来了),这是为什么呢?


代码如下:
1. 定义的实体类
package com.test;

import java.io.Serializable;

@SuppressWarnings({ "serial", "unchecked" })
public class MyEvent implements Comparable,Serializable{
        private int time;
    private int key;
    private int value;


    public MyEvent() {
                super();
        }

        public MyEvent(int time, int key, int value) {
        this.time = time;
        this.key=key;
        this.value=value;
    }

    public int getTime(){
            return time;
    }
    public int getKey(){
            return key;
    }
    public int getValue(){
            return value;
    }

        public void setTime(int time) {
                this.time = time;
        }

        public void setKey(int key) {
                this.key = key;
        }

        public void setValue(int value) {
                this.value = value;
        }

        @Override
        public int compareTo(Object o) {
                return this.time-((MyEvent)o).time;
        }

        
}
2.NSpout 类负责从文件中读取数据
package com.test;

import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

@SuppressWarnings("serial")
public class NSpout extends BaseRichSpout{
        // 用来发射数据的工具类
        private SpoutOutputCollector collector;
        private RandomAccessFile fileReader;
        private String file;
        private boolean completed=false;
        
        /**
         * 初始化collector
         */
        @SuppressWarnings("unchecked")
        @Override
        public void open(Map conf, TopologyContext context,
                        SpoutOutputCollector collector) {
                try {
                        file = "/root/experiment/data/data.txt";
                        this.fileReader = new RandomAccessFile(file, "r");
                } catch (FileNotFoundException e) {
                        throw new RuntimeException("Erroring reading file " + file);
                }
                this.collector = collector;
        }
        
        @Override
        public void nextTuple() {
                if(completed){
                        try{
                                Thread.sleep(1000);
                        }catch(InterruptedException e){
                                System.out.println(e);
                        }
                        return;
                }
                String line;
                try {
                        // Read all lines
                        while ((line = fileReader.readLine()) != null) {
                                this.collector.emit(new Values(line));
                        }
                }catch (Exception e) {
                        throw new RuntimeException("Error reading tuple", e);
                }finally{
                        completed = true;
                }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
        }

}


3. NQueueBolt 负责接收NSpout发出的数据,并且当数据到达一定窗口值时提交给NPrintBolt计算输出。
package com.test;


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;


import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


@SuppressWarnings("serial")
public class NQueueBolt extends BaseBasicBolt{
        public static List<MyEvent> list = new ArrayList<MyEvent>();
        
        @SuppressWarnings("unchecked")
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
                String line = tuple.getString(0);               
                int maxt = WindowTools.max;
                int time = Integer.parseInt(line.split(",")[0]);
                int key = Integer.parseInt(line.split(",")[1]);
                int value = Integer.parseInt(line.split(",")[2]);
               
                if(time > maxt){
                        Collections.sort(list);
                        collector.emit(new Values(list));
                        for(int i=0;i<list.size();i++){  //这里是为了滑动窗口时将以滑动开的数据删除
                                if(list.get(i).getTime()<=WindowTools.max-WindowTools.slide){
                                        list.remove(list.get(i));
                                        i--;
                                }
                        }
                        WindowTools.min += WindowTools.slide;
                        WindowTools.max += WindowTools.slide;
                        WindowTools.num++;
                        try {
                                Thread.sleep(10000);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                        if (WindowTools.num == 1) {
                                WindowTools.max = 99999999;
                                try {
                                        Thread.sleep(10000);
                                } catch (InterruptedException e) {
                                        e.printStackTrace();
                                }
                                NTopology.cluster.killTopology("simple");
//                                NTopology.cluster.shutdown();
                                long end = System.currentTimeMillis();
                                System.out.println("结束时间:" + end + "ms");
                                System.out.println("查询聚集一个窗口的时间:"
                                                + (end - NTopology.start - 40000) / 3+ "ms");
                        }
                }else{
                        MyEvent event = new MyEvent(time,key,value);
                        list.add(event);
                }
        }


        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("list"));
        }


}

4. NPrintBolt 将一定窗口内的值求和并输出
package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

@SuppressWarnings("serial")
public class NPrintBolt extends BaseBasicBolt{

        @SuppressWarnings("unchecked")
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
                List<MyEvent> list = (List<MyEvent>) tuple.getValue(0);
                Map<Integer, Integer> result = new TreeMap<Integer, Integer>();
                Map<Integer, ArrayList<Integer>> map = new TreeMap<Integer,ArrayList<Integer>>();
                int maxt = list.get(list.size()-1).getTime();
               
                for(int i=0;i<list.size();i++){
                        MyEvent event = list.get(i);
                        if(map.keySet().contains(event.getKey())){
                                map.get(event.getKey()).add(event.getValue());
                        }else{
                                ArrayList<Integer> vlist = new ArrayList<Integer>();
                                vlist.add(list.get(i).getValue());
                                map.put(list.get(i).getKey(), vlist);
                        }
                }
               
                for(int k:map.keySet()){
                        int count = 0;
                        ArrayList<Integer> vlist = map.get(k);
                        for(int i=0;i<vlist.size();i++){
                                count += vlist.get(i);
                        }
                        result.put(k, count);
                }
               
                for(int k:result.keySet()){
                        System.out.println("++++++++++++++++++++++++++++++++++"+maxt+","+k+","+result.get(k));
                }
               
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
               
        }

}
5.主类
package com.test;

import java.util.Scanner;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
* 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。
*/
public class NTopology {
        public static long start = 0;
        public static LocalCluster cluster;
        
        public static void main(String[] args) throws AlreadyAliveException,
                        InvalidTopologyException {
                Scanner s = new Scanner(System.in);
                System.out.print("input window range:");
                WindowTools.max = s.nextInt();
                System.out.print("input window slide:");
                WindowTools.slide = s.nextInt();
                start = System.currentTimeMillis();
                System.out.println("开始时间:" + start + "ms");
                // 实例化TopologyBuilder类。
                TopologyBuilder topologyBuilder = new TopologyBuilder();
                // 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
                topologyBuilder.setSpout("NSpout", new NSpout(), 1);
                // 设置数据处理节点并分配并发数
                topologyBuilder.setBolt("NQueueBolt", new NQueueBolt(), 1).fieldsGrouping("NSpout",new Fields("line"));
                topologyBuilder.setBolt("NPrintBolt", new NPrintBolt(), 1).fieldsGrouping("NQueueBolt",new Fields("list"));

                Config config = new Config();
                config.setDebug(true);

                if (args != null && args.length > 0) {
                        config.setNumWorkers(1);
                        StormSubmitter.submitTopology(args[0], config, topologyBuilder
                                        .createTopology());
                } else {
                        // 这里是本地模式下运行的启动代码。
                        config.setMaxTaskParallelism(1);
                        cluster = new LocalCluster();
                        cluster.submitTopology("simple", config, topologyBuilder
                                        .createTopology());
                }
        }

}

6. 工具类
package com.test;

public class WindowTools {
        public static int min = 1;// 查询窗口下限
        public static int max = 0;// 查询窗口上限
        public static int slide = 0;
        
        public static int num = 0;
}





已有(16)人评论

跳转到指定楼层
梦回三国 发表于 2014-10-16 21:51:34
回复

使用道具 举报

bioger_hit 发表于 2014-10-16 22:19:13
梦回三国 发表于 2014-10-16 21:51
代码已全部提交,还请不吝赐教
因为你是一行行处理的,strom有一个并发数目,也就是说即使你是按照顺序读取的,但是由于处理的速度有快有慢,所以造成在接受数据时,并不会按照顺序接受,如果有的任务处理的快,那么等于2的提前到达这也是有可能的


可以借鉴这种模式验证下,在全部读取1完毕之后,在执行execute
Storm常见模式2——批处理
回复

使用道具 举报

梦回三国 发表于 2014-10-16 22:25:31
bioger_hit 发表于 2014-10-16 22:19
因为你是一行行处理的,strom有一个并发数目,也就是说即使你是按照顺序读取的,但是由于处理的速度有快有 ...

谢谢,这个模式我借鉴了,所以由spout传往bolt的数据没问题了,但是现在再从一个bolt往另一个bolt中提交数据时,另一个bolt中只得到一部分的数据,而不是全部的呢
NQueueBolt中的collector.emit(new Values(list));跟NPrintBolt中的List<MyEvent> list = (List<MyEvent>) tuple.getValue(0);
两个list中的数据为什么不一样呢

回复

使用道具 举报

howtodown 发表于 2014-10-16 22:34:03
storm流式处理,并没有说是顺序处理,流式在于数据源源不断,或则说有数据就处理,但是输出就不一定了,如果先来的数据处理的慢,那么输出可能比之后来数据输出晚,这是常见的。
这就像是银行取钱,大家都是按照号码来排队的
比如
1号,2号,3号,4号,5号

银行人员有两个:
目前3号全满:

1号的事情
取钱(1分钟)

3号的事情:

1.取了钱(1分钟)
2.办银卡,银卡(1分钟)
3.想存钱。(1分钟)

4号的事情:
取钱(1分钟)

事情的处理顺序
在理想情况下:

1号用了一分钟取钱完毕,4号用了1分钟取钱完毕
1号+4号事件=2分钟
3号             =3分钟

结论:
虽然3号比4号早输入,那么还是4号早输出。



回复

使用道具 举报

bioger_hit 发表于 2014-10-16 22:51:47
本帖最后由 bioger_hit 于 2014-10-16 22:54 编辑
梦回三国 发表于 2014-10-16 22:25
谢谢,这个模式我借鉴了,所以由spout传往bolt的数据没问题了,但是现在再从一个bolt往另一个bolt中提交 ...

第一个问题解决了,第二个问题,应该不是问题。
回复

使用道具 举报

梦回三国 发表于 2014-10-17 08:56:11
howtodown 发表于 2014-10-16 22:34
storm流式处理,并没有说是顺序处理,流式在于数据源源不断,或则说有数据就处理,但是输出就不一定了,如 ...

恩恩,这个道理明白了,就是一个bolt提交数据到另一个bolt的时候,不是全部提交吗?我是在一个bolt中提交一个list,  collector.emit(new Values(list));
,然后想在另一个中通过 tuple.getValue(0);得到这个list。  但是为什么我得到的只是提交list的一个子集呢,好像是一半的数据?


回复

使用道具 举报

梦回三国 发表于 2014-10-17 08:57:16
bioger_hit 发表于 2014-10-16 22:51
本帖最后由 bioger_hit 于 2014-10-16 22:54 编辑

第一个问题解决了,第二个问题,应该不是问题。

这两个问题一样吗,第二是一个bolt提交数据到另一个bolt的时候,不是一次全部提交吗?我是在一个bolt中提交一个list,  collector.emit(new Values(list));
,然后想在另一个中通过 tuple.getValue(0);得到这个list。  但是为什么我得到的只是提交list的一个子集呢,好像是一半的数据?
回复

使用道具 举报

sstutu 发表于 2014-10-17 10:41:17
梦回三国 发表于 2014-10-17 08:57
这两个问题一样吗,第二是一个bolt提交数据到另一个bolt的时候,不是一次全部提交吗?我是在一个bolt中提 ...
多测试一下,看看是否每次都是这个效果
回复

使用道具 举报

梦回三国 发表于 2014-10-17 11:00:06
sstutu 发表于 2014-10-17 10:41
多测试一下,看看是否每次都是这个效果

我每次测试都是这样。。。。。。。。。。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条