文件合并和去重MR
对于两个输入文件,即文件file1
和文件file2
,请编写MapReduce
程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件file3
。 为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:
- 第一列按学号排列;
- 学号相同,按
x,y,z
排列; - 输入文件路径为:
/user/tmp/input/
; - 输出路径为:
/user/tmp/output/
。
package blog.bian.bigdata;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertIndex_origin {
public static class Map extends Mapper<Object, Text, Text, Text> {
private Text keyInfo = new Text(); // 存储单词和URL组合
private Text valueInfo = new Text(); // 存储词频
private FileSplit split; // 存储Split对象
// 实现map函数
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 获得<key,value>对所属的FileSplit对象
split = (FileSplit) context.getInputSplit();
StringTokenizer itr = new StringTokenizer(value.toString());
/**********Begin**********/
String pathname = split.getPath().getName();
while(itr.hasMoreTokens()) {
keyInfo.set(itr.nextToken()+":"+pathname);
valueInfo.set("1");
context.write(keyInfo,valueInfo);
}
/**********End**********/
}
}
public static class Combine extends Reducer<Text, Text, Text, Text> {
private Text info = new Text();
// 实现reduce函数, 将相同key值的value加起来
// 并将(单词:文件名, value) 转换为 (单词, 文件名:value)
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/**********Begin**********/
// 统计词频
int sum = 0;
for(Text val : values) {
sum += Integer.parseInt(val.toString());
}
String splitkey[] = key.toString().split(":");
// 重新设置value值由URL和词频组成
Text valueInfo = new Text(splitkey[1]+":"+String.valueOf(sum));
// 重新设置key值为单词
Text keyInfo = new Text(splitkey[0]);
context.write(keyInfo,valueInfo);
/**********End**********/
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> {
private Text result = new Text();
// 实现reduce函数, 将相同单词的value聚合成一个总的value,每个value之间用`;`隔开, 最后以`;`结尾
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/**********Begin**********/
String valueInfo = "";
for(Text val : values) {
valueInfo += val.toString()+";";
}
context.write(key,new Text(valueInfo));
/**********End**********/
}
}
public static void main(String[] args) throws Exception {
// 第一个参数为 输入文件目录路径, 第二个参数为输出结果路径
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: Inverted Index <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Inverted Index");
job.setJarByClass(InvertIndex_origin.class);
// 设置Map、Combine和Reduce处理类
job.setMapperClass(Map.class);
job.setCombinerClass(Combine.class);
job.setReducerClass(Reduce.class);
// 设置Map输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置Reduce输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
测试说明
程序会对你编写的代码进行测试: 输入已经指定了测试文本数据:需要你的程序输出合并去重后的结果。 下面是输入文件和输出文件的一个样例供参考。
输入文件file1
的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件file2
的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根据输入文件file1
和file2
合并得到的输出文件file3
的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x