分享

运行mapreduce程序报错,求解答

丹青穆怀 发表于 2017-3-9 18:19:51 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 16 12054
丹青穆怀 发表于 2017-3-14 18:03:16
nextuser 发表于 2017-3-14 16:15
你怎么提交,贴出图来

代码:
package ri.zhi.fen.xi;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 巨型網站日志系统分析,提取KPI数据
* */
public class BaseStationDataPreprocess extends Configured implements Tool {
        //计数器
        enum Counter {
                OUTOFTIMESKIP, TIMESKIP, LINESKIP
        };

        @Override
        public int run(String[] args) throws Exception {
                // TODO Auto-generated method stub
                Configuration conf = getConf();
                //传递任务参数
                conf.set("date", args[2]);
                conf.set("timepoint", args[3]);
                Job job = new Job(conf, "BaseStationDataPreprecess");
                job.setJarByClass(BaseStationDataPreprocess.class);
                //输入路径
                FileInputFormat.addInputPath(job, new Path(args[0]));
                //输出路径
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                //调用Map类作为Map任务的代码
                job.setMapperClass(Map.class);
                //调用Reduce类作为Reduce任务代码
                job.setReducerClass(Reducer.class);
                //job.setOutputFormatClass(TextOutputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                //执行任务命令
                job.waitForCompletion(true);

                return job.isSuccessful() ? 0 : 1;
        }

        /**
         * 读取一行数据,以“IMSI+时间段”作为key发射出去
         *
         * */
        public static class Map extends Mapper<LongWritable, Text, Text, Text> {
                String data;
                String[] timepoint;
                boolean dataSource;
                //初始化,每个Mapper开始的时候执行一次
                public void setup(Context context) throws IOException {
                        //提取参数,从context获取文件名区分数据来源和字段
                        this.data = context.getConfiguration().get("date");
                        this.timepoint = context.getConfiguration().get("timpoint").split("-");
                        //提取文件名-打开输入的文件
                        FileSplit fs = (FileSplit) context.getInputSplit();
                        //获取文件名
                        String filename = fs.getPath().getName();
                        //通过文件名判断数据来源与哪个文件
                        if (filename.startsWith("POS"))
                                dataSource = true;
                        else if (filename.startsWith("NET"))
                                dataSource = false;
                        else
                                throw new IOException("File Name should starts with POS or NET");
                }
                //Map任务,Map函数,对每一行输入数据执行一次。
                /**
                 * Map任务
                 * 读取基站数据
                 * 找出数据所对应的时间段
                 * 以IMSI和时间段作为Key
                 * CGI和时间作为Value
                 *
                 * */
                public void map(LongWritable key, Text value, Context context) throws IOException {
                        //读取数据一行
                        String line = value.toString();
                        TableLine tableLine = new TableLine();
                        //读取行
                        try {
                                //自定义TableLine类提取字段
                                tableLine.set(line, dataSource, this.data, timepoint);
                        } catch (LineException e) {
                                // TODO Auto-generated catch block
                                if (e.getFlag() == -1)
                                        //接收到错误的时间记录,然后相应的counter+1
                                        context.getCounter(Counter.OUTOFTIMESKIP).increment(1);
                                else
                                        //格式不对,解析不了,然后相应的counter+1
                                        context.getCounter(Counter.TIMESKIP).increment(1);
                                return;
                        } catch (Exception e) {
                                //读取失败,直接跳过整行
                                context.getCounter(Counter.LINESKIP).increment(1);
                                return;
                        }
                        try {
                                context.write(tableLine.outKey(), tableLine.outValue());
                        } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }

                }
                //解析每一行数据,提取需要的部分
                public class TableLine {
                        private String imsi, position, time, timeFlag;
                        private Date day;
                        private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                        public void set(String line, boolean source, String date, String[] timepint) throws LineException {
                                String[] lineSpline = line.split("\t");
                                if (source) {
                                        this.imsi = lineSpline[0];
                                        this.position = lineSpline[3];
                                        this.time = lineSpline[4];
                                } else {
                                        this.imsi = lineSpline[0];
                                        this.position = lineSpline[2];
                                        this.time = lineSpline[3];
                                }
                                //检查时间是否与输入的相同
                                if (!this.time.startsWith(date))
                                        //不同的话,flag是-1
                                        throw new LineException("", -1);

                                try {
                                        this.day = this.formatter.parse(this.time);
                                } catch (ParseException e) {

                                        throw new LineException("", 0);

                                }
                                //判断时间是否在指定的时间段内
                                int i = 0, n = timepoint.length;
                                //hour大于最大的时间点
                                int hour = Integer.valueOf(this.time.split(" ")[1].split(":")[0]);
                                while (i < n && Integer.valueOf(timepoint) <= hour)
                                        i++;
                                if (i < n) {
                                        if (i == 0)
                                                //判断是否在时间段之前,然后输出时间段
                                                this.timeFlag = ("00-" + timepint);
                                        else
                                                this.timeFlag = (timepoint[i - 1] + "-" + timepoint);
                                } else
                                        //不是在指定的时间段里面
                                        throw new LineException("", -1);

                        }

