Spark由加州大学伯克利分校AMP实验室开发, 是基于RDD的内存计算框架, 在流式计算场景中表现良好. 在Spark分布式流计算集群的实践过程中, 往往出现数据分配不均匀的现象, 某个分区的数据显著多于其它分区, 该节点的计算速度成为整个集群的性能瓶颈. 任务执行慢的节点往往导致内存溢出, 服务器CPU使用率在短时间内急剧增加.
发生数据倾斜时, 某个任务处理的数据量远大于其他分片, 从而增加了整个阶段的完成时间. 由于原始数据源的分布不均匀, 每个Reducer在分区映射过程中计算的数据量也不相同, 任务执行时间不同, 增加宽依赖阶段的延迟, 降低集群作业执行的效率. 虽然支持用户定义分区, 但真正的数据分布难以预测, 无法保证自定义分区功能的合理性和准确性. 数据倾斜问题很难规避, 为有效改善数据倾斜问题, 本文在分析和研究国内外对该问题的研究成果和实践经验的基础上, 主要做了以下工作.
1) 研究了Spark Shuffle设计和算法实现, 分析了哈希和排序两类Shuffle机制的实现过程, 深入分析在Shuffle过程发生数据倾斜的本质原因.
2) 分析了Spark流计算集群中, 发生数据倾斜常见业务场景, 以及数据倾斜问题的原因和发生过程, 提供了问题定位的方法和步骤.
3) 提出了Broadcast机制避免某些场景下Shuffle导致的数据倾斜问题的方法, 给出广播变量分发机制和算法实现. 通过Broadcast实现Join算子的实验, 验证了该方法在性能上有较大提升.
1 相关工作MapReduce框架数据倾斜解决方案包括静态和动态自适应调整. 静态自适应是事先分析数据集中键的分布特征, 选择适当的分区方法. 该方法需要总结出一套和业务相关的数据集抽象规则算法, 可以通过机器学习的方式来训练算法, 提高对各种类型数据集的适配度. 文献[1]提出的LEEN方法, 在Skew Reduce的基础上将分析过程移至Map完成后, 考虑Reduce端输入数据的公平性和数据本地性, LEEN方法在缓解慢任务和Shuffle引起的网络拥塞方面有较大的性能提升[1].
动态自适应的思想是在应用运行时, 实时检测当前各个Partition中的数据填充情况. 如果发现存在严重的数据倾斜问题, 在下一步进行调整, 以避免原始MapReduce任务之间的不能全局优化的问题. 文献[2]提出了Skew Tune方法, 推测当前任务需要完成时间, 确定最可能的慢任务[2]. 扫描慢任务要处理的输入数据, 在数据偏斜场景中性能有较好的表现. 文献[3]提出在执行流程中插入一个Intermediate Reduce(IR)的新阶段, 用于并行处理动态的中间结果, 使所有键值对都能够利用到所有的节点资源[3]. 文献[4]提出Adaptive MapReduce策略, 动态地调整每个Mapper处理的输入数据量, 在Mapper端使用一个固定大小的哈希表执行数据本地聚合[4]. 文献[5]提出了任务重分割方法, 通过监视任务的执行进度, 根据预定策略直接切割任务本身[5].
本文与上述研究成果的不同之处在于通过Broadcast机制将数据分发到计算节点中, 实现数据本地性, 从根本上避免Shuffle过程, 不需要做额外的开发和部署. 能够快速解决特定场景下的数据倾斜问题, 具有较高的实用性.
2 分区策略和Shuffle算法 2.1 分区策略分区策略(Partition)的主要作用是确定Shuffle过程中Reducer的数量, 以及Mapper侧数据应该分配给哪个Reducer. 分区程序可以间接确定RDD中的分区数和分区中的内部数据记录数. Spark内置了Hash Partition和Range Partition, 支持自定义分区器. Hash Partition的算法实现是获取Key的HashCode除以子RDD分区的数量取余. 哈希分区器易于实现并且运行速度快. 但有一明显的缺点是不关心键值的分布情况, 其散列到不同分区的概率会因数据而有差别.
Range Partition在一定程度上避免了该问题. 范围分区器根据父RDD的数据特征确定子RDD分区的边界, 通过对数据进行采样和排序, 将父RDD数据划分为M个数据块. 如果数据均匀分布, 则为每个分区提取的样本大小都很接近. 如果所有数据都分配给每个分区, 则每个分区都会提取相同的数据量, 并统计每个分区的实际数据量. 若出现数据倾斜的情况, 则对个别分区重新采样.
2.2 Hash-Based Shuffle算法Spark作为MapReduce框架的一种实现, 本质上将Mapper端输出的数据通过Partition算法, 确定发送到相应的Reducer, 该过程涉及网络传输和磁盘读写. Spark提供了哈希和排序两类Shuffle机制. ShuffleManager主要作用是提供Shuffle Writer和Shuffle Reader过程.
Hash-Based Shuffle机制的实现过程包括聚合和计算, 使用哈希表存储所有聚合数据的处理结果. 聚合和计算过程不进行排序, 分区内部的数据是无序的, 如果希望有序就需要调用排序操作. 哈希Shuffle强制要求在Map端进行聚合操作, 对于某些键值重复率不高的数据会影响效率.
如图1所示, 哈希Shuffle算法实现过程. 每个Mapper都会根据Reducer的数量创建一个相应的桶(Bucket). Mapper生成的结果将根据设置的分区算法填充到每个桶中. 当Reducer启动时, 它将根据自己的TaskId和它所依赖的MapperId, 从远程或本地Block Manager获取相应的Bucket作为Reducer的输入. Bucket是一种抽象概念, 可以对应于某个文件或文件的一部分. 哈希Shuffle算法实现中, Mapper会针对每个Reducer生成一个数据文件, 当Mapper和Reducer数量比较多的时候, 生成大量的磁盘文件.
开启consolidate优化机制后, Shuffle Writer过程中Task不是为下游Stage的每个Task创建一个文件. 每个ShuffleFileGroup对应一批磁盘文件, 磁盘文件数与下游Stage中的Task总数相同. Executor分配多少个Core就可以并行执行多少个任务, 第一批并行执行中的每一个任务都创建一个ShuffleFileGroup, 并将数据写入相应的磁盘文件. 当执行器执行下一批任务时, 复用以前存在的ShuffleFileGroup的磁盘文件, Task将数据写入现有磁盘文件. consolidate机制允许不同的任务复用同一批磁盘文件, 减少了文件数量并提高了Shuffle Writer的性能.
2.3 Sort-Based Shuffle算法如图2所示, Sort-Based Shuffle算法实现过程. 排序Shuffle相比于哈希Shuffle, 两者的Shuffle Reader过程是一致的, 区别在Shuffle Writer过程. 排序Shuffle允许Map端不进行聚合操作, 在不指定聚合操作的情况下, 排序Shuffle机制Mapper端用数据缓存区(Buffer)存储所有的数据. 对于指定聚合操作的情况下, 排序Shuffle仍然使用哈希表存储数据, 聚合过程与哈希Shuffle的基本一致. 无论是Buffer还是HashMap, 每更新一次都检查是否需要将现有的数据溢存到磁盘中, 需要对数据进行排序, 存储到一个文件中. 更新完所有数据后, 将多个文件合并为一个文件, 并确保每个分区的内部数据都是有序的.
排序Shuffle机制包括普通和bypass两种运行模式. 当Shuffle Reader Task的数量小于等于bypassMergeThreshold的值时就会启用bypass模式. 在普通操作模式中, 首先将数据写入内存数据结构, 根据不同的Shuffle运算符选择不同的数据结构. 如ReduceByKey之类的聚合运算符选择哈希数据结构, Join类的普通运算符使用数组数据结构. 在将每条数据写入内存之后, 确定是否已达到临界阈值. 如果达到临界阈值则将内存中的数据写到磁盘, 然后清空内存中的数据.
在溢出到磁盘文件之前, 根据Key对内存中已有的数据进行排序, 排序后数据将批量写入磁盘文件. 排序后的数据以每批10 000个批量写入磁盘文件, 从而有效减少磁盘IO数量. 将Task的所有数据写入内存数据结构的过程中, 会发生多次磁盘溢出操作, 生成多个临时文件, 最后合并所有先前的临时磁盘文件. 由于一个Task仅对应于一个磁盘文件, 因此将单独写入索引文件以标识文件中每个下游Task数据的开始和结束位置, 磁盘文件合并过程减少了文件数量.
在bypass模式下, 为每个下游Task创建一个临时磁盘文件, 并根据Key的HashCode写入相应的磁盘文件. 写入磁盘文件时先写入内存缓冲区, 然后在缓冲区满后溢出到磁盘文件. 它还将所有临时磁盘文件合并到一个文件中, 并创建一个单独的索引文件. 此过程的磁盘写入机制与未优化的哈希Shuffle相同, 但是Shuffle Reader性能会更好. 这种机制和普通的排序Shuffle之间的区别是磁盘写入机制不同, 不会被排序. 启用bypass机制的最大优点是在Shuffle Writer过程中不需要执行数据排序操作, 节省了部分性能开销.
3 数据倾斜问题分析和定位 3.1 数据倾斜原因分析
发生数据倾斜的原因主要包括输入数据源分布不均匀, 以及计算过程中数据拉取时间不均匀. 输入数据分布不均匀的一般表现为原始数据, 或者中间临时数据中Key分布不均匀[6]. 例如Spark Streaming通过Direct Stream读取Kafka数据. 由于Kafka的每个分区都对应于Spark的相应任务, Kafka相关主题的分区之间的数据是否平衡, 直接决定Spark在处理数据时是否会产生数据偏斜. 如果使用随机分区, 概率上分区之间的数据是平衡的, 不会生成数据倾斜. 但很多业务场景要求将具备同一特征的数据顺序消费, 需要将具有相同特征的数据放于同一个分区中. 一个典型的场景是相同的用户相关的PV信息放置在相同的分区中, 很容易导致数据倾斜.
数据拉取时间不均匀一般是硬件计算能力不均匀, 或者网络传输能力不均匀造成的[7]. 比如PageRank算子分为三个Stage运行, 由于第二个Stage产生了Shuffle是最容易发生数据倾斜, 每个Task处理分区数据绑定了各个顶点权重, 然后收集其邻接节点的权重. 由于Executor需要从非本地节点上拉取上一个Stage中得到的节点信息. 如果数据分布不均匀, 某些节点会比其他节点承受更大的网络流量和计算压力. 数据倾斜的计算时间主要花费在Shuffle上, 提高Shuffle性能有利于提高应用程序的整体性能.
3.2 数据倾斜定位方法通过Spark界面观察每个阶段任务当前分配的数据量, 进一步确定数据的不均匀分布是否导致了数据倾斜[8]. 只要在代码中看到Shuffle类操作符, 或在Spark SQL语句中看到导致Shuffle的语句, 就可以确定划分Stage的边界. Stage 1的每个任务开始运行时, 将首先执行Shuffler Reader操作, 从Stage 0的每个任务中提取需要处理的Key. 比如Stage 1在执行ReduceByKey操作符之后计算出最终RDD, 然后执行收集算子将所有数据拉到Driver中.
数据倾斜往往发生在Shuffle过程中, 可能会触发Shuffle操作的算子包括GroupByKey、ReduceByKey和AggregateByKey等. 在执行Shuffle时必须将每个节点上的相同Key拖动到同一个节点上的Task进行处理. 如果某个Key对应的数据量特别大就会发生数据倾斜, Job运行得非常缓慢, 甚至可能因为某个Task处理的数据量过大导致内存溢出.
4 广播机制避免Shuffle的策略 4.1 Torrent Broadcast原理通过广播机制将只读变量从一个节点发送到其他Executor节点, 进程内运行的任务属于同一个应用程序, 在每个执行器节点上放置广播变量可以由该节点的所有任务共享[9]. Torrent Broadcast算法的基本思想是将广播变量划分为多个数据块. 当某个执行器获得数据块时, 当前执行器被视为数据服务器节点. 随着越来越多的执行器获得数据块, 更多的数据服务器节点可用. 广播变量可以快速传播到所有节点. Torrent Broadcast读取数据的方式与读取缓存类似, 使用Block Manager自带的NIO通信方式传递数据, 存在的问题是慢启动和占内存[10]. 慢启动指的是刚开始数据只在Driver节点上, 要等执行器获取多轮数据块后, 数据服务节点才会变得可观, 后面的获取速度才会变快. 执行器在获取完所有数据分块后进行反序列化时, 需要将近两倍的内存消耗.
4.2 Broadcast变量性能优势Driver先把广播变量序列化为字节数组, 然后切割成BLOCK_SIZE大小的数据块. 在数据分区和切割之后, 数据分区元信息作为全局变量被存储在Driver节点的Block Manager中. 之后每个数据分块都做相同的操作, Block Manager Master可以被Driver和所有Executor访问到. 执行器反序列化Task时, 先询问所在的Block Manager是会否包含广播变量, 若存在就直接从本地Block Manager读取数据. 否则连接到Driver节点的Block Manager Master获取数据块的元信息.
广播机制把只读变量通过共享的方式有效的提高了集群的性能. 大多数Spark作业的性能主要消耗在Shuffle环节, 该环节包含了大量的磁盘IO、序列化、网络数据传输等操作. 通过广播机制避免Shuffle, 可显著提高应用运行速度. 例如普通Join操作会产生Shuffle, RDD中相同的Key需要通过网络拉取到同一个节点上. 在算子函数使用广播变量时, 首先会判断当前Task所执行器内存中是否有变量副本. 如果有则直接使用, 如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地执行器内存中. 广播变量保证了每个执行器内存中只驻留一份变量副本, Executor中的Task执行时共享该变量副本, 减少变量副本的数量和网络传输的性能开销, 降低了执行器内存的开销, 降低GC的频率, 会极大地提升集群性能.
4.3 Broadcast Join代码实现如图3所示, Broadcast Join算子Scala代码实现. 当在RDD上使用Join类操作或者在Spark SQL中使用联接语句时, 普通联接运算符会产生Shuffle过程, 并将相同的Key数据拉入Shuffle Reader Task进行联接操作. 如果连接操作中RDD或表的数据量相对较小, 则不使用连接运算符而是使用广播变量和映射类运算符来实现Join操作, 从而完全避免了Shuffle过程和数据歪斜. 较小的RDD中的数据通过收集操作直接拉入Driver节点的内存中, 创建广播变量. 然后从广播变量中获取较小RDD的数据, 并在另一个RDD上执行映射类运算符. 根据连接键比较当前RDD的每个数据. 如果连接键相同则将两个RDD的数据连接在一起, 实现Join操作的效果. 该方法不足在于Driver和Executor节点都要存储广播变量的全部数据, 比较消耗内存.
5 实验和分析 5.1 集群硬件环境配置
本文使用的实验环境是一个由四个节点组成的服务器集群. 集群采用主-从体系结构, 其中一个是主节点, 其他三个是从节点. Spark Job和Hadoop文件系统部署在同一节点上. 大部分的Spark作业会从外部存储系统读取输入数据, 比如Hadoop文件系统, 将其与存储系统放得越近越好. 在相同的节点上安装Spark Standalone模式集群, 并单独配置Spark和Hadoop的内存和CPU使用以避免干扰[11]. 实验服务器集群环境的硬件配置根据数据量调整, 参考官方建议初始配置如下.
单个节点服务器最初配置四个磁盘. Spark很多计算都在内存中进行, 但当数据在内存中装不下的时候, 仍然使用本地磁盘来存储数据, 以及在不同阶段之间保留中间的输出. 在实验中每个计算节点有4–8个磁盘. 集群网络最初配置为万兆网卡. 当数据在内存中时, 许多Spark应用程序都与网络密切相关. 使用万兆或更高的网络是使这些应用程序更快的比较好的方法. 分布式Reduce应用程序尤其适用, 比如group-by、reduce-by等操作.
单个节点服务器的初始内存配置为16GB. Spark应用分配75%的内存, 剩下的部分留给操作系统和缓冲区缓存. 使用Spark监视UI的Storage选项卡查看内存使用情况, 内存使用情况受存储等级和序列化格式的影响很大[12]. 单个计算节点的CPU核心数量最初配置8个Core. Spark在每台机器上可以扩展到数几十个Core. 测试阶段在每台机器上提供8–16个内核, 根据服务器负载的消耗可以配置更多.
5.2 系统配置和源数据如图4所示, 集群运行时软件版本信息, 图5是集群系统配置信息, 都是当前业界使用的稳定版本和配置. 实验数据来自于某金融机构的客户消费贷款逾期信息, 包括逾期客户还款信息, 以及催收行动信息. 选取逾期客户还款信息数据量相对较少, 10万条记录, 数据结构中包括记录ID、还款时间、还款金额、银行编码、还款卡号、客户姓名、客户ID等信息. 逾期客户机构催收行动数据量相对较大, 1000万条记录, 数据结构包括记录ID、催收行动码、行动描述、客户电话、客户关系、通话时长、客户ID、客户姓名、行动时间、催收员、催收机构等信息. 根据客户ID进行Join操作, 计算指标包括机构催收员工作量和催收员工作业绩.
5.3 实验结果分析
如图6所示, Broadcast Join算子DAG视图中不存在Shuffle, 普通Join算子DAG视图的复杂度明显高于Broadcast Join算子. 图6(a)是普通Join操作的DAG视图, 根据RDD的宽依赖关系分为三个阶段, 有向无环图描述了阶段之间的依赖关系, 当前Stage只能在父Stage之后执行. 从DAG视图清晰的看到普通Join算子存在Shuffle过程. 图6(b)是Broadcast Join算子有向无环图只有一个阶段, 逻辑过程相对简单.
如图7所示, Broadcast Join算子中Task统计数据表明性能上存在明显的优势. 图7(a)是普通Join算子某阶段Task性能统计数据, 包括Task持续时间、GC执行时间和Shuffle数据量等3个方面的统计信息. 图7(b)是Broadcast Join算子Task性能统计数据, 相比于普通Join算子的Task性能统计信息, 在3个方面都存在明显的优势.
如图8所示, Broadcast Joins算子各个Stage的磁盘读写和网络流量、任务持续时长等都存在明显的优势. 图8(a)是普通Join算子各个阶段执行情况, 共有3个阶段, 整个过程耗时6–7秒, Shuffle并行度为9, 涉及Shuffle Reader数据量61.3MB, Shuffle Writer数据量61.3 MB. 如图8(b)所示, Broadcast Join算子只有1个阶段, 没有涉及Shuffle数据读写过程, 数据输入117.4 MB, Task总数量明显小于普通Join算子的数量, 执行时间是3秒, 和普通Join算子相比在性能上有较大的提升.
如图9所示, Broadcast Joins算子不存在数据倾斜问题. 图9(a)是普通Join操作任务数据分配和执的详细数据. 从列Shuffle Read Size可以看出, 任务分配出现了数据倾斜问题, 被分配数据量较大的Task执行时间明显高于其他任务的持续时间, 消耗更大的资源和网络流量, 其他已经完成计算的节点处于等待状态. 图9(b)是Broadcast join算子任务数据分配和执行情况明细, 从持续时间列和Input Size列看出, 数据几乎是均匀分配, 8个任务的持续时间是1秒, 1个是0.9秒, 充分发挥了数据本地性特性, 每个节点的计算资源都被有效利用.
如图10所示, Broadcast Join算子在高并发的应用情况下性能上存在稳定的提升. 如图10(a)所示, 普通Join算子压测统计, 通过压测工具10万次的统计结果, 统计了平均持续时间、中位数和偏差情况. 从图中看出, 持续时间绝大部分相对集中和稳定在7秒左右. 如图10(b)所示, Broadcast join算子压测情况统计, 比较相同的统计指标存在明显的优势, 持续时间基本集中和稳定在4秒左右.
6 总结本文研究了Spark Shuffle设计和算法实现, 分析了哈希和排序两类Shuffle机制的实现过程, 深入分析在Shuffle过程发生数据倾斜的本质原因. 进一步分析了Spark流计算集群中, 发生数据倾斜常见业务场景, 分析数据倾斜问题的原因和发生过程, 提供了问题定位的方法和步骤. 提出了广播机制避免某些场景下的数据倾斜问题, 给出广播变量分发机制和算法实现. 通过Broadcast实现Join算子的实验, 相对于直接操作Join算子, 通过DAG视图、任务持续时间、Shuffle读写数据量等指标的比较和分析, 验证了广播机制在性能上有较大提升, 压力测试进一步验证了在大规模应用的情况下性能有稳定的改善.
[1] |
Chen Q, Yao JY, Xiao Z. LIBRA: Lightweight data skew mitigation in MapReduce. IEEE Transactions on Parallel and Distributed Systems, 2015, 26(9): 2520-2533. DOI:10.1109/TPDS.2014.2350972 |
[2] |
Al Hajj Hassan M, Bamha M. Towards scalability and data skew handling in GroupBy-joins using MapReduce model. Procedia Computer Science, 2015, 51: 70-79. DOI:10.1016/j.procs.2015.05.200 |
[3] |
de Oliveira PM, Allison PM, Mastorakos E. Ignition of uniform droplet-laden weakly turbulent flows following a laser spark. Combustion and Flame, 2019, 199: 387-400. DOI:10.1016/j.combustflame.2018.10.009 |
[4] |
Tang Z, Zhang XS, Li KL, et al. An intermediate data placement algorithm for load balancing in Spark computing environment. Future Generation Computer Systems, 2018, 78: 287-301. DOI:10.1016/j.future.2016.06.027 |
[5] |
Mustafa S, Elghandou I, Ismail MA. A machine learning approach for predicting execution time of spark jobs. Alexandria Engineering Journal, 2018, 57(4): 3767-3778. DOI:10.1016/j.aej.2018.03.006 |
[6] |
Tang JC, Xu M, Fu SJ, et al. A scheduling optimization technique based on reuse in spark to defend against APT attack. Tsinghua Science and Technology, 2018, 23(5): 550-560. DOI:10.26599/TST.2018.9010022 |
[7] |
Ye XM, Chen XS, Liu DH, et al. Efficient feature extraction using apache spark for network behavior anomaly detection. Tsinghua Science and Technology, 2018, 23(5): 561-573. DOI:10.26599/TST.2018.9010021 |
[8] |
廖旺坚, 黄永峰, 包从开. Spark并行计算框架的内存优化. 计算机工程与科学, 2018, 40(4): 587-593. DOI:10.3969/j.issn.1007-130X.2018.04.003 |
[9] |
卞琛, 于炯, 英昌甜, 等. 并行计算框架Spark的自适应缓存管理策略. 电子学报, 2017, 45(2): 278-284. DOI:10.3969/j.issn.0372-2112.2017.02.003 |
[10] |
李俊丽. 基于Spark平台的离群数据并行挖掘算法. 计算机与数字工程, 2018, 46(11): 2175-2178. DOI:10.3969/j.issn.1672-9722.2018.11.003 |
[11] |
朱继召, 贾岩涛, 徐君, 等. SparkCRF: 一种基于Spark的并行CRFs算法实现. 计算机研究与发展, 2016, 53(8): 1819-1828. |
[12] |
谭亮, 周静. 基于Spark Streaming的实时交通数据处理平台. 计算机系统应用, 2018, 27(10): 133-139. |