本帖最后由 梦回三国 于 2014-10-16 21:52 编辑
想写个简单的Storm处理滑动窗口求和的程序,但是无奈遇到问题,还请各位大神不吝赐教。
现有数据存放在某个文件中,内容如下:
t,k,v
1,0,96
1,1,65
1,2,52
1,3,76
1,4,80
1,6,91
1,33,47
1,8,75
1,26,60
1,93,22
2,0,8
2,1,66
2,2,22
2,6,82
2,75,96
2,14,97
2,16,40
2,51,80
2,20,30
2,21,48
3,0,99
3,1,79
3,2,45
3,3,64
3,5,12
3,72,36
3,12,62
3,19,17
3,23,51
3,93,47
4,0,85
4,1,77
4,2,29
4,3,69
4,4,89
4,5,23
4,6,64
4,7,58
4,32,56
4,22,68
5,0,19
5,1,11
5,2,3
5,3,49
5,4,52
5,5,30
5,7,76
5,8,71
5,53,15
5,33,65
6,0,64
6,1,6
6,2,70
6,3,23
6,4,7
6,7,11
6,42,64
6,15,92
6,52,95
6,26,87
7,0,73
7,1,75
7,6,83
7,7,89
7,10,98
7,11,14
7,44,37
7,13,80
7,14,64
7,21,91
8,0,92
8,1,77
8,2,75
8,3,51
8,4,69
8,5,68
8,6,28
8,7,53
8,51,41
8,26,68
9,0,89
9,1,36
9,4,10
9,33,57
9,11,94
9,13,40
9,14,94
9,15,38
9,19,14
9,25,46
10,0,24
10,1,10
10,2,77
10,35,4
10,6,14
10,7,27
10,8,54
10,42,97
10,3,44
10,26,79
用spout逐行去读取,然后emit提交到bolt中,然后在bolt中进行控制:分别计算t=1,2,3的流的v之和。
现在遇到的问题是当t=2的流数据到达bolt时,t=1的数据还没有全部到达,甚至当t>3的数据到达bolt时,t=1的数据也不一定全部到达bolt。(t为其他值时一样)。我想问,这是为什么呢?Storm不是不会丢失数据吗,怎么数据还不是顺序完全到达的呢?这种情况怎么计算呢?
具体到代码里就是,当NQueueBolt代码emit list到NPrintBolt代码时,使用List<MyEvent> list = (List<MyEvent>) tuple.getValue(0);得到的list并不是emit提交的全部list的值(我测试时只有一半的值过来了),这是为什么呢?
代码如下:
1. 定义的实体类
package com.test;
import java.io.Serializable;
@SuppressWarnings({ "serial", "unchecked" })
public class MyEvent implements Comparable,Serializable{
private int time;
private int key;
private int value;
public MyEvent() {
super();
}
public MyEvent(int time, int key, int value) {
this.time = time;
this.key=key;
this.value=value;
}
public int getTime(){
return time;
}
public int getKey(){
return key;
}
public int getValue(){
return value;
}
public void setTime(int time) {
this.time = time;
}
public void setKey(int key) {
this.key = key;
}
public void setValue(int value) {
this.value = value;
}
@Override
public int compareTo(Object o) {
return this.time-((MyEvent)o).time;
}
}
2.NSpout 类负责从文件中读取数据
package com.test;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
@SuppressWarnings("serial")
public class NSpout extends BaseRichSpout{
// 用来发射数据的工具类
private SpoutOutputCollector collector;
private RandomAccessFile fileReader;
private String file;
private boolean completed=false;
/**
* 初始化collector
*/
@SuppressWarnings("unchecked")
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
file = "/root/experiment/data/data.txt";
this.fileReader = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
throw new RuntimeException("Erroring reading file " + file);
}
this.collector = collector;
}
@Override
public void nextTuple() {
if(completed){
try{
Thread.sleep(1000);
}catch(InterruptedException e){
System.out.println(e);
}
return;
}
String line;
try {
// Read all lines
while ((line = fileReader.readLine()) != null) {
this.collector.emit(new Values(line));
}
}catch (Exception e) {
throw new RuntimeException("Error reading tuple", e);
}finally{
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
3. NQueueBolt 负责接收NSpout发出的数据,并且当数据到达一定窗口值时提交给NPrintBolt计算输出。
package com.test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
@SuppressWarnings("serial")
public class NQueueBolt extends BaseBasicBolt{
public static List<MyEvent> list = new ArrayList<MyEvent>();
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String line = tuple.getString(0);
int maxt = WindowTools.max;
int time = Integer.parseInt(line.split(",")[0]);
int key = Integer.parseInt(line.split(",")[1]);
int value = Integer.parseInt(line.split(",")[2]);
if(time > maxt){
Collections.sort(list);
collector.emit(new Values(list));
for(int i=0;i<list.size();i++){ //这里是为了滑动窗口时将以滑动开的数据删除
if(list.get(i).getTime()<=WindowTools.max-WindowTools.slide){
list.remove(list.get(i));
i--;
}
}
WindowTools.min += WindowTools.slide;
WindowTools.max += WindowTools.slide;
WindowTools.num++;
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (WindowTools.num == 1) {
WindowTools.max = 99999999;
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
NTopology.cluster.killTopology("simple");
// NTopology.cluster.shutdown();
long end = System.currentTimeMillis();
System.out.println("结束时间:" + end + "ms");
System.out.println("查询聚集一个窗口的时间:"
+ (end - NTopology.start - 40000) / 3+ "ms");
}
}else{
MyEvent event = new MyEvent(time,key,value);
list.add(event);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("list"));
}
}
4. NPrintBolt 将一定窗口内的值求和并输出
package com.test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
@SuppressWarnings("serial")
public class NPrintBolt extends BaseBasicBolt{
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
List<MyEvent> list = (List<MyEvent>) tuple.getValue(0);
Map<Integer, Integer> result = new TreeMap<Integer, Integer>();
Map<Integer, ArrayList<Integer>> map = new TreeMap<Integer,ArrayList<Integer>>();
int maxt = list.get(list.size()-1).getTime();
for(int i=0;i<list.size();i++){
MyEvent event = list.get(i);
if(map.keySet().contains(event.getKey())){
map.get(event.getKey()).add(event.getValue());
}else{
ArrayList<Integer> vlist = new ArrayList<Integer>();
vlist.add(list.get(i).getValue());
map.put(list.get(i).getKey(), vlist);
}
}
for(int k:map.keySet()){
int count = 0;
ArrayList<Integer> vlist = map.get(k);
for(int i=0;i<vlist.size();i++){
count += vlist.get(i);
}
result.put(k, count);
}
for(int k:result.keySet()){
System.out.println("++++++++++++++++++++++++++++++++++"+maxt+","+k+","+result.get(k));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
5.主类
package com.test;
import java.util.Scanner;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 定义了一个简单的topology,包括一个数据喷发节点spout和一个数据处理节点bolt。
*/
public class NTopology {
public static long start = 0;
public static LocalCluster cluster;
public static void main(String[] args) throws AlreadyAliveException,
InvalidTopologyException {
Scanner s = new Scanner(System.in);
System.out.print("input window range:");
WindowTools.max = s.nextInt();
System.out.print("input window slide:");
WindowTools.slide = s.nextInt();
start = System.currentTimeMillis();
System.out.println("开始时间:" + start + "ms");
// 实例化TopologyBuilder类。
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
topologyBuilder.setSpout("NSpout", new NSpout(), 1);
// 设置数据处理节点并分配并发数
topologyBuilder.setBolt("NQueueBolt", new NQueueBolt(), 1).fieldsGrouping("NSpout",new Fields("line"));
topologyBuilder.setBolt("NPrintBolt", new NPrintBolt(), 1).fieldsGrouping("NQueueBolt",new Fields("list"));
Config config = new Config();
config.setDebug(true);
if (args != null && args.length > 0) {
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], config, topologyBuilder
.createTopology());
} else {
// 这里是本地模式下运行的启动代码。
config.setMaxTaskParallelism(1);
cluster = new LocalCluster();
cluster.submitTopology("simple", config, topologyBuilder
.createTopology());
}
}
}
6. 工具类
package com.test;
public class WindowTools {
public static int min = 1;// 查询窗口下限
public static int max = 0;// 查询窗口上限
public static int slide = 0;
public static int num = 0;
}
|