                        public Text outKey() {
                                return new Text(this.imsi + "|" + this.timeFlag);

                        }

                        public Text outValue() {
                                long t = (day.getTime() / 1000L);
                                //用时间的偏移量作为输出时间-把时间转化成UNIX格式
                                return new Text(this.position + "|" + String.valueOf(t));

                        }
                }

                public class LineException extends Exception {

                        /**
                         *
                         */
                        private static final long serialVersionUID = 3655169408843271282L;
                        int flag;

                        public LineException(String msg, int flag) {
                                super(msg);
                                this.flag = flag;

                        }

                        public int getFlag() {
                                return flag;
                        }
                }

        }

        /**
         * 统计同一个IMSI在同一时间段,在不同CGI停留的时长
         * */
        public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
                private String date;
                private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                public void setup(Context context) {
                        this.date=context.getConfiguration().get("date");
                }

                public void reduce(Text key, Iterable<Text> values, Context context) throws IOException {
                        //取出用户,以|分割
                        String imsi = key.toString().split("\\|")[0];
                        //取出时间段
                        String timeFlag = key.toString().split("\\|")[1];
                        //用一个Treemap记录时间(自然排序)---使用TreeMap可以让数据按照时间排序
                        TreeMap<Long, String> uploads = new TreeMap<Long, String>();
                        String valueString;
                        for (Text value : values) {
                                valueString = value.toString();
                                try {
                                        //时间+地点
                                        uploads.put(Long.valueOf(valueString.split("\\|")[1]), valueString.split("\\|")[0]);
                                } catch (NumberFormatException e) {
                                        context.getCounter(Counter.TIMESKIP).increment(1);
                                        continue;
                                }
                        }
                        try {
                                //组合出来最后的时间
                                Date tmp = this.formatter.parse(this.date + " " + timeFlag.split("-")[1] + ":00:00");
                                //自己设定一个最后的时间OFF
                                uploads.put((tmp.getTime() / 1000L), "OFF");
                                //需要键值对表示,并不关心顺序的
                                HashMap<String, Float> locs = getStayTime(uploads);
                                for (Entry<String, Float> entry : locs.entrySet()) {
                                        StringBuilder builder = new StringBuilder();
                                        builder.append(imsi).append("|");
                                        builder.append(entry.getKey()).append("|");
                                        builder.append(timeFlag).append("|");
                                        builder.append(entry.getValue());
                                        context.write(NullWritable.get(), new Text(builder.toString()));
                                }

                        } catch (Exception e) {
                                e.getMessage();
                        }

                }
                //自定义函数,用于汇总停留时间--用后一个时间减去前一个时间,如果间隔超过60分钟就认定关机
                public HashMap<String, Float> getStayTime(TreeMap<Long, String> uploads) {
                        Entry<Long, String> upload, nextUpload;
                        HashMap<String, Float> locs = new HashMap<String, Float>();
                        Iterator<Entry<Long, String>> it = uploads.entrySet().iterator();
                        upload = it.next();
                        while (it.hasNext()) {
                                nextUpload = it.next();
                                float diff = (float) (nextUpload.getKey() - upload.getKey()) / 60.0f;
                                if (diff <= 60.0) {
                                        if (locs.containsKey(upload.getValue()))
                                                locs.put(upload.getValue(), locs.get(upload.getValue()) + diff);
                                        else
                                                locs.put(upload.getValue(), diff);
                                }
                                upload = nextUpload;

                        }
                        return locs;

                }
        }

        public static void main(String args[]) throws Exception {
                //检查参数个数是否正确
                if (args.length != 4) {
                        System.err.println("");
                        System.err.println("");
                        System.exit(-1);;
                }
                //调用run函数执行任务
                int res = ToolRunner.run(new Configuration(), new BaseStationDataPreprocess(), args);
                System.exit(res);
        }

}

