数据清洗器 MapReduce主类
/**
 * Driver for a single MapReduce job that runs {@code MovieMap} and {@code MovieReduce}
 * over a fixed input file and writes {@code Text} keys with {@code NullWritable} values.
 *
 * <p>Input and output locations are hard-coded; command-line arguments are ignored.
 */
public class MovieJob {
    /**
     * Configures the job, submits it, waits for it to finish and terminates the JVM
     * with exit status 0 on success or 1 on failure.
     *
     * @param args unused
     * @throws Exception if the job cannot be configured, submitted or awaited
     */
    public static void main(String[] args) throws Exception {
        final Job movieJob = Job.getInstance(new Configuration());

        // Classes that make up the job.
        movieJob.setJarByClass(MovieJob.class);
        movieJob.setMapperClass(MovieMap.class);
        movieJob.setReducerClass(MovieReduce.class);

        // Types of the records the job emits.
        movieJob.setOutputKeyClass(Text.class);
        movieJob.setOutputValueClass(NullWritable.class);

        // Fixed input file and output directory.
        final Path source = new Path("/data/movies.csv");
        final Path destination = new Path("/root/files");
        FileInputFormat.addInputPath(movieJob, source);
        FileOutputFormat.setOutputPath(movieJob, destination);

        // 'true' asks Hadoop to print progress while the driver blocks.
        if (movieJob.waitForCompletion(true)) {
            System.exit(0);
        }
        System.exit(1);
    }
}
多线程
/**
 * Driver that runs five MapReduce jobs ({@code DirectorsJob}, {@code ActorsJob},
 * {@code CountryJob}, {@code TimeJob}, {@code TypeJob}) through one {@code JobControl}.
 *
 * <p>Usage:
 * {@code MoviesMain <input> <directorsOut> <actorsOut> <countryOut> <timeOut> <typeOut>}.
 * Every job reads {@code args[0]}; job <i>n</i> writes to {@code args[n]}. No
 * dependencies are declared between the jobs.
 */
public class MoviesMain {
    /** One shared input path plus one output path per job. */
    private static final int REQUIRED_ARGS = 6;

    /** Pause between completion checks so the driver does not spin a CPU core. */
    private static final long POLL_INTERVAL_MS = 500L;

    /**
     * Configures the five jobs, runs them under a {@code JobControl} thread, waits for
     * all of them to finish, prints the outcome and exits.
     *
     * <p>Exit status: 0 when no job failed, 1 when at least one job failed, 2 when too
     * few arguments were supplied.
     *
     * @param args input path followed by the five output paths (extra arguments are ignored)
     * @throws Exception if a job cannot be configured or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
        // halfway through job setup.
        if (args.length < REQUIRED_ARGS) {
            System.err.println("Usage: MoviesMain <input> <directorsOut> <actorsOut>"
                    + " <countryOut> <timeOut> <typeOut>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        String input = args[0];

        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(DirectorsJob.class);
        job1.setMapperClass(DirectorsJob.Map.class);
        job1.setReducerClass(DirectorsJob.Reduce.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);

        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(ActorsJob.class);
        job2.setMapperClass(ActorsJob.Map.class);
        job2.setReducerClass(ActorsJob.Reduce.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(CountryJob.class);
        job3.setMapperClass(CountryJob.Map.class);
        job3.setReducerClass(CountryJob.Reduce.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);

        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(TimeJob.class);
        job4.setMapperClass(TimeJob.Map.class);
        job4.setReducerClass(TimeJob.Reduce.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);

        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(TypeJob.class);
        job5.setMapperClass(TypeJob.Map.class);
        job5.setReducerClass(TypeJob.Reduce.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);

        JobControl jobCtrl = new JobControl("MoviesOut");
        jobCtrl.addJob(toControlledJob(conf, job1, input, args[1]));
        jobCtrl.addJob(toControlledJob(conf, job2, input, args[2]));
        jobCtrl.addJob(toControlledJob(conf, job3, input, args[3]));
        jobCtrl.addJob(toControlledJob(conf, job4, input, args[4]));
        jobCtrl.addJob(toControlledJob(conf, job5, input, args[5]));

        // JobControl is a Runnable; it submits and tracks the jobs on its own thread.
        Thread t = new Thread(jobCtrl);
        t.start();

        try {
            // Poll with a pause rather than busy-waiting on allFinished().
            while (!jobCtrl.allFinished()) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
            System.out.println(jobCtrl.getSuccessfulJobList());
        } finally {
            // Always stop the controller thread, even if the wait was interrupted.
            jobCtrl.stop();
        }

        boolean allSucceeded = jobCtrl.getFailedJobList().isEmpty();
        if (!allSucceeded) {
            System.err.println("Failed jobs: " + jobCtrl.getFailedJobList());
        }
        System.exit(allSucceeded ? 0 : 1);
    }

    /**
     * Attaches the input and output paths to {@code job} and wraps it in a
     * {@code ControlledJob} ready to be added to a {@code JobControl}.
     *
     * @param conf   configuration passed to the {@code ControlledJob} constructor
     * @param job    job whose mapper, reducer and output types are already set
     * @param input  path the job reads from
     * @param output path the job writes to
     * @return the wrapper holding {@code job}
     * @throws java.io.IOException if the paths cannot be set or the wrapper cannot be created
     */
    private static ControlledJob toControlledJob(Configuration conf, Job job,
            String input, String output) throws java.io.IOException {
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        ControlledJob controlled = new ControlledJob(conf);
        controlled.setJob(job);
        return controlled;
    }
}