MapReduce常见实例:Join操作
Map端Join
MapReduce提供了表连接操作其中包括Map端join、Reduce端join还有单表连接,现在我们要讨论的是Map端join,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。
1.Map端join的使用场景:一张表数据十分小、一张表数据很大。
Map端join是针对以上场景进行的优化:将小表中的数据全部加载到内存,按关键字建立索引。大表中的数据作为map的输入,对map()函数每一对
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:
(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
任务描述
现有订单表orders1和订单明细表order_items1,orders1表记录了用户购买商品的下单数据,order_items1表记录了商品id,订单id以及明细id,它们的表结构以及关系如下图所示:
数据内容如下:
orders1表
订单ID 订单号 用户ID 下单日期
52304 111215052630 176474 2011-12-15 04:58:21
52303 111215052629 178350 2011-12-15 04:45:31
52302 111215052628 172296 2011-12-15 03:12:23
52301 111215052627 178348 2011-12-15 02:37:32
52300 111215052626 174893 2011-12-15 02:18:56
52299 111215052625 169471 2011-12-15 01:33:46
52298 111215052624 178345 2011-12-15 01:04:41
52297 111215052623 176369 2011-12-15 01:02:20
52296 111215052622 178343 2011-12-15 00:38:02
52295 111215052621 178342 2011-12-15 00:18:43
52294 111215052620 178341 2011-12-15 00:14:37
52293 111215052619 178338 2011-12-15 00:13:07
order_items1表
明细ID 订单ID 商品ID
252578 52293 1016840
252579 52293 1014040
252580 52294 1014200
252581 52294 1001012
252582 52294 1022245
252583 52294 1014724
252584 52294 1010731
252586 52295 1023399
252587 52295 1016840
252592 52296 1021134
252593 52296 1021133
252585 52295 1021840
252588 52295 1014040
252589 52296 1014040
252590 52296 1019043
要求用Map端Join来进行多表连接,查询在2011-12-15日都有哪些用户购买了什么商品。(假设orders1文件记录数很少,order_items1文件记录数很多)
流程分析
(1)首先在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join连接的
(2)要重写MyMapper类下面的setup()方法,因为这个方法是先于map方法执行的,将较小表先读入到一个HashMap中。
(3)重写map函数,一行行读入大表的内容,逐一的与HashMap中的内容进行比较,若Key相同,则对数据进行格式化处理,然后直接输出。
(4)map函数输出的
代码
// Map端join适用于一个表记录数很少(100条),另一表记录数很多(像几亿条)的情况
// 把小表数据加载到内存中,然后扫描大表,看大表中记录的每条join key/value是否能在内存中找到相同的join key记录,如果有则输出结果
// 这样避免了一种数据倾斜问题
package MapReduce.join;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.io.*;
import java.net.*;
import java.util.*;
// orders1表记录了用户购买商品的下单数据,包含订单id,订单号,用户id,下单日期
// order_items1表包含明细id,订单id,商品id
// 要求用Map端Join来进行多表连接,查询在2011-12-15日都有哪些用户购买了什么商品
public class MapJoin {
public static class MyMap extends Mapper<Object, Text, Text, Text>{
private Map<String, String> dict = new HashMap<>();
@Override
protected void setup(Context context) throws IOException {//处理内存中的文件
String fileName = context.getLocalCacheFiles()[0].getName();//获取当前文件名为orders1的文件
BufferedReader reader = new BufferedReader(new FileReader(fileName));//用bufferedReader读取内存中缓存文件
String line = null;
while (null != ( line = reader.readLine() ) ) {//用readLine()方法读取每行记录
String str[]=line.split("\t");//截取每行记录,订单id,订单号,用户id,下单日期
//两张表中相同的字段为订单id,作为key放入map集合dict中
dict.put(str[0], str[2]+"\t"+str[3]);//<订单id,(用户id,下单日期)>
}
reader.close();
}
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {//接收order_items文件数据
String[] kv = value.toString().split("\t");//截取每行记录,明细id,订单id,商品id
if (dict.containsKey(kv[1])) {//如果有相同的订单id
context.write(new Text(kv[1]), new Text(dict.get(kv[1])+"\t"+kv[2]));//<订单id,<(用户id,下单日期),商品id>>
}
}
}
public static class MyReduce extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text text : values) {
context.write(key, text);//<订单id,<用户id,下单日期,商品id>>
}
}
}
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
Job job = Job.getInstance();
job.setJobName("mapjoin");
job.setJarByClass(MapJoin.class);
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path in = new Path("hdfs://localhost:9000/mr/in/order_items1");
Path out = new Path("hdfs://localhost:9000/mr/out/join/mapjoin");
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
URI uri = new URI("hdfs://localhost:9000/mr/in/orders1");
job.addCacheFile(uri);//把小表加载到内存中
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行结果
Reduce端Join
在Reudce端进行Join连接是MapReduce框架进行表之间Join操作最为常见的模式。
1.Reduce端Join实现原理
(1)Map端的主要工作,为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
(2)Reduce端的主要工作,在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。
2.Reduce端Join的使用场景
Reduce端连接比Map端连接更为普遍,因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中,但是Reduce端连接效率比较低,因为所有数据都必须经过Shuffle过程。
任务描述
用Reduce端Join来进行多表连接,查询在2011-12-15日都有哪些用户购买了什么商品。
流程分析
(1)Map端读取所有的文件,并在输出的内容里加上标识,代表数据是从哪个文件里来的。
(2)在Reduce处理函数中,按照标识对数据进行处理。
(3)然后将相同的key值进行Join连接操作,求出结果并直接输出。
代码
package MapReduce.join;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.io.*;
import java.util.*;
// orders1表记录了用户购买商品的下单数据,包含订单id,订单号,用户id,下单日期
// order_items1表包含明细id,订单id,商品id
// 要求查询在2011-12-15日该电商都有哪些用户购买了什么商品
public class ReduceJoin {
public static class MyMap extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
if (filePath.contains("orders1")) {
String line = value.toString();//获取行文本内容
String[] arr = line.split("\t");//订单id,订单号,用户id,下单日期
context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));//<订单id,(1+用户id 下单日期)>
}else if(filePath.contains("order_items1")) {
String line = value.toString();
String[] arr = line.split("\t");//明细id,订单id,商品id
context.write(new Text(arr[1]), new Text("2+" + arr[2]));//<订单id,(2+商品id)>
}
}
}
public static class MyReduce extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
List<String> orders = new ArrayList<String>();
List<String> items = new ArrayList<String>();
for (Text val : values) {
String str = val.toString();
if (str.startsWith("1+"))
orders.add(str.substring(2));//用户id 下单日期
else if (str.startsWith("2+"))
items.add(str.substring(2));//商品id
}
for(String o:orders)
for(String i:items)
context.write(key,new Text(o+"\t"+i));
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJobName("reducejoin");
job.setJarByClass(ReduceJoin.class);
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path left = new Path("hdfs://localhost:9000/mr/in/orders1");
Path right = new Path("hdfs://localhost:9000/mr/in/order_items1");
Path out = new Path("hdfs://localhost:9000/mr/out/join/reducejoin");
FileInputFormat.addInputPath(job, left);
FileInputFormat.addInputPath(job, right);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
执行结果
单表Join
任务描述
现有某电商的用户好友数据文件,名为 buyer1,buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。数据内容如下:
buyer1(buyer_id,friends_id)
10001 10002
10002 10005
10003 10002
10004 10006
10005 10007
10006 10022
10007 10032
10009 10006
10010 10005
10011 10013
流程分析
单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,需要:
- 在map阶段将读入数据分割成buyer_id和friends_id之后,会将buyer_id设置成key,friends_id设置成value,直接输出并将其作为左表;
- 再将同一对buyer_id和friends_id中的friends_id设置成key,buyer_id设置成value进行输出,作为右表。
- 为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。
- reduce接收到连接的结果,其中每个key的value-list就包含了"buyer_id friends_id--friends_id buyer_id"关系。
- 取出每个key的value-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组遍历输出就是最后的结果了。
代码
package MapReduce.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// buyer1(buyer_id,friends_id),内容以"\t"分隔
// 查询出用户的间接好友关系
// 例如:10001的好友是10002,而10002的好友是10005,那么10001和10005就是间接好友关系。
public class SingleTableJoin_1 {
public static class Map extends Mapper<Object,Text,Text,Text>{
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
String line = value.toString();
String[] arr = line.split("\t");
String mapkey=arr[0];//buyer_id
String mapvalue=arr[1];//friends_id
context.write(new Text(mapkey),new Text("1+"+mapvalue));//buyer_id,1+friends_id
context.write(new Text(mapvalue),new Text("2+"+mapkey));//friends_id,2+buyer_id
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
private static Text myKey=new Text();
private static Text myValue=new Text();
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
List<String> buyers=new ArrayList<String>();
List <String> friends=new ArrayList <String>();
for(Text val:values){
String str=val.toString();
if(str.startsWith("1+"))
buyers.add(str.substring(2));
else if(str.startsWith("2+"))
friends.add(str.substring(2));
}
for(String b : buyers){
for(String f : friends){
myKey.set(b);
myValue.set(f);
context.write(myKey, myValue);
}
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();
String[] otherArgs=new String[2];
otherArgs[0]="hdfs://localhost:9000/mr/in/buyer1";
otherArgs[1]="hdfs://localhost:9000/mr/out/join/singletable_1";
Job job=new Job(conf," SingleTableJoin_1");
job.setJarByClass(SingleTableJoin_1.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}