运行代码:
hadoop jar ./HadoopTest01.jar cn.dataguru.hadoop.BaseStationDataPreprocess /user/hadoop/file/input /user/hadoop/file/output 2013-09-12 07-09-17-24



回复

使用道具 举报

nextuser 发表于 2017-3-14 18:24:29
丹青穆怀 发表于 2017-3-14 18:03
代码:
package ri.zhi.fen.xi;

hadoop jar ./HadoopTest01.jar cn.dataguru.hadoop.BaseStationDataPreprocess /user/hadoop/file/input /user/hadoop/file/output 2013-09-12 07-09-17-24
输出路径太复杂了,而且是否创建
回复

使用道具 举报

丹青穆怀 发表于 2017-3-15 11:08:09
nextuser 发表于 2017-3-14 18:24
hadoop jar ./HadoopTest01.jar cn.dataguru.hadoop.BaseStationDataPreprocess /user/hadoop/file/input ...

已经创建了,
回复

使用道具 举报

nextuser 发表于 2017-3-15 14:54:16

楼主基础感觉不怎么牢固。
应该是学生吧。
如果还有问题,需上图。并且确保HA没有问题。建议现在非HA集群上先运行

回复

使用道具 举报

丹青穆怀 发表于 2017-3-15 15:56:43
nextuser 发表于 2017-3-15 14:54
楼主基础感觉不怎么牢固。
应该是学生吧。
如果还有问题,需上图。并且确保HA没有问题。建议现在非HA集 ...

在非ha下面是可以执行的,在新搭的有ha的集群上总是有问题。配置文件里面有和读取输入参数有关系的吗?
回复

使用道具 举报

arsenduan 发表于 2017-3-22 16:23:25
楼主好像已经解决了。副本的问题,原为1.配置中为replication=3
运行wordcount报错
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21247


回复

使用道具 举报

丹青穆怀 发表于 2017-3-23 09:51:31
arsenduan 发表于 2017-3-22 16:23
楼主好像已经解决了。副本的问题,原为1.配置中为replication=3
运行wordcount报错
http://www.aboutyun. ...

对,报错二解决了,报错了一没有解决,就是job的问题
17/03/21 17:31:39 INFO mapreduce.Job:  map 0% reduce 0%
17/03/21 17:31:39 INFO mapreduce.Job: Job job_1490088658080_0001 failed with state FAILED due to: Application application_1490088658080_0001 failed 2 times due to AM Container for appattempt_1490088658080_0001_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://hadoop002:23188/proxy/application_1490088658080_0001/Then, click on links to logs of each attempt.
Diagnostics: Could not find any valid local directory for nmPrivate/container_1490088658080_0001_02_000001.tokens
Failing this attempt. Failing the application.
17/03/21 17:31:39 INFO mapreduce.Job: Counters: 0

一启动就报-1000
回复

使用道具 举报

12
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条