===== hadoop工作流程描述 ===== * 如何处理一个很大很大很大的任务? * 分而治之 * 任务如何分? * mapreduce * 任务需要的资源如何管理 * yarn - 客户端提交一个任务到Resource Manager中的Application Manager - Resource Manager失败,基于zookeeper的HA,一旦master挂掉,shandby立刻变成master[[yarn容错机制|详见]] - Application Manager启动一个App master - App master向Application Manager注册自己 - App master开始分切分map(按文件和block size) - 如果App master失败,Application Manager负责再次启动App master[[yarn容错机制|详见]] - App master向Resource Manager中的 Resource Scheduler申请资源 - Resource Scheduler为每个map task返回一个nodemanager - App master 通知nodemanager 为map task启动一个container - nodemanager或container失败后,由App master 根据机制判断任务是否需要重做[[yarn容错机制|详见]] - nodemanager 启动container运行map task - map task 的输出超过阀值的话会被flush到磁盘(我们称之为“溢出文件”) - io.sort.mb 缓冲区的大小,默认为100M - io.sort.spill.percent map达到缓冲区多少比例后开始flush,默认80% - 这里设置了一个比例,为了保证在写磁盘时,map任务仍有部分内存可用。 - 在flush之前会先对结果分区,排序,combiner,尽量保证flush的量越小越好 - 当map最后一个记录完成时,可能已经有很多溢出文件了,这时需要将所有溢出文件排序,combiner。后合并成了一个已分区且已排序的数据文件(该文件还伴有一个索引文件,索引文件记录了数据文件中所有partition的起始位置、偏移量) - 分区过程是按照hash算法进行分区的,分区个数reduce个数相等 - combiner的目的是尽量减少每次写入磁盘的数据量,和下一复制阶段网络传输的数据量 - 为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了 - nodemanager 启动container运行reduce task - reduce从各个map结果中下载数据(只取自己需要的分区) - mapred.reduce.parallel.copies 参数控制reduce可以并行从几个map结果中下载相应分区 - 如果reduce端接受的数据量相当小,则直接存储在内存中 - 缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比 - 如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中 - 由mapred.job.shuffle.merge.percent决定 - 随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件 - 这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作 - 直到最后一次合并操作,最后一次合并后并不会写到磁盘,而是直接交给reduce函数。现在终于明白了有些人为什么会说:排序是hadoop的灵魂。 - reduce操作完成之后写回hdfs - app Master向Application Manager注销自己 ===== 组件架构 ===== {{:pasted:20150926-132950.png}} ===== MapReduce流程 ===== {{:pasted:20151019-160401.png}}