MapReduce编程规范

  1. 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
  2. Mapper的输入数据是KV对的形式(KV的类型可自定义)
  3. Mapper的输出数据是KV对的形式(KV的类型可自定义)
  4. Mapper中的业务逻辑写在map()方法中
  5. map()方法(maptask进程)对每一个调用一次
  6. Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
  7. Reducer的业务逻辑写在reduce()方法中
  8. Reducetask进程对每一组相同k的组调用一次reduce()方法
  9. 用户自定义的Mapper和Reducer都要继承各自的父类
  10. 整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的job对象
public class WordcountMapper  extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
     * map阶段的业务逻辑就写在自定义的map()方法中
     * maptask会对每一行输入数据调用一次我们自定义的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws  IOException, InterruptedException{
        String line= value.toString();
       String[] words = line.split("");
       for(String word :words){
           context.write(new Text(word),new IntWritable(1));
       }
    }
}


public class WordcountReducer  extends Reducer <Text, IntWritable, Text, IntWritable> {

    /**
     * 入参key,是一组相同单词kv对的key
     */
    @Override
    protected  void reduce( Text key, Iterable<IntWritable> values,Context context ) throws IOException, InterruptedException{

        int count = 0;
        for(IntWritable value : values){
            count += value.get();
        }
        context.write(key,new IntWritable(count));
    }
}

基本数据类型

这些数据都实现了Writable接口:

  • BooleanWriable:标准布尔型数
  • ByteWriable:单字节数
  • DoubleWriable:双字节数值
  • FloatWriable:浮点数
  • IntWriable:整型数
  • LongWriable:长整型数
  • Text:使用UTF8格式存储的文本
  • NullWriable:当中的key或value为空时使用

    NullWritable.get()

mapreduce程序如何跳过待处理文件的首行

map的输入key为当前行在文件内的位置偏移量,所以首行的偏移量肯定是0,所以可以进行如下判断来跳过第一行的处理


public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            log.info("----------key" + key.toString() + "--------------");

            log.info("----------" + value.toString() + "--------------");
            if (key.toString().equals("0")) {
                log.info("----------不处理--------------");
                return;
            } else {
                log.info("----------处理--------------");
            }

            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);

            }
        }

MapReduce 去掉空白行



if (StringUtils.isBlank(value.toString())) {
    System.out.println("空白行");
    return;
}

StringUtils.isEmpty("yyy") = false
StringUtils.isEmpty("") = true
StringUtils.isEmpty("   ") = false

StringUtils.isBlank("yyy") = false
StringUtils.isBlank("") = true
StringUtils.isBlank("   ") = true

results matching ""

    No results matching ""