MapReduce常见实例:Join操作

img

Map端Join

MapReduce提供了表连接操作其中包括Map端join、Reduce端join还有单表连接,现在我们要讨论的是Map端join,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

1.Map端join的使用场景:一张表数据十分小、一张表数据很大。

Map端join是针对以上场景进行的优化:将小表中的数据全部加载到内存,按关键字建立索引。大表中的数据作为map的输入,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组并且连接好了的数据。

为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。

(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

任务描述

现有订单表orders1和订单明细表order_items1,orders1表记录了用户购买商品的下单数据,order_items1表记录了商品id,订单id以及明细id,它们的表结构以及关系如下图所示:

img

数据内容如下:

orders1表

订单ID   订单号          用户ID    下单日期
52304    111215052630    176474    2011-12-15 04:58:21
52303    111215052629    178350    2011-12-15 04:45:31
52302    111215052628    172296    2011-12-15 03:12:23
52301    111215052627    178348    2011-12-15 02:37:32
52300    111215052626    174893    2011-12-15 02:18:56
52299    111215052625    169471    2011-12-15 01:33:46
52298    111215052624    178345    2011-12-15 01:04:41
52297    111215052623    176369    2011-12-15 01:02:20
52296    111215052622    178343    2011-12-15 00:38:02
52295    111215052621    178342    2011-12-15 00:18:43
52294    111215052620    178341    2011-12-15 00:14:37
52293    111215052619    178338    2011-12-15 00:13:07

order_items1表

明细ID  订单ID   商品ID
252578    52293    1016840
252579    52293    1014040
252580    52294    1014200
252581    52294    1001012
252582    52294    1022245
252583    52294    1014724
252584    52294    1010731
252586    52295    1023399
252587    52295    1016840
252592    52296    1021134
252593    52296    1021133
252585    52295    1021840
252588    52295    1014040
252589    52296    1014040
252590    52296    1019043

要求用Map端Join来进行多表连接,查询在2011-12-15日都有哪些用户购买了什么商品。(假设orders1文件记录数很少,order_items1文件记录数很多)

流程分析

(1)首先在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join连接的 键值对,将其解释分割放到内存中(可以放大Hash Map等等容器中)。

(2)要重写MyMapper类下面的setup()方法,因为这个方法是先于map方法执行的,将较小表先读入到一个HashMap中。

(3)重写map函数,一行行读入大表的内容,逐一的与HashMap中的内容进行比较,若Key相同,则对数据进行格式化处理,然后直接输出。

(4)map函数输出的键值对首先经过一个suffle把key值相同的所有value放到一个迭代器中形成values,然后将键值对传递给reduce函数,reduce函数输入的key直接复制给输出的key,输入的values通过增强版for循环遍历逐一输出,循环的次数决定了输出的次数。

代码

// Map端join适用于一个表记录数很少(100条),另一表记录数很多(像几亿条)的情况
// 把小表数据加载到内存中,然后扫描大表,看大表中记录的每条join key/value是否能在内存中找到相同的join key记录,如果有则输出结果
// 这样避免了一种数据倾斜问题
package MapReduce.join;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.io.*;
import java.net.*;
import java.util.*;


// orders1表记录了用户购买商品的下单数据,包含订单id,订单号,用户id,下单日期
// order_items1表包含明细id,订单id,商品id
// 要求用Map端Join来进行多表连接,查询在2011-12-15日都有哪些用户购买了什么商品
public class MapJoin {

    public static class MyMap extends Mapper<Object, Text, Text, Text>{
        private Map<String, String> dict = new HashMap<>();
        @Override
        protected void setup(Context context) throws IOException {//处理内存中的文件
            String fileName = context.getLocalCacheFiles()[0].getName();//获取当前文件名为orders1的文件
            BufferedReader reader = new BufferedReader(new FileReader(fileName));//用bufferedReader读取内存中缓存文件
            String line = null;
            while (null != ( line = reader.readLine() ) ) {//用readLine()方法读取每行记录
                String str[]=line.split("\t");//截取每行记录,订单id,订单号,用户id,下单日期
                //两张表中相同的字段为订单id,作为key放入map集合dict中
                dict.put(str[0], str[2]+"\t"+str[3]);//<订单id,(用户id,下单日期)>
            }
            reader.close();
        }
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {//接收order_items文件数据
            String[] kv = value.toString().split("\t");//截取每行记录,明细id,订单id,商品id
            if (dict.containsKey(kv[1])) {//如果有相同的订单id
                context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));//<订单id,<(用户id,下单日期),商品id>>
            }
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(key, text);//<订单id,<用户id,下单日期,商品id>>
            }
        }
    }

    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
        Job job = Job.getInstance();
        job.setJobName("mapjoin");
        job.setJarByClass(MapJoin.class);
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path in = new Path("hdfs://localhost:9000/mr/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mr/out/join/mapjoin");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        URI uri = new URI("hdfs://localhost:9000/mr/in/orders1");
        job.addCacheFile(uri);//把小表加载到内存中

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

执行结果

img

Reduce端Join

在Reudce端进行Join连接是MapReduce框架进行表之间Join操作最为常见的模式。

