MapReduce优化Combiner和Partitioner
概述
Combiner(Map阶段对MapTask进行局部汇总)
- Combiner是MapReduce程序中Mapper和Reducer之外的一种组件。
- Combiner组件的父类是Reducer。Combiner extends Reducer。
Combiner和Reducer的区别在于运行的位置:
1
2Combiner是在每个MapTask所在的节点运行
Reducer是接收全局所有Mapper的输出结果Combiner的意义是对 每一个MapTask的输出进行局部汇总 ,以减少 网络传输量。
1
2
3具体的实现步骤:
1.自定义一个Combiner继承Reducer,重写reduce方法
2.在job中设置:job.setCombinerClass(xxxxCombiner.class).Combiner能够应用的前提是不能影响最终的业务逻辑,Combiner的输出Key-Value需要与Reducer的输入Key-Value类型对应。
Partitioner(分发到的Reduce分区,可自定义分区)
MapReduce会将map输出的Key-Value对,按照相同key分组,然后分发给不同的ReduceTask,默认分发规则为:按key的hashcode%reducetask数来分发。
1
2
3public int getPartition(K2 key, V2 value,int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}如果需要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner自定义Partitioner继承抽象类:Partitioner,重写getPartition方法。
1
2
3
4
5
6class MyPartitioner extends Partitioner{
@Override
public int getPartition(Text key,xxx value,int numPartitions){
}
}然后在job中设置自定义partitioner:
1
job.setPartitionerClass(xxxxPartitioner.class);
代码实现
- 自定义Combiner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* @author afeng
* @date 2018/11/19 17:43
* <p>
* Combiner是在map阶段对MapTask任务进行局部汇总
**/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>
{
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable value : values)
{
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}
Combiner的优点
- 对每一个MapTask进行局部汇总,减少网络IO传输数量。
1
2
3
4
5
6
7
8一个Map任务执行完毕,Key-Value为:
<hadoop,1> <hadoop,1> <hadoop,1> <hadoop,1>
如果不用Combiner,传给Reducer的则是:
<hadoop,1> <hadoop,1> <hadoop,1> <hadoop,1>
用了Combiner,则Map阶段输出的是:
<hadoop,4> //显然网络传输的数据量减少了