分享

win7+eclipse+hadoop1.2.1 File does not exist: /hdfs/log_kpi/pv,pv不在此类中

hadoop新手,请大神帮忙!
环境:win7 x64 + eclipse + VMWare + hadoop1.2.1 + jdk1.6
程序结构:

工程结构

工程结构





问题描述:有2个class,KPIIP和KPIPV,分别有各自map/reduce方法,首次执行任何一个(如KPIPV),均无异常,但执行(KPIPV)一次之后再执行另一个(KPIIP)时,出现异常,异常信息如下:
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local571336673_0001_m_000000_0 is done. And is in the process of commiting
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息:
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local571336673_0001_m_000000_0' done.
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable run
信息: Finishing task: attempt_local571336673_0001_m_000000_0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable run
信息: Starting task: attempt_local571336673_0001_m_000001_0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask runNewMapper
信息: Processing split: hdfs://192.168.8.8:9000/hdfs/log_kpi/pv:0+0
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job run
信息: Map task executor complete.
九月 09, 2015 2:38:45 下午 org.apache.hadoop.mapred.LocalJobRunner$Job run
警告: job_local571336673_0001
java.lang.Exception: java.io.FileNotFoundException: File does not exist: /hdfs/log_kpi/pv
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.io.FileNotFoundException: File does not exist: /hdfs/log_kpi/pv
        at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.fetchLocatedBlocks(DFSClient.java:2006)
        at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.openInfo(DFSClient.java:1975)
        at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.<init>(DFSClient.java:1967)
        at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:735)
        at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:165)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:436)
        at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:75)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:521)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 50% reduce 0%
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local571336673_0001
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息: Counters: 13
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=276
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=3025757
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=401697
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=3025757
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=332060
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Map input records=14619
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=13535
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=304984
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=223870976
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     Map output records=13535
九月 09, 2015 2:38:46 下午 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=115





程序:[mw_shl_code=java,true]
package org.pnlorf.hadoop.mr.kpi;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class KPIIP {

        public static class KPIIPMapper extends
                        Mapper<LongWritable, Text, Text, LongWritable> {
                @Override
                protected void map(LongWritable key, Text value,
                                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                                throws IOException, InterruptedException {
                        KPI kpi = KPI.parser(value.toString());
                        if (kpi.isValid()) {
                                context.write(new Text(kpi.getRemote_addr()), new LongWritable(
                                                1));
                        }

                }
        }

        public static class KPIIPReducer extends
                        Reducer<Text, LongWritable, Text, LongWritable> {
                @Override
                protected void reduce(Text k2, Iterable<LongWritable> v2s,
                                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                                throws IOException, InterruptedException {
                        int sum = 0;
                        for (LongWritable v2 : v2s) {
                                sum += v2.get();
                        }
                        context.write(new Text(k2), new LongWritable(sum));
                }
        }

        public static void main(String[] args) throws Exception {
                String input = "hdfs://192.168.8.8:9000/hdfs/log_kpi/";
                String output = "hdfs://192.168.8.8:9000/hdfs/log_kpi/ip";

                Configuration conf = new Configuration();
                FileSystem fileSystem = FileSystem.get(URI.create(input), conf);
                Path outPath = new Path(output);
                if (fileSystem.exists(outPath)) {
                        fileSystem.delete(outPath, true);
                }

                Job job = new Job(conf, KPIIP.class.getSimpleName());

                // 1.1 读取hdfs,读取文件位于哪里
                FileInputFormat.setInputPaths(job, input);
                // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
                job.setInputFormatClass(TextInputFormat.class);

                // 1.2 指定自定义的map类
                job.setMapperClass(KPIIPMapper.class);
                // map输出的<k,v>类型。如果<k3, v3>的类型与<k2, v2>类型一直,则可以省略
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);

                // 1.3 分区
                job.setPartitionerClass(HashPartitioner.class);
                job.setNumReduceTasks(1);

                // TODO:1.4 排序、分组

                // TODO:1.5 规约

                // 2.2 指定自定义reduce类
                job.setReducerClass(KPIIPReducer.class);
                // 指定reduce的输出类型
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);

                // 2.3 指定写出到哪里
                FileOutputFormat.setOutputPath(job, outPath);
                // 指定输出文件的格式化类
                job.setOutputFormatClass(TextOutputFormat.class);

                // 把job提交给JobTracker运行
                job.waitForCompletion(true);
        }

}
[/mw_shl_code]
[mw_shl_code=java,true]
package org.pnlorf.hadoop.mr.kpi;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class KPIPV {

        public static class KPIPVMapper extends
                        Mapper<LongWritable, Text, Text, LongWritable> {
                @Override
                protected void map(LongWritable key, Text value,
                                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                                throws IOException, InterruptedException {
                        KPI kpi = KPI.parser(value.toString());//KPI.filterPVs(value.toString());
                        if (kpi.isValid()) {
                                context.write(new Text(kpi.getRequest()), new LongWritable(1));
                        }
                }
        }

        public static class KPIPVReducer extends
                        Reducer<Text, LongWritable, Text, LongWritable> {
                @Override
                protected void reduce(Text k2, Iterable<LongWritable> v2s,
                                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                                throws IOException, InterruptedException {
                        int sum = 0;
                        for (LongWritable v2 : v2s) {
                                sum += v2.get();
                        }
                        context.write(k2, new LongWritable(sum));
                }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                String input = "hdfs://192.168.8.8:9000/hdfs/log_kpi/";
                String output = "hdfs://192.168.8.8:9000/hdfs/log_kpi/pv";

                Configuration conf = new Configuration();
                FileSystem fileSystem = FileSystem.get(URI.create(input), conf);
                Path outPath = new Path(output);
                if (fileSystem.exists(outPath)) {
                        fileSystem.delete(outPath, true);
                }
                Job job = new Job(conf, KPIPV.class.getSimpleName());
                // 1.1 指定读取的文件位于哪里
                FileInputFormat.setInputPaths(job, input);
                // 指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
                job.setInputFormatClass(TextInputFormat.class);
               
                // 1.2 指定自定义的map类
                job.setMapperClass(KPIPVMapper.class);
                // map输出的<k,v>类型。如果<k3, v3>的类型与<k2, v2>类型一直,则可以省略
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(LongWritable.class);
               
                // 1.3 分区
                job.setPartitionerClass(HashPartitioner.class);
                // 有一个reduce任务执行
                job.setNumReduceTasks(1);
               
                // 1.4 TODO:排序、分组
               
                // 1.5 TODO:规约
               
                // 2.2 指定自定义reduce类
                job.setReducerClass(KPIPVReducer.class);
                // 指定reduce的输出类型
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(LongWritable.class);
               
                // 2.3 指定写出到哪里
                FileOutputFormat.setOutputPath(job, outPath);
                // 指定输出文件的格式化类
                job.setOutputFormatClass(TextOutputFormat.class);
               
                // 把job提交给JobTracker运行
                job.waitForCompletion(true);
        }
}
[/mw_shl_code]

请大神帮忙解答!!!!!

已有(2)人评论

跳转到指定楼层
s060403072 发表于 2015-9-9 15:22:41
楼主程序可以改进下, /hdfs/log_kpi/pv
主要是输出、输出路径写死造成的

使用ToolRunner将输出路径变成是灵活的。


回复

使用道具 举报

冰诺莫语 发表于 2015-9-10 13:24:10
感谢楼上回复,已经找到解决了,是因为输入输出目录设置有问题,新手犯的低级错误。
原因:/hdfs/log_kpi/是一个目录,将输出也写到这个目录下,则输出成为了第二次执行的输入了,导致出问题了。

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条