1.Reduce端Join实现原理

(1)Map端的主要工作,为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

(2)Reduce端的主要工作,在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。

2.Reduce端Join的使用场景

Reduce端连接比Map端连接更为普遍,因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中,但是Reduce端连接效率比较低,因为所有数据都必须经过Shuffle过程。

任务描述

用Reduce端Join来进行多表连接,查询在2011-12-15日都有哪些用户购买了什么商品。

流程分析

(1)Map端读取所有的文件,并在输出的内容里加上标识,代表数据是从哪个文件里来的。

(2)在Reduce处理函数中,按照标识对数据进行处理。

(3)然后将相同的key值进行Join连接操作,求出结果并直接输出。

代码

package MapReduce.join;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.io.*;
import java.util.*;

// orders1表记录了用户购买商品的下单数据,包含订单id,订单号,用户id,下单日期
// order_items1表包含明细id,订单id,商品id
// 要求查询在2011-12-15日该电商都有哪些用户购买了什么商品
public class ReduceJoin {

    public static class MyMap extends Mapper<Object, Text, Text, Text>{
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
            if (filePath.contains("orders1")) {
                String line = value.toString();//获取行文本内容
                String[] arr = line.split("\t");//订单id,订单号,用户id,下单日期
                context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));//<订单id,(1+用户id 下单日期)>
            }else if(filePath.contains("order_items1")) {
                String line = value.toString();
                String[] arr = line.split("\t");//明细id,订单id,商品id
                context.write(new Text(arr[1]), new Text("2+" + arr[2]));//<订单id,(2+商品id)>
            }
        }
    }

    public static class MyReduce extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            List<String> orders  = new ArrayList<String>();
            List<String> items  = new ArrayList<String>();
            for (Text val : values) {
                String str = val.toString();
                if (str.startsWith("1+"))
                    orders.add(str.substring(2));//用户id 下单日期
                else if (str.startsWith("2+"))
                    items.add(str.substring(2));//商品id
            }
            for(String o:orders)
                for(String i:items)
                    context.write(key,new Text(o+"\t"+i));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("reducejoin");
        job.setJarByClass(ReduceJoin.class);
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path left = new Path("hdfs://localhost:9000/mr/in/orders1");
        Path right = new Path("hdfs://localhost:9000/mr/in/order_items1");
        Path out = new Path("hdfs://localhost:9000/mr/out/join/reducejoin");
        FileInputFormat.addInputPath(job, left);
        FileInputFormat.addInputPath(job, right);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

执行结果

img

单表Join

任务描述

现有某电商的用户好友数据文件,名为 buyer1,buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。数据内容如下:

buyer1(buyer_id,friends_id)

10001    10002
10002    10005
10003    10002
10004    10006
10005    10007
10006    10022
10007    10032
10009    10006
10010    10005
10011    10013

流程分析

单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,需要:

  1. 在map阶段将读入数据分割成buyer_id和friends_id之后,会将buyer_id设置成key,friends_id设置成value,直接输出并将其作为左表;
  2. 再将同一对buyer_id和friends_id中的friends_id设置成key,buyer_id设置成value进行输出,作为右表。
  3. 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。
  4. reduce接收到连接的结果,其中每个key的value-list就包含了"buyer_id friends_id--friends_id buyer_id"关系。
  5. 取出每个key的value-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组遍历输出就是最后的结果了。

https://img-blog.csdnimg.cn/20181220163856536

代码

package MapReduce.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// buyer1(buyer_id,friends_id),内容以"\t"分隔
// 查询出用户的间接好友关系
// 例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。
public class SingleTableJoin_1 {
    public static class Map extends Mapper<Object,Text,Text,Text>{
        public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
            String line = value.toString();
            String[] arr = line.split("\t");
            String mapkey=arr[0];//buyer_id
            String mapvalue=arr[1];//friends_id
            context.write(new Text(mapkey),new Text("1+"+mapvalue));//buyer_id,1+friends_id
            context.write(new Text(mapvalue),new Text("2+"+mapkey));//friends_id,2+buyer_id
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text>{
        private static Text myKey=new Text();
        private static Text myValue=new Text();
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
            List<String> buyers=new ArrayList<String>();
            List <String> friends=new ArrayList <String>();
            for(Text val:values){
                String str=val.toString();
                if(str.startsWith("1+"))
                    buyers.add(str.substring(2));
                else if(str.startsWith("2+"))
                    friends.add(str.substring(2));
            }
            for(String b : buyers){
                for(String f : friends){
                    myKey.set(b);
                    myValue.set(f);
                    context.write(myKey, myValue);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf=new Configuration();
        String[] otherArgs=new String[2];
        otherArgs[0]="hdfs://localhost:9000/mr/in/buyer1";
        otherArgs[1]="hdfs://localhost:9000/mr/out/join/singletable_1";
        Job job=new Job(conf," SingleTableJoin_1");
        job.setJarByClass(SingleTableJoin_1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

执行结果

img

https://blog.csdn.net/qq_34239412/article/details/85125741

results matching ""

    No results matching ""