Spark ShuffleManager: Design Analysis of the In-Memory Buffering SortShuffleWriter (Spark in Production series)
Published: 2019-06-22


This blog series distills cases from real production environments, sharing Spark source-code walkthroughs together with practical guidance; please keep following the series. Copyright notice: this Spark source-code analysis and production-practice series belongs to its author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.

  • [Spark in Production: In-Depth Analysis of Spark Shuffle's Aggregated Fetch and Read (Reduce Task) Process]

1 Starting with ShuffleManager

I have used this diagram several times before, so don't be surprised; after all, these posts all share one theme, shuffle. The English comments in the source are already quite detailed, so only a brief introduction here:

  • There is currently only one implementation: SortShuffleManager.
  • SortShuffleManager relies on ShuffleWriter to do the actual work: the contract defined by ShuffleWriter specifies how a MapTask's intermediate results are persisted to disk.
  • ShuffleWriter has three concrete implementations: UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.

The official English Scaladoc reads:

```scala
/**
 * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
 * driver and on each executor, based on the spark.shuffle.manager setting. The driver
 * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
 * to read and write data.
 *
 * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
 * boolean isDriver as parameters.
 */
```

A look at the ShuffleManager code shows that it merely defines the contract:

```scala
private[spark] trait ShuffleManager {

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  /**
   * Remove a shuffle's metadata from the ShuffleManager.
   * @return true if the metadata removed successfully, otherwise false.
   */
  def unregisterShuffle(shuffleId: Int): Boolean

  /**
   * Return a resolver capable of retrieving shuffle block data based on block coordinates.
   */
  def shuffleBlockResolver: ShuffleBlockResolver

  /** Shut down this ShuffleManager. */
  def stop(): Unit
}
```
  • SortShuffleManager relies on the ShuffleHandle classes, whose main job is to pass shuffle metadata to tasks. Two specialized handles matter here: one marks shuffles whose records can be buffered in serialized form, the other marks shuffles that can bypass merging and sorting altogether; the abbreviated registerShuffle below shows how they are chosen.
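To make the handle selection concrete, here is SortShuffleManager.registerShuffle, abbreviated from the Spark 2.x source (comments trimmed; the exact shape varies slightly between versions):

```scala
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // Few partitions and no map-side aggregation: write one file per partition
    // and concatenate them at the end, skipping merge-sort entirely.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Buffer map output in serialized form (handled by UnsafeShuffleWriter).
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise buffer map output in deserialized form (SortShuffleWriter).
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
```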

2 Next, MapStatus

MapStatus is the result a ShuffleMapTask returns to the scheduler. A look at the MapStatus code:

```scala
/**
 * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
 */
```
  • The MapStatus trait has two members, location and getSizeForBlock: one gives the address where the task ran, the other the estimated size of each reduce block.

```scala
private[spark] sealed trait MapStatus {

  /** Location where this task was run. */
  def location: BlockManagerId

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size.  This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   */
  def getSizeForBlock(reduceId: Int): Long
}
```
  • The companion object implements the compression, switching representation once the partition count exceeds 2000 (see the size-encoding sketch after the code):

```scala
private[spark] object MapStatus {

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
}
```
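What "compression" means here: each block size is squeezed into a single byte on a log-1.1 scale, so a status covering thousands of reducers stays tiny at the cost of roughly 10% size error. The sketch below mirrors MapStatus.compressSize and decompressSize from the Spark source; the SizeCodec wrapper object is just a name chosen for this example.

```scala
object SizeCodec {
  // One byte covers sizes up to roughly 1.1^255, about 35 GB.
  private val LOG_BASE = 1.1

  def compressSize(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1 // non-empty blocks must never report size 0
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

  def decompressSize(compressedSize: Byte): Long = {
    if (compressedSize == 0) 0
    else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
  }
}
```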

3 Third, SortShuffleWriter (the main event)

  • It provides sorting, aggregation, and caching for MapTask output.
  • Under the hood, SortShuffleWriter relies on PartitionedAppendOnlyMap and PartitionedPairBuffer to buffer records in memory and to sort and aggregate them there; the abbreviated insertAll below shows which structure is used when.
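The choice between the two structures lives in ExternalSorter.insertAll, abbreviated here from the Spark 2.x source: with an aggregator, records are combined in place in the map; without one, they are simply appended to the buffer.

```scala
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    // Combine values in memory first, using the PartitionedAppendOnlyMap.
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)  // spill to disk if too large
    }
  } else {
    // No aggregation: just append to the PartitionedPairBuffer.
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
```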

3.1 Core members of SortShuffleWriter

  • blockManager: obtained from SparkEnv.get.blockManager; the unified external entry point to the storage subsystem's data services.

  • sorter: the workhorse ExternalSorter, which buffers records in memory and sorts and aggregates them there.

  • mapStatus: the description of this task's output, reported back so that reducers can locate it.

  • dep: obtained from handle.dependency; carries the relevant ShuffleDependency properties.

  • shuffleBlockResolver: the index-file generator. Its Scaladoc:

```scala
/**
 * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
 * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
 * The offsets of the data blocks in the data file are stored in a separate index file.
 *
 * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
 * as the filename postfix for data file, and ".index" as the filename postfix for index file.
 */
```
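Concretely, the .index file is just numPartitions + 1 cumulative byte offsets into the .data file, so a reducer's byte range is recovered with two reads. A minimal illustrative sketch; blockRange is a hypothetical helper written for this post, not a Spark API:

```scala
import java.io.{DataInputStream, FileInputStream}

// Returns (offset, length) of reduce partition `reduceId` inside the .data file.
def blockRange(indexFile: String, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)   // each offset is one 8-byte Long
    val start = in.readLong()    // where this partition's data begins
    val end = in.readLong()      // where the next partition begins
    (start, end - start)
  } finally {
    in.close()
  }
}
```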

3.2 SortShuffleWriter's write method (a stroke of genius)

  • When ShuffleDependency's mapSideCombine property is true, the aggregator and keyOrdering properties are handed to the sorter for aggregation and sorting; otherwise neither is passed. This also settles the question of whether PartitionedAppendOnlyMap or PartitionedPairBuffer is used.
  • insertAll inserts the map task's output records into memory.
  • ShuffleBlockId: identifies the shuffle data file, that is, the handle for this MapTask's output file.
  • writePartitionedFile: the key collaborator; it iterates the map-side cached data out to disk, merge-sorting it along the way with any intermediate data already spilled to disk, and writes the final consolidated block file.
  • writeIndexFileAndCommit: builds the index for the final block file, recording each partition ID's offset so that reducer tasks can come and fetch their slice.

3.3 A close reading of SortShuffleWriter's write code

```scala
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)                                          // ======> stroke of genius

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(
      dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)  // ======> stroke of genius
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)   // ======> stroke of genius
    shuffleBlockResolver.writeIndexFileAndCommit(
      dep.shuffleId, mapId, partitionLengths, tmp)                     // ======> stroke of genius
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
```
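Note the temp-file pattern in the try/finally: the partitioned data is first written to a temporary file next to the final output, and writeIndexFileAndCommit then either adopts an already-committed output or moves the temp file into place (see IndexShuffleBlockResolver). This keeps retried or speculative map tasks from ever exposing a half-written data file to reducers.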

3.4 A diagram of SortShuffleWriter's write method: one picture tells the whole story

No more commentary needed: the diagram is simply excellent. If the original author of the figure sees this, please leave me a comment.

3.5 shouldBypassMergeSort in the SortShuffleWriter companion object

This decides whether merging and sorting should be bypassed. The default for spark.shuffle.sort.bypassMergeThreshold is 200: when there is no map-side combine and the dependency has at most this many reduce partitions, the bypass path is taken.

```scala
private[spark] object SortShuffleWriter {

  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
```
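For illustration, the threshold can be raised when building the SparkConf; the configuration key is real, while the value 400 here is just an example:

```scala
import org.apache.spark.SparkConf

// Let shuffles with up to 400 reduce partitions (and no map-side combine)
// take the bypass path instead of the sort-based path.
val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
```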

4 How SortShuffleManager runs the show

Based on the handle it is given, it picks UnsafeShuffleWriter, BypassMergeSortShuffleWriter, or SortShuffleWriter as needed, and the chosen writer then drives the in-memory buffering and sorting collections.

SortShuffleManager is therefore the orchestrator, the manager exposed to the rest of the engine:
```scala
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
```
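For completeness, the SerializedShuffleHandle branch above is only reachable when SortShuffleManager.canUseSerializedShuffle held at registration time; abbreviated from the Spark 2.x source (logging elided):

```scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false // the serializer must allow reordering serialized records
  } else if (dependency.aggregator.isDefined) {
    false // the serialized path cannot do map-side aggregation
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    false // the partition id must fit in the packed record pointer (2^24 partitions)
  } else {
    true
  }
}
```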

5 Summary

This section is the result of a great deal of the author's time spent polishing; it uses the plainest possible language to dissect the design of SortShuffleWriter, the write path behind ShuffleManager's unified storage service.

Qin Kaixin, Shenzhen
