1、创建第一个job
// Stage 1 ("log-grep"): a map-only configuration that runs RegexMapper over the
// raw input and writes its results to the intermediate directory.
// Job.getInstance(...) replaces the deprecated `new Job(conf, name)` constructor.
Job job1 = Job.getInstance(getConf(), "log-grep");
// The jar that contains RegexMapper is the one shipped to the cluster.
job1.setJarByClass(RegexMapper.class);
job1.setMapperClass(RegexMapper.class);
// NOTE(review): no reducer and no output key/value classes are configured here;
// confirm the framework defaults match what RegexMapper emits.
FileInputFormat.setInputPaths(job1, new Path(inputPath));
FileOutputFormat.setOutputPath(job1, new Path(intermedPath));
2、创建第二个job
// Stage 2 ("log-analysis"): map/reduce over the intermediate data, writing the
// final result to outputPath. Its input path is configured separately.
// Job.getInstance(...) replaces the deprecated `new Job(conf, name)` constructor.
Job job2 = Job.getInstance(getConf(), "log-analysis");
// The jar that contains LogProcessorMap is the one shipped to the cluster.
job2.setJarByClass(LogProcessorMap.class);
job2.setMapperClass(LogProcessorMap.class);
job2.setReducerClass(LogProcessorReduce.class);
FileOutputFormat.setOutputPath(job2, new Path(outputPath));
3、把job2的输入路径设置为job1的输出路径
FileInputFormat.setInputPaths(job2, new Path(intermedPath +"/part*"));
4、创建ControlledJob
// Wrap each stage in a ControlledJob so that it can be registered with a
// JobControl and have dependencies declared on it.
// NOTE(review): only the Configuration is handed over, so the ControlledJob
// presumably builds its own Job from it rather than reusing job1/job2 — confirm
// before querying job1/job2 directly for status.
ControlledJob controlledJob1 =new ControlledJob(job1.getConfiguration());
ControlledJob controlledJob2 =new ControlledJob(job2.getConfiguration());
5、添加依赖:job2依赖于job1(job1成功完成后job2才会运行)
// Declare controlledJob1 as a prerequisite of controlledJob2 (job2 depends on job1).
controlledJob2.addDependingJob(controlledJob1);
6、创建JobControl
// Create the JobControl group (named "JobControlDemoGroup") and register both
// controlled jobs with it.
JobControl jobControl = new JobControl("JobControlDemoGroup");
jobControl.addJob(controlledJob1);
jobControl.addJob(controlledJob2);
7、创建一个新的线程来运行JobControl,并轮询等待所有job结束
// JobControl is a Runnable: run its scheduling loop on a dedicated thread while
// this thread waits for the jobs to complete.
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
try {
    // Poll twice a second until JobControl reports that every job is finished.
    while (!jobControl.allFinished()) {
        Thread.sleep(500);
    }
    // "Finished" includes jobs that failed, so report failures explicitly
    // instead of dropping them silently.
    for (ControlledJob failedJob : jobControl.getFailedJobList()) {
        System.err.println("Job failed: " + failedJob.getJobName() + " - " + failedJob.getMessage());
    }
} finally {
    // Always stop the scheduler, even if the wait above is interrupted;
    // otherwise its thread would keep running.
    jobControl.stop();
}