MapReduce[1]分布式计算框架作为Hadoop生态中的一个重要组成部分, 其核心思想是将大数据的处理过程抽象为Map (映射)和Reduce (归约/化简)两个阶段, 通过资源管理框架YARN来进行资源管理与作业调度, 实现对底层HDFS[2, 3]文件系统中存储的大规模离线数据进行分析计算, 目前MapReduce被广泛应用于工业界的多种海量数据分析场景[4, 5].
另一方面, 随着数据的爆发式增长[6], 数据中心的存储设备开销逐渐成为不可忽视的一部分[7]. 因此, 越来越多主流的分布式存储系统逐渐引入纠删码机制[8-11]作为多副本容错机制的一种替代, 以在保障数据可靠性的同时进一步降低系统的存储开销. 当前HDFS中分别实现了上述两种容错机制, 其数据组织方式如图1所示, HDFS的数据管理以Block (块)为单位, 将用户提交的文件划分为一个个Block, 分散存储于各个DataNode (用于存放数据的从节点). 在副本存储模式下, HDFS按照Block的粒度对原始数据进行分割, 依照系统配置的数据块备份数量(默认为3)和容单机架故障的数据块随机放置策略将数据块存放在不同的节点以满足节点以及机架级别的故障, 如图1(a)为3副本存储模式下HDFS中的数据存储情况. 而在纠删码存储模式下, 为了实现更好的在线编码性能和更少的存储开销, HDFS在Block的数据划分逻辑上, 将原始数据拆分为了更细粒度的Cell单元, 默认为1 MB大小. 在进行编码操作时, 原始文件中连续的一部分将会按Cell进行划分和编码, 并随机分散放置在各个节点以保障机架级别的故障容错. 例如在图1(b)中原始文件的前 6 MB通过RS-(6, 3)编码生成3个Parity Cell, 分别放在DataNode 0–8这9个节点, 在外层, HDFS仍然维护Block的划分逻辑, 当原始文件填满一组Block时, 则开始进行下一个条带的写入.
MapReduce所处理的离线数据具有数据量大, 更新频率低等特性. 在Map阶段, 系统首先对原始数据进行分片(Split)从而确定Map任务的数量, 使每个任务都能处理原始数据集中连续的一部分内容. 资源管理框架YARN将根据集群各节点的配置文件, 以container为资源分配的基础单元管理各节点上的硬件资源(CPU、内存等). 每个Map任务或Reduce任务都需要YARN为其分配container单元来进行计算.
如前文所述, 无论是原始数据还是经过计算后得到的输出结果, 底层HDFS采取纠删码机制能够成倍节省系统的存储占用. 但与此同时, 由于底层存储模式的改变, 以及异构环境中YARN资源管理和作业调度策略本身存在的不足, 使得在多作业并发场景下, MapReduce作业的运行效率面临如下3个问题.
(1) 多种数据存储模式共存引起不同节点的数据访问热度倾斜. 在副本模式下, 如图1(a)所示, MapReduce作业在进行数据分片时, 可以通过将分片大小和Block大小保持一致, 并在YARN进行任务分配时优先将任务分配到其对应数据的存放节点, 从而避免一个任务涉及多个远程节点的数据, 从而最大化利用数据的本地性. 而在纠删码存储模式下, 如图1(b)所示, 由于数据被拆分为更细粒度的存储单元, 因此一个Map任务往往需要从多个节点获取所需数据, 因此掉队节点(straggler)将会引发长尾任务影响MapReduce作业的运行效率. 此外, 在多作业并发的场景中, 由于不同作业处理的数据可能采用不同的存储模式, 因此集群中各节点的数据访问热度也会存在较大差异, 从而影响MapReduce框架的数据访问和处理效率.
(2) 静态的资源管理无法适应集群不同节点的负载变化. 多作业并发场景下, YARN框架中静态的资源管理配置无法适应集群各节点动态变化的负载和本身的硬件异构. 例如CPU核心数较少, 主频较低的节点在本身CPU中高负载的情况下, 对本节点的任务并发量更加敏感. 同样地, 如果内存大小以及内存占用量不同的节点不能根据自身运行时情况决定节点上的任务并发数量, 也会造成任务运行缓慢甚至由于可用内存不足导致任务失败, 而失败任务的重试又将进一步影响 MapReduce 作业的完成速度.
(3) 作业间任务分布倾斜造成节点设备的有效利用率下降. 在多作业并发场景中, 由于作业提交存在时间差异, 在系统运行过程中存在任务的主资源需求和实际分配节点的硬件条件不匹配的情况. 例如在CPU、内存等硬件性能存在差异的异构集群中, 先后向系统提交A、B两个大规模的MapReduce作业, 其中A作业对内存需求更加敏感, B作业对CPU需求更加敏感, A的部分任务在内存性能较好的节点上率先完成, 此时系统会将B作业中的部分任务分配到这些空闲节点上, 而如果这些节点中存在CPU硬件性能较差的情况, 将会造成B作业中部分任务的长尾效应, 影响作业完成时间.
针对上述3个问题, 本文从单个MapReduce作业和多MapReduce作业并发两个层面, 对当前Hadoop系统进行了对应优化, 主要贡献如下.
(1) 对于单个MapReduce作业, 通过修改当前HDFS的默认的随机数据放置策略和YARN的默认的根据配置文件静态确定container数量的资源管理方式, 提出HDPA纠删码数据放置策略和DTAA任务分配策略, 通过对异构集群中各节点的硬件信息和实时负载进行周期性采集, 以计算出合适的纠删码条带放置位置和MapReduce作业执行时各节点合适的任务并发量.
(2) 对于多作业并发场景, 提出动态平衡的公平作业调度方案DB-Fair, 用来确定作业队列中各作业的计算资源分配和作业内任务的优先级划分, 并进行合适的任务分配位置选取.
通过上述两个层面的方案设计, 优化了前面所提到的3个问题. 实验结果表明, 本文提出的DB-Fair调度策略能够有效提升Hadoop MapReduce框架的处理效率, 此外在用户感知, 资源有效利用和集群负载均衡性等方面均要优于YARN的默认调度策略.
1 相关工作 1.1 研究现状目前, 已有的MapReduce性能优化设计主要可以分为两类: 一类是从数据布局的角度出发, 通过更改HDFS中的数据放置, 优化MapReduce的数据访问效率. 另一类是从集群计算环境角度考虑, 通过对异构集群环境中的节点性能进行建模, 并分析各类MapReduce作业的特性以调整MapReduce作业中任务的分配位置, 从而提升MapReduce框架的计算效率. 在进行作业调度优化设计时, 不同学者聚焦的优化阶段也不尽相同, 同时针对具体场景, 在保证作业处理效率的同时, 部分学者也在考虑安全性、能耗控制等其他需求.
Wang等人[12]提出了一种基于数据局部性感知的放置策略, 该方案通过对集群数据的访问日志进行分析, 从而发掘数据本身的局部性和关联性, 在副本方式下实现局部关联数据在集群中的均匀分布, 在MapReduce作业运行过程中达到数据本地性最大化利用的效果. Bawankule等人[13]提出了基于历史数据的Reduce任务调度算法HDRTS, 通过分析作业历史信息来确定每个节点的节点平均响应时间, 用来刻画异构集群节点的计算性能, 从而优化作业在Reduce阶段的计算效率. Jeyaraj等人[14]则是把优化MapReduce作业效率的目光聚焦于Map至Shuffle阶段之间, 提出了一种TSMJS调度方案, 其核心思想是对同一节点上的本地Map任务生成的中间数据进行预合并, 以减少Reduce阶段需要获取和合并的数据量, 从而缩短作业完成时间. Dai等人[15]基于真实场景下大量的MapReduce作业中Map任务相较于 Reduce任务的数量更多[16]的特征, 将优化重点放在了对Map任务的合理调度, 提出了一种具有动态优先级的多级队列调度策略, 其核心思想是在副本存储模式下, 优先在各节点上运行数据存放在本地的Map任务, 并将作业对应的Reduce任务尽可能分配在离对应Map任务生成的中间结果较近的物理节点上, 以达到尽可能少的节点间通信. Maleki等人[17]聚焦于云服务场景中, MapReduce框架进行作业处理时面临的安全性保障和任务处理效率两方面的需求, 提出了SPO作业调度模型, 其核心思想是利用MapReduce本身两阶段的数据处理模式, 将当前系统内全部作业的Map任务和Reduce任务与集群计算资源进行映射, 使用HEFT算法在给定的安全性约束条件下进行任务级分配与调度, 从而尽可能提升集群中的Map任务或Reduce任务的处理效率. Chen等人[18]则是聚焦于集群能耗和MapReduce性能的权衡, 在异构Hadoop集群中通过调度时间、能量消耗和执行成本来衡量一个机架合适的container数量, 而后在机架层面考虑多作业并发时的作业间资源分配, 提高集群的计算资源利用率并降低跨机架的数据传输和能耗.
1.2 存在的不足根据上述相关工作可知, 国内外学者们通过刻画异构环境下的节点性能、调整数据放置、分析不同作业类型的资源需求偏好等各种方式来辅助MapReduce作业调度上的优化. 但基于底层采用纠删码存储模式的MapReduce作业优化的研究相对较少, 目前学界对纠删码存储的优化工作主要还是聚焦于如何提升存储侧本身的性能, 例如通过编码设计在保证低存储开销的前提下提升纠删码存储系统的数据修复性能[19, 20], 引入新型硬件设备[21, 22]或调整编解码流程[23-25], 更改数据存储模式[26]或数据更新、修复机制[27, 28]等方式来提升系统的数据更新和修复性能. 正如前文所述, 异构环境中, 当前HDFS的条带式纠删码在随机的数据放置下, 上层MapReduce作业的数据访问模式由过去的一对一变为了一对多, 在作业数据量大、任务高并发的场景下, 节点的数据访问热度差异和节点的计算负载偏斜将会大大降低MapReduce作业的运行效率. 此外, 相较于部分先前学者的工作, 本文考虑的混合存储模式和异构集群也更具有普遍性. 例如Hashem等人[29]虽然考虑到异构环境对MapReduce作业的影响, 但给出的调度优化仅针对单个作业的运行效率, 且未考虑底层存储模式的影响. Jiang等人[30]虽然提出了一种可有效缩短MapReduce作业运行时间的在线调度器设计, 但也仅考虑了同构的Hadoop集群场景. Naik等人[31]虽然考虑了数据存储对异构集群中的MapReduce性能的影响, 但仍聚焦于单一的副本存储模式, 在其设计的作业调度模式中, 最大化利用数据本地性的策略无法应用于底层存储采用纠删码或混合存储模式的场景. Darrous[32]引出了同构场景中单纠删码模式下由于数据放置的局部不均衡所产生的问题, 并给出了初步的解决方案, 但在更一般的场景中, 该方法并不能起到很好的优化作用.
1.3 混合存储模式下MapReduce作业优化的基本思路真实场景下, 数据在集群中的存储模式往往是副本和多种条带宽度的纠删码混合存储. 因此, 为了使上层的计算应用更好地适应混合存储模式, 降低Map-Reduce作业中任务的长尾效应, 需要对当前异构环境中的各节点的数据访问负载(主要是存储、网络资源) 和作业计算负载 (主要是 CPU、内存资源) 进行均衡. 而在多作业并发场景中, 如何保证各作业具有良好的执行效率也是需要解决的一个问题. 基于上述挑战, 本文通过两层设计, 在单个MapReduce作业运行时设计合理的作业内的数据放置和任务分配, 并在多作业并发场景下提出一种动态调节的作业调度方案, 实现对MapReduce框架计算效率的提升.
2 作业友好的数据放置和任务分配方案本节将从优化当前Hadoop系统中默认的数据放置策略和资源管理策略等方面出发, 提出单MapReduce作业场景下的性能优化方案, 系统总体设计如图2所示. 图2中直观展示了优化方案的设计思路. 根据第1节对当前MapReduce作业在混合存储模式下面临的问题的分析和总结, 本文分别对存储侧HDFS文件系统和计算侧YARN框架进行部分工作流程的修改, 以实现对作业的数据访问效率和数据计算效率两方面的优化.
图2中的上半部分展示了本文对HDFS存储系统的修改设计: 考虑到异构场景中不同节点的磁盘、网络和后台数据访问负载存在区别, 因此, 通过在集群各数据存储节点中实现节点硬件信息采集以及数据访问历史负载分析的机制(对应图2中Devices info模块), 对存储节点上的数据访问性能进行分析. 主节点则可根据各存储节点周期性更新汇报的数据访问性能情况, 按数据访问性能对节点进行分组, 在图2中以不同颜色进行节点组区分, 相同节点组的节点间数据访问性能接近. 这样, 在向系统中写入以纠删码模式存储的数据时, 可以通过修改主节点的数据放置策略(对应图中主节点的HDPA数据放置), 将同一条带尽可能放置在同一节点组内(对应图中灰色的Stripe条带). 这样可以尽可能避免MapReduce作业的任务读取条带数据时, 由于节点数据访问性能差异过大产生掉队节点, 从而带来任务获取数据的效率下降问题.
图2中的下半部分展示了本文对YARN计算框架的修改设计: 与HDFS中类似, 考虑异构场景中不同节点存在CPU、内存等硬件设备性能差异和实时后台负载占用差异, 本文在每个计算节点中加入对应的计算设备信息统计机制(对应图2中的Perf counter模块), 根据周期性更新的负载统计情况动态调节节点上的container计算单元数量并汇报给主节点, 以实现各节点上对任务并发度的动态控制(对应图2中主节点的DTAA任务分配), 均衡计算负载, 避免由于部分节点上严重的计算资源竞争产生长尾任务, 从而影响作业的计算效率.
2.1 数据放置的算法设计针对异构集群环境, 首先应对各节点影响HDFS数据访问的关键硬件参数进行统计, 而后根据各节点相关硬件的历史负载进行分析, 为各节点设备统计出一个常量负载因子, 基于上述两点对集群节点进行性能分组, 使同一个条带的数据块尽可能放置在同一个节点组内.
(1) 节点划分
由MapReduce的数据访问模式可知, 影响Map任务数据读取的因素主要有: 数据存放节点的磁盘顺序读写性能、网络上行带宽以及任务执行节点的网络下行带宽等. 本文基于影响任务数据读取的节点数据传输性能将所有节点进行分组, 数据传输性能将结合节点本身的硬件参数以及历史负载来衡量. 通过访问节点日志, 对节点相关硬件设备较长一段时间的历史负载进行分析并据此得出节点的长期后台数据访问负载情况, 用常量化的负载因子
在获取到集群中各节点的数据传输性能情况后, 确定节点分组的阈值
$ \begin{gathered} Nod{e_{{\rm{up}}}} = \min ({R_{{\rm{disk}}}} - {\alpha _{{\rm{diskIO}}}}, {{NetUP}} - {\alpha _{{{NetUP}}}}) \\ Nod{e_{{\rm{down}}}} = \min ({W_{{\rm{disk}}}} - {\alpha _{{\rm{diskIO}}}}, {{NetDown}} - {\alpha _{{{NetDown}}}}) \\ G = \frac{{\max (Nod{e_{{\rm{up}}}}, Nod{e_{{\rm{down}}}})}}{\theta } \\ \end{gathered} $ | (1) |
其中,
(2) 数据放置
在为条带选取放置位置时, 系统的可靠性要求也是必须满足的一个条件, 纠删码条带的放置要满足节点、机架等多级别的故障容错. 而在真实环境中单磁盘、节点以及单机架故障这类情况占到绝大多数[33], 因此, 为了更容易找到合适的条带存储节点, 同时尽量减少MapReduce作业运行造成的跨机架网络流量, 在符合实际需求的前提下, 保证单个机架的故障容错是足够的. 与此同时为了保证各节点在正常状态下的数据访问性能, 在选取条带放置位置时对条带的数据块和校验块进行区分, 并且在保证每个节点上数据块与校验块的相对均衡. 当集群同一节点组内无法找到足够存放完整条带的节点时, 可以考虑拆分条带, 将条带的数据块部分和校验块部分分别放置在不同的节点组中.
基于上述纠删码存储模式下的条带放置思想, 本文提出一种改进的数据放置策略(heterogeneous-aware data placement algorithm, HDPA), 用来替代当前HDFS中默认的数据放置算法.
算法1. HDPA数据放置策略
Input: 写入文件
11. for NGi in Node_Group do:12. 布尔变量place_flag 判断条带St能否写入节点组NGi13. if place_flag = true then:14. 选取节点组NGi中合适的存储节点存储条带15. 写入条带St16. 刷新节点组数据写入的优先级17. else:18. 布尔变量place_flag 判断条带St数据块部分能否写入节点组NGi19. if place_flag = true then:20. 选取节点组NGi中合适的存储节点存储条带数据块21. 刷新节点组数据写入的优先级, 剔除为数据块分配的节点组22. 在剩余节点组中选取合适节点组存储条带校验块23. 写入条带St24. 刷新节点组数据写入的优先级25. endif26. endif27. end28. end
如算法1中所示, HDPA策略应用于HDFS存储系统中, 主要分为两部分: (1) 主节点周期性更新集群存储节点分组信息; (2) 文件写入时的数据放置位置选取.
首先是主节点周期性更新节点分组信息, 对应于算法1第1行的初始化, 主节点周期性初始化节点分组信息(创建节点分组集合
当有数据写入系统时, 对应于算法第2行的初始化, 主节点可以根据写入文件的大小、采取的存储策略来计算出将要写入的条带数量, 以便接下来为每一个纠删码条带选取放置位置(对应算法1第10–28行的循环体). 在为每一个条带选取放置位置时, 首先按当前确定的节点组优先级进行遍历, 检查节点组能否放置当前条带, 如果当前节点组能够存放整个条带则进行存放并更新节点组优先级(对应算法1第13–16行). 否则进一步考虑将条带的数据块和校验块部分拆分存储(对应算法1第17–26行的条件分支), 首先判断当前遍历的节点组能否容纳条带的数据块部分(算法1第18行), 如果当前节点组也无法容纳条带的数据块部分, 则继续遍历下一个节点组(对应于算法1第19行条件判断为假). 如果可以容纳, 则确定下来了条带的数据块部分存放位置, 接下来将存放条带数据块的该节点组从遍历节点组列表中剔除出去, 并重新刷新节点组优先级(算法1第21行), 以相同的流程为条带的校验块部分选取存放的节点组(算法1第22行). 直到条带的数据块和校验块均找到放置位置, 进行条带写入并结束当前条带选址流程. 依照上述流程直至所有条带写入完成.
通过上述HDPA算法, 在保证了HDFS集群正常读写性能和容错需求的同时, 进一步提高了系统在MapReduce这一应用场景下的数据读取性能.
2.2 单个 MapReduce作业的任务分配针对YARN框架当前采用的静态配置和集群的异构环境和负载变化不协调的问题, 在进行MapReduce作业的任务分配中, 需要解决如下两方面的问题.
(1) 计算资源的动态限制
主节点通过各从节点访问其上配置文件并向主节点发送心跳包来确定集群可用的计算资源情况. 当前YARN默认根据配置文件中的用户设置, 静态计算出各节点上的container数量并无法更改. 本文在系统默认计算出的container资源数量之外, 引入“弹性可用计算单元(elastic-avail-container, EAC)”数量这一指标, 其含义是通过访问节点当前硬件负载情况, 结合硬件参数计算出在不影响节点运行效率的情况下, 节点上可用的计算资源数量, 用来实现计算资源的动态限制, EAC的计算方式如式(2)所示:
$ \begin{split} & RL{B_{{\rm{CPU}}}} = vCores \times \frac{{Nu{m_{{\rm{core}}}} - {\gamma _{{\rm{CPUload}}}}}}{{Nu{m_{{\rm{core}}}}}} \\ & RL{B_{{\rm{MEM}}}} = vMEM \times \frac{{RAM - {\gamma _{{\rm{RAMload}}}}}}{{RAM \times ME{M_{{\rm{task}}}}}} \\ & Nu{m_{{\rm{EAC}}}} = \min (RL{B_{{\rm{CPU}}}}, RL{B_{{\rm{MEM}}}}) \end{split} $ | (2) |
其中,
(2) 任务选取
在实现集群计算资源的动态限制后, 可以得知集群的两类关键信息: MapReduce应用需要处理的数据集合以及当前集群的可用计算资源. 接下来需要解决的就是任务分配问题, 存在3种情况.
1) 作业对应的任务规模小, 需要分配的任务数量小于集群EAC总量.
2) 作业对应的任务规模超过了集群当前计算出的EAC总量, 但没有超过集群默认配置的container数量.
3) 作业对应的任务规模巨大, 需要分配的任务数量大于集群默认配置的container数量, 涉及节点计算资源的分配、释放与再分配.
与HDPA类似地, 本文提出一种动态的任务分配算法(dynamic task allocation algorithm, DTAA), 用以解决上述两方面问题.
算法2. DTAA任务分配策略
Input: 提交作业
16. end17. else18. 在作业中选取
如算法2中所示, 集群各节点定期采集自身 CPU、内存等负载情况, 根据式(2)结合默认配置文件计算自己的EAC数量, 附加在向主节点发送的心跳包信息中(算法2第2–6行). 主节点获悉集群默认container数量
通过上述DTAA算法, 能够有效避免在性能较弱、负载较高的节点上分配过多任务, 从而产生长尾任务影响作业的执行效率, 而在性能较高的节点上也能充分利用节点计算能力, 避免资源空闲浪费.
3 多作业并发下的DB-Fair调度方案设计本节将站在多作业并发的角度, 分析当前Hadoop默认作业调度方案存在的不足, 并通过两个层面的优化, 来提升MapReduce作业并发场景下的性能, 系统总体设计如图3所示.
图3中展示了在多作业并发场景下, 本文对当前Hadoop系统中部分流程的优化设计. 首先方案在存储侧的HDFS中沿用图2中的HDPA数据放置策略, 在计算侧的YARN框架中也仍然采用周期性更新集群计算资源(即EAC数量)的设计. 当作业队列中有多个作业时, 考虑作业间资源分配和作业内任务分配两个层面的优化.
在进行作业间集群计算资源划分时, 主节点采用等分集群计算资源的方式为每个作业分配对应数量的container单元. 若部分作业的任务数量少于等分当前集群可用container单元后可为其分配的资源量, 则其他作业可进一步等分这些空闲的计算资源. 作业间的资源划分由图3中主节点上的DB-Fair模块来实现.
在为每个作业划分好可用的计算资源后, 由每个作业自己的管理进程(如图3中作业1的Application-Master)决定优先分配作业内的哪些任务. 考虑对HDFS存储系统的访问负载均衡, 每个作业管理进程可通过获取作业内任务处理的数据的存储情况, 并获取HDPA策略对当前节点分组情况的信息, 来选取一批在读取所需数据时, 对HDFS存储系统的访问压力均衡的任务. 如图3中所示, 作业1的管理进程选取的k1个任务, 满足对HDFS各节点的数据访问压力(具体为访问该节点上数据块的数量)和存储节点本身的数据访问性能成正比, 以实现对存储侧的读取负载均衡.
3.1 作业并发调度的优化思路在实际的异构Hadoop集群环境中, 用户往往不定时地向集群提交MapReduce作业, 因此系统中通常都是多作业并发的共享模式, 因此, 本文通过修改YARN中对共享队列中作业的调度, 和作业内任务的分配顺序两个层面去进行优化. 首先站在作业间的角度考虑, 保证资源共享的公平性和集群计算资源的负载均衡. 然后站在作业内任务分配的角度考虑, 保证作业内任务执行用时的均衡性.
(1) 各作业计算资源的分配
考虑作业提交序列
1) 引入DTAA算法中的动态资源限制机制, 周期性地确认各节点的后台负载并计算合理的优先资源分配阈值, 并且当有新作业提交时实时更新, 以重新计算为各作业分配的计算资源量.
2) 区分作业的Map任务、Reduce任务. 一般地, 在MapReduce作业中Reduce任务的数量相较于Map任务要小近乎一个量级, 并且不同作业对应的Reduce任务特征相较于Map任务差异更加明显(即Reduce任务对于各类计算资源的敏感程度相较于Map任务更高), 因此, 在多作业并发场景下, 需要对Map任务和Reduce任务进行进一步区分. 这里有两方面的考虑: 一方面, 为了降低任务分配的复杂程度, 不区分不同作业的Map任务在集群不同节点上的分配比例; 另一方面, 注意同一作业内Reduce任务在集群中均匀分布, 避免引起节点间的负载倾斜. 假设当前队列中作业为
(2) 作业内任务优先级确定
动态更新集群总计算资源可以优化异构环境和负载波动对作业执行的影响, 而当作业分配的计算资源小于作业任务数量时, 优先选取作业内的哪些任务也会对作业完成产生不同影响. 多种存储模式混合场景下, 作业并发执行将会带来节点数据访问热度差异, 影响大量的 Map任务执行效率. 这时可以考虑使用HDPA算法中的节点分组信息. 假设当前集群划分为n个节点组
基于上述优化思路, 本文提出一种资源动态平衡的公平调度算法(dynamic balanced fair scheduling algorithm, DB-Fair), 算法流程如算法3.
算法3. DB-Fair作业调度策略
Input: 作业队列
4. 刷新各节点
如算法3中所示, YARN框架中主节点统计各从的可用计算资源, 控制作业队列中的任务在当前集群中的并发度(算法3第1行初始化). 各从节点周期性更新EAC数量
通过算法3, 可以实现在底层采用混合存储模式时, 上层MapReduce计算框架在多作业并发场景下的效率提升, 在后续的实验分析中本文也将从多个角度论证方案的有效性.
4 实验结果及分析 4.1 实验环境与方案集成本文通过在一个由16台服务器组成的真实的异构集群中部署Hadoop系统, 并对HDFS、YARN以及MapReduce框架中的部分模块进行如图4所示的修改以实现本文的设计方案, 节点硬件设备如表1所示.
集群拓扑如图5所示, 16台服务节点分布在3个机架中, 每个机架内节点通过机架顶部的万兆交换机通信, 机架间通过一台万兆的汇聚交换机进行互联. 集群处于共享状态, 其上运行了多种常见的分布式系统及容器环境(Ceph, Redis, Cassandra等), 各种硬件设备具有一定的后台负载, 因此集群具有软硬件环境异构的特性.
4.2 实验设置
MapReduce框架可以进行多种场景的离线批处理应用计算, 文献[4,5,34]对近20种常见的MapReduce应用进行了运行时资源占用分析, 给出了这些MapReduce应用处理过程中的关键路径, 即资源占用和运行时间占比较高的阶段. 结合HiBench[35]中的部分作业测试集以及Hadoop自带的部分测试用例, 本文选取了11种MapReduce应用作为Benchmark的测试集合, 如表2所示. 其中有工作负载集中在Map阶段, 对计算资源需求较高的Wordcount、Histogram-Movies等应用, 有在Shuffle阶段传输大量数据使得Reduce任务对内存有更高需求的TeraSort、AdjList等应用以及迭代式计算应用Kmeans和纯计算应用MonteCarlo等. 通过设置不同类型作业的数量以及作业提交顺序、时间间隔等参数, 本文构造出一个由30个作业组成的Benchmark. 作业对应的输入数据来自真实环境或模拟生成的不同大小的数据集, 以3副本、RS-(3, 2)、RS-(6, 3) 以及 RS-(10, 4) 等不同存储模式随机存储. 实验的相关设置, 如作业的提交间隔、不同类型作业对应的数据集大小和类型等, 本文参考了以前学者[4, 5]的配置方式, 能反映真实Hadoop集群中的作业运行场景. 本文通过对比DB-Fair作业调度策略和当前Hadoop中提供的两种调度策略FIFO先进先出调度和Fair公平调度策略, 验证方案的有效性.
4.3 实验结果与分析 4.3.1 评价指标选取Benchmark的完成速度能够最直观地体现不同调度策略的性能. 因此也是实验结果中最主要的评价指标. 除此之外, 在对比不同调度策略时, 也需要站在MapReduce 框架性能、集群环境影响和用户服务质量等其他角度分析作业调度的影响. 因此, 本文在实验分析中除了对比不同调度策略下Benchmark的完成时间以外, 选取了作业任务分配、用户服务质量和节点计算资源负载等指标来验证DB-Fair 调度的有效性.
4.3.2 Benchmark完成用时
通过在异构集群和模拟后台负载的实验背景下测得了FIFO、Fair和DB-Fair这3种调度策略在给定Benchmark下的运行表现, 实验结果如图6所示. 从图6可以看出, FIFO调度策略下Benchmark的完成时间最短, 其次是DB-Fair, 相较于默认Fair调度速度提升约17%. 值得注意的是, Benchmark中的两个AdjList作业属于Shuffle-Reduce Heavy作业[5], 在3类调度策略中其Shuffle、Reduce 阶段占据了相当长的运行时间. 由于FIFO按作业提交先后顺序优先为先提交的作业分配足够的计算资源, 因此在Benchmark设置的作业提交顺序下, FIFO调度能够尽可能快地处理完AdjList的全部Map任务并为AdjList的全部Reduce任务分配所需container计算资源, 剩余大部分集群计算资源则可以分配给后面的作业, 这种Overlapping使得AdjList作业没有拖慢整个Benchmark的完成. 而在Fair、DB-Fair调度策略下AdjList作业的Map、Reduce任务需要和其他作业进行集群资源共享, 因此在作业调度后期集群中只剩下部分AdjList的Reduce作业正在处理, 拖慢了Benchmark的完成时间. 但是由于FIFO这种先来先服务原则对短作业以及较晚提交作业的处理并不友好, 本文将在第4.3.3节从用户感知的角度进行实验结果分析, 论证FIFO的不足. 而本文提出的DB-Fair, 相较于默认的Fair调度策略有着较为明显的性能提升, 并且避免了FIFO调度存在的问题, 实验结果证明了方案的有效性.
4.3.3 任务分布的均衡性分析进一步, 本文对比了Benchmark在Fair、DB-Fair调度模式下运行过程中, 某个Histogram-Movies作业对应在两种调度策略下的任务分布情况. 值得注意的是, 由于DB-Fair动态地调整各计算节点上的container数量配置, 因此任务分布的均衡性不能简单以作业对应任务在各节点上分布的数量差异来评估. 本文以不同时刻作业对各节点算力占用的比例, 即不同时刻作业在各节点上分配的任务数量除以各节点对应时刻的EA数量, 来刻画任务的分配情况, 实验结果如图7所示.
为了便于分析实验结果, 本文选取了3个具有代表性的节点, 观察作业中对应任务的分布情况. 从图7中可以明显看出作业在Fair调度策略下存在严重的长尾现象: 图7(a) 的node3上, 作业中部分任务对节点计算资源的占用超过了150 s, 这也意味着作业的完成时间要更久. 此外从图7(a)也可以看出, Fair调度策略下作业对不同节点的计算资源占用比例差异较大, 体现出本文最开始提到的任务偏斜效应. 而在DB-Fair调度策略下, 如图7(b) 所示, 任务在各节点上的完成用时和对节点计算资源的占用比例都体现出相较于Fair调度更加均衡的结果, 最高和最低节点算力占用的区间从10%–40%收窄为17.5%–24%, 该作业的完成用时缩短了57.9%, 验证了本文提出的方案在均衡作业内任务分布上的有效性.
4.3.4 用户感知分析
接下来本文将从用户感知的角度分析3种调度策略的服务性能, 站在作业提交者角度, 作业的完成速度是其唯一的评价指标. 本文以单个作业独占YARN集群在HDPA+DTAA策略下的运行时间为基准
4.3.5 集群负载均衡性分析
本节将对不同调度策略下集群各节点的负载情况进行分析. 本文统计了Benchmark运行过程中集群各节点的CPU负载和内存占用的变化情况, 并展示了在3种调度策略下CPU、内存负载差异最大的两个节点的具体负载信息, 实验结果如图9, 图10所示.
从图9, 图10可以看出, 在实验的统一设置下, 模拟集群后台负载时, Benchmark开始运行前, 集群中CPU负载差异最大的两个节点的后台负载分别为70%和5%, 内存占用差异最大的两个节点的后台负载分别为60%和10%. 而正如前文所述, FIFO、Fair这两种默认策略下YARN未考虑集群异构和运行时负载, 静态的计算资源划分和任务分配方式加剧了高负载节点的资源占用, 并且没有很好地利用低负载节点上的计算资源, 如在图9中, Benchmark运行在FIFO、Fair调度策略下, 集群CPU负载最高的节点长期处于100%占用, 而CPU负载最低的节点则在20%–60%之间波动. 与之对应的, 在本文提出的DB-Fair 调度策略下, Benchmark运行期间CPU负载最高的节点虽然峰值也接近100%, 但总体在90%–100%之间波动, 而CPU负载最低的节点的CPU利用率也得到了提升, 在40%–80%之间波动. 说明DB-Fair 调度策略在一定程度上缓和了集群不同节点上的CPU负载差异. 同样地, 如图10中所示, 在FIFO、Fair调度策略下, Benchmark运行期间内存占用最高的节点峰值分别达到了98.8%和93.1%, 内存占用最低的节点则长期处于40%以下. 与之对应的, 在DB-Fair调度下, 高负载节点的内存占用维持在70%–80%, 低负载节点的内存占用也在50%上下波动, 实现了一定程度上的负载均衡.
5 结论与展望本文分析了当前异构集群环境下, 底层存储系统采用混合存储模式时上层MapReduce运行过程中面临的问题. 通过设计合适的数据放置、资源管理和作业调度策略, 实现了多作业场景下的MapReduce计算效率的优化. 实验结果表明, 相较于Hadoop默认的调度策略, 本文的方案能在给定的Benchmark中提升约17% 的性能, 并且在用户感知、资源有效利用和集群负载均衡性等方面均要优于默认调度策略. 此外还可将本文提出的方案部署在更大规模、异构情况更加复杂的集群环境中, 去验证方案的有效性并发掘新的问题. 当然, 本文仍然存在进一步的改进空间, 如对集群节点存储性能进行更细粒度的刻画, 细分不同类型MapReduce 作业的特性等.
[1] |
Dean J, Ghemawat S. MapReduce: Simplified data processing on large clusters. Communications of the ACM, 2008, 51(1): 107-113. DOI:10.1145/1327452.1327492 |
[2] |
徐鹏. Hadoop 2. X HDFS源码剖析. 北京: 电子工业出版社, 2016. 1–25.
|
[3] |
Apache. HDFS architecture. https://hadoop.apache.org/docs/r3.3.1/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html. (2021-06-15)[2022-08-03].
|
[4] |
Chen YP, Ganapathi A, Griffith R, et al. The case for evaluating mapreduce performance using workload suites. Proceedings of the 19th IEEE Annual International Symposium on Modelling, Analysis, and Simulation of Computer and Telecommunication Systems. Singapore: IEEE, 2011. 390–399.
|
[5] |
Ahmad F, Chakradhar ST, Raghunathan A, et al. Tarazu: Optimizing mapReduce on heterogeneous clusters. ACM SIGARCH Computer Architecture News, 2012, 40(1): 61-74. DOI:10.1145/2189750.2150984 |
[6] |
IDC. The digitization of the world from edge to core. https://www.seagate.com/files/www-content/our-story/trends/files/idc-seagate-dataage-whitepaper.pdf. (2018-12-04)[2022-08-03].
|
[7] |
CAICT中国信通院. 数据中心产业图谱研究报告. http://www.caict.ac.cn/kxyj/qwfb/ztbg/202201/P020220125529907466991.pdf. (2022-01-26)[2022-08-03].
|
[8] |
Wikipedia. Era sure code. https://en.wikipedia.org/wiki/Era sure_code. (2022-06-26).
|
[9] |
Ceph. Ceph documentation. https://docs.ceph.com/en/latest/rados/operations/erasure-code/. (2019-04-23)[2022-08-03].
|
[10] |
Huang C, Simitci H, Xu YK, et al. Erasure coding in Windows azure storage. Proceedings of the 2012 USENIX Conference on Annual Technical Conference. Boston: USENIX Association, 2012. 15–26.
|
[11] |
Muralidhar S, Lloyd W, Roy S, et al. F4: Facebook’s warm BLOB storage system. Proceedings of the 11th USENIX Symposium on Operating Systems Design and Implementation. Broomfield: USENIX Association, 2014. 383–398.
|
[12] |
Wang J, Shang PJ, Yin JL. DRAW: A new data-grouping-aware data placement scheme for data intensive applications with interest locality. In: Li XL, Qiu J, eds. Cloud Computing for Data-intensive Applications. New York: Springer, 2014. 149–174.
|
[13] |
Bawankule KL, Dewang RK, Singh AK. Historical data based approach to mitigate stragglers from the Reduce phase of MapReduce in a heterogeneous Hadoop cluster. Cluster Computing, 2022, 25(5): 3193-3211. DOI:10.1007/s10586-021-03530-x |
[14] |
Jeyaraj R, Ananthanarayana VS, Paul A. MapReduce scheduler to minimize the size of intermediate data in shuffle phase. Proceedings of the 18th IEEE/ACIS International Conference on Computer and Information Science. Beijing: IEEE, 2019. 30–34.
|
[15] |
Dai XM, Bensaou B. Scheduling for response time in Hadoop MapReduce. Proceedings of the 2016 IEEE International Conference on Communications. Kuala Lumpur: IEEE, 2016. 1–6.
|
[16] |
Kavulya S, Tan JQ, Gandhi R, et al. An analysis of traces from a production MapReduce cluster. Proceedings of the 10th IEEE/ACM International Conference on Cluster, Cloud and Grid Computing. Melbourne: IEEE, 2010. 94–103.
|
[17] |
Maleki N, Rahmani AM, Conti M. SPO: A secure and performance-aware optimization for MapReduce scheduling. Journal of Network and Computer Applications, 2021, 176: 102944. DOI:10.1016/j.jnca.2020.102944 |
[18] |
Chen L, Liu ZH. Energy- and locality-efficient multi-job scheduling based on MapReduce for heterogeneous datacenter. Service Oriented Computing and Applications, 2019, 13(4): 297-308. DOI:10.1007/s11761-019-00273-x |
[19] |
Rashmi KV, Shah NB, Ramchandran K, et al. Regenerating codes for errors and erasures in distributed storage. Proceedings of the 2012 IEEE International Symposium on Information Theory. Cambridge: IEEE, 2012. 1202–1206.
|
[20] |
Chen HCH, Hu YC, Lee PPC, et al. NCCloud: A network-coding-based storage system in a cloud-of-clouds. IEEE Transactions on Computers, 2014, 63(1): 31-44. DOI:10.1109/TC.2013.167 |
[21] |
Shi HY, Lu XY. TriEC: Tripartite graph based erasure coding NIC offload. Proceedings of the International Conference for High Performance Computing, Networking, Storage and Analysis. Denver: ACM, 2019. 44.
|
[22] |
Wang F, Tang YJ, Xie YW, et al. XORInc: Optimizing data repair and update for erasure-coded systems with XOR-based in-network computation. Proceedings of the 35th Symposium on Mass Storage Systems and Technologies. Santa Clara: IEEE, 2019. 244–256.
|
[23] |
Mitra S, Panta R, Ra MR, et al. Partial-parallel-repair (PPR): A distributed technique for repairing erasure coded storage. Proceedings of the 11th European Conference on Computer Systems. London: ACM, 2016. 30.
|
[24] |
Li XL, Yang ZR, Li JH, et al. Repair pipelining for erasure-coded storage: Algorithms and evaluation. ACM Transactions on Storage, 2021, 17(2): 13. |
[25] |
Xu LL, Lyu M, Li QL, et al. SelectiveEC: Towards balanced recovery load on erasure-coded storage systems. IEEE Transactions on Parallel and Distributed Systems, 2022, 33(10): 2386-2400. DOI:10.1109/TPDS.2021.3129973 |
[26] |
Chan JCW, Ding Q, Lee PPC, et al. Parity logging with reserved space: Towards efficient updates and recovery in erasure-coded clustered storage. Proceedings of the 12th USENIX Conference on File and Storage Technologies. Santa Clara: USENIX Association, 2014. 163–176.
|
[27] |
Rashmi KV, Shah NB, Gu DK, et al. A “hitchhiker’s” guide to fast and efficient data reconstruction in erasure-coded data centers. Proceedings of the 2014 ACM Conference on SIGCOMM. Chicago: ACM, 2014. 331–342.
|
[28] |
Rawat AS, Vishwanath S, Bhowmick A, et al. Update efficient codes for distributed storage. Proceedings of the 2011 IEEE International Symposium on Information Theory. St. Petersburg: IEEE, 2011. 1457–1461.
|
[29] |
Hashem IAT, Anuar NB, Marjani M, et al. Multi-objective scheduling of MapReduce jobs in big data processing. Multimedia Tools and Applications, 2018, 77(8): 9979-9994. DOI:10.1007/s11042-017-4685-y |
[30] |
Jiang YW, Zhou P, Cheng TCE, et al. Optimal online algorithms for MapReduce scheduling on two uniform machines. Optimization Letters, 2019, 13(7): 1663-1676. DOI:10.1007/s11590-018-01384-8 |
[31] |
Naik NS, Negi A, Br TB, et al. A data locality based scheduler to enhance MapReduce performance in heterogeneous environments. Future Generation Computer Systems, 2019, 90: 423-434. DOI:10.1016/j.future.2018.07.043 |
[32] |
Darrous J. Scalable and efficient data management in distributed clouds: Service provisioning and data processing [Ph.D. thesis]. Lyon: Université de Lyon, 2019.
|
[33] |
Ford D, Labelle F, Popovici FI, et al. Availability in globally distributed storage systems. Proceedings of the 9th USENIX Symposium on Operating Systems Design and Implementation. Vancouver: USENIX Association, 2010. 61–74.
|
[34] |
Ahmad F, Chakradhar ST, Raghunathan A, et al. ShuffleWatcher: Shuffle-aware scheduling in multi-tenant MapReduce clusters. Proceedings of the 2014 USENIX Conference on Annual Technical Conference. Philadelphia: USENIX Association, 2014. 1–12.
|
[35] |
Intel. Intel-bigdata/HiBench. https://github.com/Intel-bigdata/HiBench. (2020-06-20)[2022-08-03].
|