分享

MapReduce 组合式作业的依赖关系问题

zqy 发表于 2016-5-22 15:55:14 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 9373
public class px {

        /**
         * @param args
         */
        public static class SumMap extends Mapper<Object,Text,Text,IntWritable>{
                private Text word;
                private IntWritable one=new IntWritable(1);
                public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
                        String aword;
                        StringTokenizer token=new StringTokenizer(value.toString());
                        while(token.hasMoreTokens()){
                                aword=token.nextToken();
                                word.set(aword);
                        }
                        context.write(word, one);
                }
        }
        public static class SumReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
                public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
                        int sum=0;
                        for(IntWritable val : values){
                                sum+=val.get();
                        }
                        context.write(key, new IntWritable(sum));
                }
        }
        public static class sortMap extends Mapper<Object,Text,IntWritable,Text>{
                private IntWritable k=new IntWritable();
                private Text t=new Text();
                public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
                        String[] arr=value.toString().split(" ");
                        if(arr.length==2){
                        String kv=arr[0];
                        k.set(Integer.parseInt(kv));
                        String tv=arr[1];
                        t.set(tv);
                        }
                        context.write(k, t);
                }
        }
        public static class sortReduce extends Reducer<IntWritable,Text,Text,IntWritable>{
                public void reduce(IntWritable key,Iterable<Text>values,Context context) throws IOException, InterruptedException{
                        String line=values.toString();
                        context.write(new Text(line), key);
                }
        }
        public static void main(String[] args) throws Exception {
                // TODO Auto-generated method stub
                Configuration jobconf=new Configuration();//第一个Maperduc
                Job job1=new Job(jobconf,"job1");
                job1.setJarByClass(px.class);
                job1.setMapperClass(SumMap.class);
                job1.setCombinerClass(SumReduce.class);
                job1.setReducerClass(SumReduce.class);
                //job1.setMapOutputKeyClass(IntWritable.class);
                //job1.setMapOutputValueClass(Text.class);
                job1.setOutputKeyClass(Text.class);
                job1.setOutputValueClass(IntWritable.class);
       
                FileInputFormat.addInputPath(job1, new Path("hdfs://localhost:9000/test/test1"));
                ControlledJob cjob1=new ControlledJob(jobconf);
                cjob1.setJob(job1);
                FileOutputFormat.setOutputPath(job1, new Path("hdfs://localhost:9000/output/px1"));
                job1.waitForCompletion(true) ;
               
               
                //Configuration job2conf=new Configuration();//第二个mapreduce
                Job job2=new Job(jobconf,"job2");
                job2.setJarByClass(px.class);
                job2.setMapperClass(sortMap.class);
                job2.setReducerClass(sortReduce.class);
                job2.setMapOutputKeyClass(IntWritable.class);
                job2.setMapOutputValueClass(Text.class);
                job2.setOutputKeyClass(Text.class);
                job2.setOutputValueClass(IntWritable.class);
                FileInputFormat.addInputPath(job2, new Path("hdfs://localhost:9000/output/px1"));
                ControlledJob cjob2=new ControlledJob(jobconf);
                cjob2.setJob(job2);
                FileOutputFormat.setOutputPath(job2, new Path("hdfs://locahost:9000/output/px2"));
                job2.waitForCompletion(true);
               
                cjob2.addDependingJob(cjob1);
                JobControl jobCtrl=new JobControl("myctrl");
                //ctrljob1.addDependingJob(ctrljob2);// job2在job1完成后,才可以启动
                //添加到总的JobControl里,进行控制

                jobCtrl.addJob(cjob1);
                jobCtrl.addJob(cjob2);

       
        }

}
报错:
2016-05-22 15:48:57,425 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(976)) - soft limit at 83886080
2016-05-22 15:48:57,426 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(977)) - bufstart = 0; bufvoid = 104857600
2016-05-22 15:48:57,426 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(978)) - kvstart = 26214396; length = 6553600
2016-05-22 15:48:57,593 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1437)) - Starting flush of map output
2016-05-22 15:48:57,643 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2016-05-22 15:48:57,807 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local427298243_0001 running in uber mode : false
2016-05-22 15:48:57,810 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 0% reduce 0%
2016-05-22 15:48:57,897 WARN  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local427298243_0001
java.lang.Exception: java.lang.NullPointerException
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
        at px$SumMap.map(px.java:30)
        at px$SumMap.map(px.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2016-05-22 15:48:58,815 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1375)) - Job job_local427298243_0001 failed with state FAILED due to: NA
2016-05-22 15:48:58,828 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 0
2016-05-22 15:48:58,945 INFO  [main] jvm.JvmMetrics (JvmMetrics.java:init(71)) - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2016-05-22 15:49:03,182 INFO  [communication thread] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map > sort
2016-05-22 15:49:18,969 INFO  [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 0 time(s); maxRetries=45
2016-05-22 15:49:39,008 INFO  [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 1 time(s); maxRetries=45
2016-05-22 15:49:59,028 INFO  [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 2 time(s); maxRetries=45
2016-05-22 15:50:19,054 INFO  [main] ipc.Client (Client.java:handleConnectionTimeout(814)) - Retrying connect to server: locahost/220.250.64.26:9000. Already tried 3 time(s); maxRetries=45

已有(2)人评论

跳转到指定楼层
nextuser 发表于 2016-5-22 18:39:58
楼主用的是依赖组合式，还是其他方式？
如果是依赖组合式（由 JobControl 按依赖关系调度作业），就不应该出现下面这行代码：
job1.waitForCompletion(true) ;


更多参考:
组合式的MapReduce作业

让你真正明白什么是MapReduce组合式,迭代式,链式


回复

使用道具 举报

qcbb001 发表于 2016-5-22 18:43:25
除了代码问题，楼主的配置用的应该是本地模式（日志显示作业由 LocalJobRunner 运行）。另外请确认集群是否已经启动。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条