数据清洗器 MapReduce主类


/**
 * Driver for a single MapReduce job that runs {@code MovieMap} and {@code MovieReduce}
 * over a fixed input file and writes {@code Text}/{@code NullWritable} records.
 *
 * <p>Input and output locations are hard-coded; command-line arguments are ignored.
 */
public class MovieJob {
    public static void main(String[] args) throws Exception {
        // Build the job from a default configuration and tell Hadoop which jar to ship.
        Job cleaningJob = Job.getInstance(new Configuration());
        cleaningJob.setJarByClass(MovieJob.class);

        // Map and reduce stages.
        cleaningJob.setMapperClass(MovieMap.class);
        cleaningJob.setReducerClass(MovieReduce.class);

        // Final (reducer) output types. The map output types are not set explicitly,
        // so the job relies on Hadoop's defaults for them.
        cleaningJob.setOutputKeyClass(Text.class);
        cleaningJob.setOutputValueClass(NullWritable.class);

        // Fixed input file and output directory.
        Path inputFile = new Path("/data/movies.csv");
        Path outputDir = new Path("/root/files");
        FileInputFormat.addInputPath(cleaningJob, inputFile);
        FileOutputFormat.setOutputPath(cleaningJob, outputDir);

        // Submit, block until the job ends, and mirror the outcome in the exit status.
        if (cleaningJob.waitForCompletion(true)) {
            System.exit(0);
        } else {
            System.exit(1);
        }
    }
}

多线程


/**
 * Driver that configures five MapReduce jobs over one shared input path and runs
 * them together through a {@code JobControl}.
 *
 * <p>Command-line arguments:
 * <ul>
 *   <li>{@code args[0]} - input path read by every job</li>
 *   <li>{@code args[1]} - output path of {@code DirectorsJob}</li>
 *   <li>{@code args[2]} - output path of {@code ActorsJob}</li>
 *   <li>{@code args[3]} - output path of {@code CountryJob}</li>
 *   <li>{@code args[4]} - output path of {@code TimeJob}</li>
 *   <li>{@code args[5]} - output path of {@code TypeJob}</li>
 * </ul>
 *
 * <p>No dependencies are declared between the jobs. The process exits with 0 when
 * no job is reported as failed, 1 when at least one failed, and 2 on bad usage.
 */
public class MoviesMain {

    /** One input path plus five output paths. */
    private static final int EXPECTED_ARGS = 6;

    /** Pause between completion checks so the main thread does not spin on a core. */
    private static final long POLL_INTERVAL_MS = 500L;

    /**
     * Configures the five jobs, runs them under a {@code JobControl} on a separate
     * thread, waits for all of them to finish, and reports the outcome.
     *
     * @param args input path followed by the five output paths (see class comment)
     * @throws Exception if job setup fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < EXPECTED_ARGS) {
            System.err.println("Usage: MoviesMain <input> <directorsOut> <actorsOut>"
                    + " <countryOut> <timeOut> <typeOut>");
            System.exit(2);
            return;
        }

        Configuration conf = new Configuration();
        Path input = new Path(args[0]);

        // DirectorsJob
        Job directorsJob = newJob(conf, DirectorsJob.class, IntWritable.class, input, new Path(args[1]));
        directorsJob.setMapperClass(DirectorsJob.Map.class);
        directorsJob.setReducerClass(DirectorsJob.Reduce.class);

        // ActorsJob
        Job actorsJob = newJob(conf, ActorsJob.class, IntWritable.class, input, new Path(args[2]));
        actorsJob.setMapperClass(ActorsJob.Map.class);
        actorsJob.setReducerClass(ActorsJob.Reduce.class);

        // CountryJob
        Job countryJob = newJob(conf, CountryJob.class, Text.class, input, new Path(args[3]));
        countryJob.setMapperClass(CountryJob.Map.class);
        countryJob.setReducerClass(CountryJob.Reduce.class);

        // TimeJob
        Job timeJob = newJob(conf, TimeJob.class, IntWritable.class, input, new Path(args[4]));
        timeJob.setMapperClass(TimeJob.Map.class);
        timeJob.setReducerClass(TimeJob.Reduce.class);

        // TypeJob
        Job typeJob = newJob(conf, TypeJob.class, Text.class, input, new Path(args[5]));
        typeJob.setMapperClass(TypeJob.Map.class);
        typeJob.setReducerClass(TypeJob.Reduce.class);

        // Top-level control container for all five jobs. To order two of them, keep
        // the ControlledJob references and call addDependingJob before starting.
        JobControl jobCtrl = new JobControl("MoviesOut");
        jobCtrl.addJob(controlled(conf, directorsJob));
        jobCtrl.addJob(controlled(conf, actorsJob));
        jobCtrl.addJob(controlled(conf, countryJob));
        jobCtrl.addJob(controlled(conf, timeJob));
        jobCtrl.addJob(controlled(conf, typeJob));

        // JobControl only makes progress while its run() loop executes, so it must
        // be started on its own thread.
        Thread controlThread = new Thread(jobCtrl);
        controlThread.start();

        boolean anyFailed;
        try {
            while (!jobCtrl.allFinished()) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
            System.out.println(jobCtrl.getSuccessfulJobList());
            anyFailed = !jobCtrl.getFailedJobList().isEmpty();
            if (anyFailed) {
                System.err.println("Failed jobs: " + jobCtrl.getFailedJobList());
            }
        } finally {
            // Always ask the control loop to stop, even if the wait was interrupted.
            jobCtrl.stop();
        }
        System.exit(anyFailed ? 1 : 0);
    }

    /**
     * Creates a job with the settings shared by all five jobs: jar lookup class,
     * {@code Text} output key, the given output value class, and input/output paths.
     * The caller still has to set the mapper and reducer classes.
     *
     * @param conf             configuration the job is created from
     * @param jarClass         class used to locate the job jar
     * @param outputValueClass output value type of the job
     * @param input            input path
     * @param output           output path; Hadoop requires that it does not exist yet
     * @return the partially configured job
     * @throws Exception if the job cannot be created or the paths cannot be set
     */
    private static Job newJob(Configuration conf, Class<?> jarClass, Class<?> outputValueClass,
                              Path input, Path output) throws Exception {
        Job job = Job.getInstance(conf);
        job.setJarByClass(jarClass);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(outputValueClass);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        return job;
    }

    /**
     * Wraps a job in a {@code ControlledJob} so it can be added to a {@code JobControl}.
     *
     * @param conf configuration passed to the {@code ControlledJob} constructor
     * @param job  the job to wrap
     * @return the wrapper holding {@code job}
     * @throws Exception if the wrapper cannot be created
     */
    private static ControlledJob controlled(Configuration conf, Job job) throws Exception {
        ControlledJob wrapper = new ControlledJob(conf);
        wrapper.setJob(job);
        return wrapper;
    }
}

results matching ""

    No results matching ""