MapReduce过程
1.MapReduce概述
MapReduce是一个分布式运算程序的编程框架,是用户开发的 “基于Hadoop的数据分析应用” 的核心框架.
MapReduce核心功能是将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并在Hadoop集群上运行.
2.MapReduce框架的结构及核心运行机制
一个完整的MapReduce程序在分布式运行时有三类实例进程:
- MRAppMaster: 负责整个程序的过程 调度及 状态 协调 .
- MapTask:负责map阶段的整个数据处理流程.
- ReduceTask:负责reduce阶段的整个数据处理流程.
1
2
3
4
5
6
Map阶段为每个数据块分配一个Map计算任务; //<word,1>
然后将所有map输出的key进行合并; //<word,<1,1,1,1,1>>
相同的Key及对应的Value发送给同一个Reduce任务去处理. //word 5
2.1 完整流程
2.2 流程解析
- 一个mr程序启动时,最先启动MRAppMaster, MRAppMaster 启动后根据本次job的描述信息,计算出需要的MapTask实例的数量,然后向集群申请机器启动相应数量的MapTask进程.
MapTask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:
1
2
3a) 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对
b) 将输入KV对传递给MR程序定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存
c) 将缓存中的KV对安装K( __Key__ )分区排序后不断 __溢写__ 到磁盘文件MRAppMaster监控到所有MapTask进程任务完成之后,会根据MR程序指定的参数启动相应数量的ReduceTask(setReduceTaskNum)进程,并告知reducetask进程要处理的数据范围(指定分区Partitioner,默认是Hash).
- Reducetask进程启动后,根据MRAppMaster告知待处理数据所在位置,从若干台MapTask运行所在机器上获取若干个MapTask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一组,用MR程序定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,让后调用客户指定的outputformat将结果数据输出到外部存储.
2.3 MapTask并行度决定机制
一个job的map阶段并行度由客户端在提交job时决定;
而客户端对map阶段并行度的规划的基本逻辑为:
1 | 将待处理数据执行逻辑切片(即按照一个[特定切片]大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个MapTask并行实例处理. |
这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图:
1 | FileInputFormat切片机制默认块为128M |
2.4 ReduceTask并行度决定机制
ReduceTask的并行度与MapTask的并行度决定不同,ReduceTask数量的决定是可以直接手动设置:
1 | //默认值是1,手动设置为4 |
如果数据分配不均匀,就可能在reduce阶段产生数据 _倾斜_.
2.5 MapReduce编程
1 | 1)三个部分:Mapper Reducer Driver(可以提交运行mr程序的客户端) |
2.6 MapReduce中的Combiner
- Combiner是MR程序中的Mapper和Reducer之外的一种组件
- Combiner的父类是Reducer
Combiner和Reducer区别于运行的位置
1
2Combiner是在每一个MapTask所在的节点运行
Reducer是接收全局所有MapTask的输出结果Combiner的意义对每一个MapTask的输出结果局部汇总,减少网络传输
- Combiner应用的前提是不能影响业务逻辑,Key-Value需与Reducer的Key-Value对应
3. MapReduce中的 Shuffle 机制(介于Map任务输出与Reduce任务输入之间)
- Shuffle:在MapReduce中,Map阶段处理的数据如何传递给Reduce阶段.是MapReduce框架中最关键的一个流程,这个流程就是shuffle.
- Shuffle:(核心机制: 数据分区 排序 缓存 )
- Shuffle就是将 MapTask输出的处理结果数据 ,分发给ReduceTask,并在分发的过程中,对数据按 key 进行分区和排序.
3.1 Shuffle主要流程
1 | 1. 分区partition |
3.2 详细流程
- MapTask收集map()方法中输出的Key-Value对,放入内存缓冲区中
- 从内存缓冲区不断 溢出本地磁盘文件 ,可能会溢出多个文件
- 多个溢出文件会被合并成大的溢出文件
- 在溢出及合并过程中,都要调用partitioner进行分组和针对key进行排序
- ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
- ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
- 合并成大文件后,Shuffle的阶段完成,后面进入ReduceTask的逻辑运算.(具体逻辑运算过程)
1
从文件中取出一个一个键值对,调用用户自定义的reduce()方法
Shuffle中的缓冲区会影响到MapReduce程序的执行效率,缓冲区越大,则磁盘的IO次数越少,执行速度则越快.
缓冲区大小可以调整,参数:io.sort.mb 默认为 100M.