电影票房数据清洗(MapReduce)
任务描述
根据提供的初始数据集(数据集存放在 /data/workspace/myshixun/data/movies.csv
中),按照下面的要求,完成电影票房数据的清洗工作。
部分数据展示:
中国机长,3369.04万,30.4%,107068,32.7%,9,6.0%,24.98亿,上映15天,2019-10-14
我和我的祖国,3347.62万,30.2%,78832,24.0%,12,7.6%,26.49亿,上映15天,2019-10-14
编程要求
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 在map中取得行数据,去掉两端空格后并以逗号切分成数组
String line[] = value.toString().trim().split(",");
...
去掉字段“上映天数”中带有“零点场”、“点映”、“展映”和“重映”的电影数据;
String movie_days = line[8].trim(); //上映天数 // 去掉字段“上映天数”中带有“零点场”、“点映”、“展映”和“重映”的电影数据; String movie_days_keywords[] = {"零点场","点映","展映","重映"}; // 遍历关键词数组,如果上映天数包含关键词,则结束本次map for (String keywords : movie_days_keywords) { if (movie_days.contains(keywords)) { return; } }
以字段“上映天数”和“当前日期”为依据,在尾列添加一个“上映日期”(releaseDate)的字段,该字段值为“当前日期”减去“上映天数”+1(格式为:2020-10-13)。例如:若字段“上映天数”的值为“上映2天”,字段“当前日期”为“2020-10-10”,则字段“上映日期”的值为“2020-10-09”。如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。注意:若字段“上映天数”的值为“上映首日”,则字段“上映日期”的值应设为“当前日期”的值;
String movie_days = line[8].trim(); //上映天数 String current_time = line[9].trim(); //当前日期 String movie_time = ""; //初始化上映日期
// 条件1: 若字段“上映天数”的值为“上映首日”,则字段“上映日期”的值应设为“当前日期”的值; if(movie_days.equals("上映首日")) { movie_time = current_time; } // 条件2: 如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。 else if ("".equals(movie_days)|| movie_days==null) { movie_time = "往期电影"; } // 默认条件: 该字段值为“当前日期”减去“上映天数”+1(格式为:2020-10-13)。 else { /** 正则匹配取出上映天数 String regEx="[^0-9]"; Pattern toPattern = Pattern.compile(regEx); Matcher matcherdays = toPattern.matcher(movie_days); String days = matcherdays.replaceAll("").trim(); **/ // 遍历字符串取出上映天数 String days = ""; for(int i=0;i<movie_days.length();i++) { // ASCII码 数字字符范围 48-57 if (movie_days.charAt(i) >= 48 && movie_days.charAt(i) <= 57) { days += movie_days.charAt(i); } } // 如果长度大于0,则存在上映天数 if (days.length() > 0) { // 格式化日期,将字符串日期转为 Date格式 SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd"); Date current_data = null; try { current_data = df.parse(current_time); } catch (ParseException e) { e.printStackTrace(); } // 使用Calendar类获取当前日期 Calendar cal = Calendar.getInstance(); cal.setTime(current_data); // 进行加减运算 cal.add(Calendar.DATE, -(Integer.valueOf(days)-1)); movie_time = df.format(cal.getTime()); } // 如果字段“上映天数”为空,则字段“上映日期”的值设为“往期电影”。 else { movie_time = "往期电影"; } }
对字段“当日综合票房”和字段“当前总票房”的数据值进行处理,单位量级统一以“万”展示数据,若原数据中有“万”等表示量级的汉字,需去掉量级汉字。如:“1.5亿”需转换为“15000”,“162.4万”转换为“162.4”,转换时需注意精度缺失问题,当我们使用 double、float 等类型做算术运算时,例如:1.14 亿转换为万时,结果会出现 11399.999999999998 万,这里我们可以使用 BigDecimal 类来处理这类精度缺失问题,字段“当日综合票房”和字段“当前总票房”最后保留两位小数;
// 单位量级统一以“万”展示数据 if (boxoffice.contains("万")) { // 去掉单位 "万" boxoffice = boxoffice.replace("万", "").trim(); // 使用BigDecimal类上取整保留两位小数 boxoffice = new BigDecimal(boxoffice) .setScale(2,BigDecimal.ROUND_HALF_UP).toString(); // 替换原数组“当日综合票房” line[1] = boxoffice; } else if(boxoffice.contains("亿")) { boxoffice = boxoffice.replace("亿", "").trim(); // 上取整保留两位小数并与10000相乘,获得以万为单位量级数据 boxoffice = new BigDecimal(boxoffice) .setScale(2, BigDecimal.ROUND_HALF_UP) .multiply(new BigDecimal(10000)).toString(); line[1] = boxoffice; } // “当前总票房”处理方式同上 if (total_boxoffice.contains("万")) { total_boxoffice = total_boxoffice.replace("万", "").trim(); total_boxoffice = new BigDecimal(total_boxoffice) .setScale(2, BigDecimal.ROUND_HALF_UP).toString(); line[7] = total_boxoffice; } else if(total_boxoffice.contains("亿")) { total_boxoffice = total_boxoffice.replace("亿", "").trim(); total_boxoffice = new BigDecimal(total_boxoffice) .setScale(2, BigDecimal.ROUND_HALF_UP) .multiply(new BigDecimal(10000)).toString(); line[7] = total_boxoffice; }
// 输出Map到Reduce String word = ""; for(String n:line){ word += n+"\t"; } word +=movie_time; context.write(new Text(word),NullWritable.get());
// Reduce 阶段不做数据处理,直接写入 @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { /******** Begin **********/ context.write(NullWritable.get(), key); /******** End **********/ }
清洗完的数据集存储到
/root/files
目录下,分隔方式为\t
。// MovieJob 主入口 public class MovieJob { public static void main(String[] args) throws Exception { /********** Begin **********/ // 数据文件路径和输出路径 Path inputpath=new Path("/data/workspace/myshixun/data/movies.csv"); Path outputpath=new Path("/root/files"); // 新建一个Job任务 Configuration conf=new Configuration(); Job job=Job.getInstance(conf); // 指定jar包 job.setJarByClass(MovieJob.class); // 指定mapper类和reducer类 job.setMapperClass(MovieMap.class); job.setReducerClass(MovieReduce.class); // 指定reducetask的输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 指定程序数据的输入和输出路径 FileInputFormat.addInputPath(job, inputpath); FileOutputFormat.setOutputPath(job,outputpath); // 提交任务 boolean flag = job.waitForCompletion(true); System.exit(flag? 0 : 1); /********** End **********/ } }