电影票房数据清洗(MapReduce)

任务描述

根据提供的初始数据集(数据集存放在 /data/workspace/myshixun/data/movies.csv 中),按照下面的要求,完成电影票房数据的清洗工作。

部分数据展示:

中国机长,3369.04万,30.4%,107068,32.7%,9,6.0%,24.98亿,上映15天,2019-10-14
我和我的祖国,3347.62万,30.2%,78832,24.0%,12,7.6%,26.49亿,上映15天,2019-10-14

编程要求

 @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 在map中取得行数据,去掉两端空格后并以逗号切分成数组
        String line[] = value.toString().trim().split(",");
    ...
  1. 去掉字段“上映天数”中带有“零点场”、“点映”、“展映”和“重映”的电影数据;

    String movie_days = line[8].trim();  //上映天数
    
    // 去掉字段“上映天数”中带有“零点场”、“点映”、“展映”和“重映”的电影数据;
    String movie_days_keywords[] = {"零点场","点映","展映","重映"};
    
    // 遍历关键词数组,如果上映天数包含关键词,则结束本次map
    for (String keywords : movie_days_keywords) {
        if (movie_days.contains(keywords)) {
            return;
        }
    }
    
  1. 以字段“上映天数”和“当前日期”为依据,在尾列添加一个“上映日期”(releaseDate)的字段,该字段值为“当前日期”减去“上映天数”+1(格式为:2020-10-13)。例如:若字段“上映天数”的值为“上映2天”,字段“当前日期”为“2020-10-10”,则字段“上映日期”的值为“2020-10-09”。如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。注意:若字段“上映天数”的值为“上映首日”,则字段“上映日期”的值应设为“当前日期”的值;

    String movie_days = line[8].trim();  //上映天数
    String current_time = line[9].trim(); //当前日期
    
    String movie_time = ""; //初始化上映日期
    
    // 条件1: 若字段“上映天数”的值为“上映首日”,则字段“上映日期”的值应设为“当前日期”的值;
    if(movie_days.equals("上映首日")) {
        movie_time = current_time;
    } 
    // 条件2: 如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。
    else if ("".equals(movie_days)|| movie_days==null) {
        movie_time = "往期电影";
    }
    // 默认条件: 该字段值为“当前日期”减去“上映天数”+1(格式为:2020-10-13)。
    else {
        /** 正则匹配取出上映天数
        String regEx="[^0-9]";
        Pattern toPattern = Pattern.compile(regEx);
        Matcher matcherdays = toPattern.matcher(movie_days);
        String days = matcherdays.replaceAll("").trim(); 
        **/
    
         // 遍历字符串取出上映天数   
        String days = "";
        for(int i=0;i<movie_days.length();i++) {
            // ASCII码 数字字符范围 48-57
            if (movie_days.charAt(i) >= 48 && movie_days.charAt(i) <= 57) {
                days += movie_days.charAt(i);
            }
        }
        // 如果长度大于0,则存在上映天数
        if (days.length() > 0) {
             // 格式化日期,将字符串日期转为 Date格式   
            SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd");
            Date current_data = null;
            try {
                current_data = df.parse(current_time);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            // 使用Calendar类获取当前日期
            Calendar cal = Calendar.getInstance();
            cal.setTime(current_data);
            // 进行加减运算
            cal.add(Calendar.DATE, -(Integer.valueOf(days)-1));
            movie_time = df.format(cal.getTime());
        } 
    
        // 如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。
        else {
            movie_time = "往期电影";
        }
    
    }
    
  1. 对字段“当日综合票房”和字段“当前总票房”的数据值进行处理,单位量级统一以“万”展示数据,若原数据中有“万”等表示量级的汉字,需去掉量级汉字。如:“1.5亿”需转换为“15000”,“162.4万”转换为“162.4”,转换时需注意精度缺失问题,当我们使用 double、float 等类型做算术运算时,例如:1.14 亿转换为万时,结果会出现 11399.999999999998 万,这里我们可以使用 BigDecimal 类来处理这类精度缺失问题,字段“当日综合票房”和字段“当前总票房”最后保留两位小数;

    // 单位量级统一以“万”展示数据
    if (boxoffice.contains("万")) {
        // 去掉单位 "万"
        boxoffice = boxoffice.replace("万", "").trim();
        // 使用BigDecimal类上取整保留两位小数
        boxoffice = new BigDecimal(boxoffice)
            .setScale(2,BigDecimal.ROUND_HALF_UP).toString();
        // 替换原数组“当日综合票房”
        line[1] = boxoffice;
    } 
    else if(boxoffice.contains("亿")) {
        boxoffice = boxoffice.replace("亿", "").trim();
        // 上取整保留两位小数并与10000相乘,获得以万为单位量级数据
        boxoffice = new BigDecimal(boxoffice)
                .setScale(2, BigDecimal.ROUND_HALF_UP)
                .multiply(new BigDecimal(10000)).toString();
        line[1] = boxoffice;
    }
    
    // “当前总票房”处理方式同上
    if (total_boxoffice.contains("万")) {
        total_boxoffice = total_boxoffice.replace("万", "").trim();
        total_boxoffice = new BigDecimal(total_boxoffice)
                .setScale(2, BigDecimal.ROUND_HALF_UP).toString();
        line[7] = total_boxoffice;
    } 
    else if(total_boxoffice.contains("亿")) {
        total_boxoffice = total_boxoffice.replace("亿", "").trim();
        total_boxoffice = new BigDecimal(total_boxoffice)
                .setScale(2, BigDecimal.ROUND_HALF_UP)
                .multiply(new BigDecimal(10000)).toString();
        line[7] = total_boxoffice;
    }
    
    // 输出Map到Reduce
    String word = "";
    
    for(String n:line){
        word += n+"\t";
    }
    word +=movie_time;
    context.write(new Text(word),NullWritable.get());
    
    // Reduce 阶段不做数据处理,直接写入
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        /******** Begin **********/
        context.write(NullWritable.get(), key);
        /******** End **********/
    }
    
  1. 清洗完的数据集存储到 /root/files 目录下,分隔方式为 \t

    // MovieJob 主入口
    
    public class MovieJob {
        public static void main(String[] args) throws Exception {
            /********** Begin **********/
            // 数据文件路径和输出路径
            Path inputpath=new Path("/data/workspace/myshixun/data/movies.csv");
            Path outputpath=new Path("/root/files");
            // 新建一个Job任务
            Configuration conf=new Configuration();
            Job job=Job.getInstance(conf);
            // 指定jar包
            job.setJarByClass(MovieJob.class);
            // 指定mapper类和reducer类
            job.setMapperClass(MovieMap.class);
            job.setReducerClass(MovieReduce.class);
            // 指定reducetask的输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 指定程序数据的输入和输出路径
            FileInputFormat.addInputPath(job, inputpath);
            FileOutputFormat.setOutputPath(job,outputpath);
            // 提交任务
            boolean flag = job.waitForCompletion(true);
            System.exit(flag? 0 : 1);
    
            /********** End **********/
        }
    }
    

results matching ""

    No results matching ""