MapReduce Combiner组件

1.Combiner是MR程序中Mapper和Reduce之外的一种组件 2.Combiner组件的父类就是Reducer 3.Combiner和Reducer之间的区别在于运行的位置 4.Reducer是每一个接收全局的Map Task 所输出的结果 5.Combiner是在MapTask的节点中运行

6.每一个map都会产生大量的本地输出,Combiner的作用就是对map输出的结果先做一次合并,以较少的map和reduce节点中的数据传输量

7.Combiner的存在就是提高当前网络IO传输的性能,也是MapReduce的一种优化手段。

8.实现自定义的Combiner 1.因为源码中的Combiner是继承于Reducer,我们使用自己的Combiner就需要继承Reducer并重写reduce方法 2.job中设置job.setCombinerClass(自定义Combiner的类.class)

ps:需要注意一个点就是:Combiner就是一次Reducer类中reduce方法的实现,所以这里的KV需要和Reducer的KV是一致的 实际开发一定是先实现Mapper之后,知道了KV,然后再根据需要实现自定义的Combiner中的KV

Combiner.java

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CombinerDemo extends Reducer<Text, Text, Text, Text>{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //因为Combiner就相当于在Mapper实现了reduce方法
        //所以逻辑和实际Reducer中的reduce方法一致
        int sum = 0;
        for (Text t : values) {
            sum += Integer.parseInt(t.toString());
        }
        context.write(key, new Text(sum + ""));
    }    
}

WordCountCombiner.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountCombiner {
    //实现MapReduce
    /*1.实现Mapper端的逻辑
     * KEYIN:文件中读取的偏移量-->LongWritable(固定的) 
     * VALUEIN:文件中实际读取的内容-->Text
     * KEYOUT:Mapper处理完成后传递给Reducer中的KEYIN的数据类型-->不固定,根据需求来
     * VALUEOUT:Mapper端处理完成后传递给Reducer中的VALUEIN的数据类型-->不固定,根据需求来
     * 
     */
    public static class MyMapper extends  Mapper<LongWritable, Text, Text, Text>{
        /* 进入Map处理逻辑之前会执行一次的方法
         */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
        }
        /*
         * 需要实现Mapper端的处理逻辑
         * key:是文件中数据的偏移量,数据类型是由泛型中定义得来的KEYIN
         * value:是文件中实际的内容,数据类型是泛型中定义得来的VALUEIN
         * context:将处理过后产生的KV,写成文件输出
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //第一种
            String [] words = value.toString().split(" ");
            for (String word : words) {
                context.write(new Text(word), new Text("1"));
            }
            //第二种
            /*
             * 通过StringTokenizer类来进行拆分,默认是空格\t,\n,\r,\f
             * 需要向 使用迭代器一样使用这个对象
             */
        /*    String line = value.toString();
            //若文件中的数据分割方式较多,那么建议使用这个类进行拆分
            //这个类实现了枚举迭代器,所以它提供了一些类似于迭代一样的获取数据方式
            StringTokenizer st = new StringTokenizer(line);
            while(st.hasMoreTokens()) {//返回是否还有分隔符-->即判断是否还有其他分隔符
                //返回从当前位置到分隔符之间的字符串-->获取下一个元素
//                st.nextToken();//取出元素
                context.write(new Text(st.nextToken()), new Text("1"));
            }*/

        }
        /* 
         * 在Map处理逻辑之后会执行一次,可以处理一些逻辑
         */
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

        }
    }

    //实现Reducer端的逻辑
   /*
    * Reducer相当于对Mapper端处理过后的数据进行一个实际的处理业务
    * KEYIN-->Mapper处理过后输出key的数据类型,由Mapper的泛型中第三个参数决定
    * VALUE-->Mapper处理过后输出value的数据类型,由Mapper的泛型中第四个参数决定
    * KEYOUT-->Reducer端处理完数据之后要写出key的数据类
    * VALUEOUT-->Reducer处理完数据之后,要写出value的 数据类
    */
    public static class MyReduce extends Reducer<Text, Text, Text, Text>{
        /* 执行Reducer端先执行一次的方法
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

        }

        /*
         *reduce方法是处理业务的核心逻辑 
         *key: 是从Mapper端处理完成后,产生key的数据
         *values-->是从 Mapper端处理完成之后相同key的values的数据集合
         *context-->写出实际 处理完成后KV的方法 
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            //因为Combiner就相当于在Mapper实现了reduce方法
            //所以逻辑和实际Reducer中的reduce方法一致
            int sum = 0;
            for (Text t : values) {
                sum += Integer.parseInt(t.toString());
            }
            context.write(key, new Text(sum + ""));
        }
        /* 
         * 执行完reduce方法执行的方法
         * 
         */
        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

        }
    }
    /**
     * 实现job,完成作业配置
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.获取配置对象
        Configuration conf = new Configuration();
        //2.创建Job对象(创建作业)
        /*
         * 这个方法一共有两个参数版本
         * getInstance(conf) --------直接传入配置对象
         * getInstance(conf,"WordCountCombiner")---传入配置对象和类的名字
         */
        Job job = Job.getInstance(conf);
        //3.设置运行job的class
        job.setJarByClass(WordCountCombiner.class);

        //4.设置Mapper端的运行类和输出key,输出value的数据类型
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //5.读取数据来源

        //这两个方法处理是一样的,只是最后一个参数不同,
        /*
         * FileInputFormat.addInputPath(job, new Path("input/data1"));
         * 
         * add:证明只有 一个路径,
         * 
         * FileInputFormat.setInputPaths(job, new Path("input/data1"));
         * set证明后面是可变参数,多个
         * 
         * 因为当前运行的是本地MR,所以数据是 从本地读取,若需要在集群中运行,这个位置的参数应该是args[0]
         */
        //FileInputFormat.addInputPath(job, new Path("input/data1"));
        FileInputFormat.setInputPaths(job, new Path("input/data1"));

        //优化设置
        //一般可以写分区设置,多文件输出设置,Combiner设置
        /*
         * 并不是所有job都适用于Combiner,只有操作满足结合规律才可以进行设置
         * 如 求和,求最大值,topN 等可以使用Combiner
         * 
         * Combiner不一定需要存在,只有数据量较大,需要做优化的时候可以使用
         */
        job.setCombinerClass(CombinerDemo.class);

        //6.社会Reducer端的运行类和输出key和输出value的数据类型
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //7.处理完文件之后输出的路径
        //ps:因为当前运行的是本地MR,所以数据是写到本地的,若需要再集群中运行,这个位置的参数应该是args[1]
        //数据是存储到HDFS中
        FileOutputFormat.setOutputPath(job, new Path("output1"));

        //8.提交作业
        int isok = job.waitForCompletion(true)?0:-1;
        System.exit(isok);
    }
}

在这里插入图片描述 word.txt里面的内容 在这里插入图片描述

结果:

在这里插入图片描述

results matching ""

    No results matching ""