Spark Shuffle Non-amateur First Look

Spark Shuffle是什么

Spark Shuffle是spark的最核心部分之一,它涉及到RDD,DAGScheduler,Network,Storage,RPC等,也是spark中优化和问题最多的地方。在讲Shuffle之前要讲介绍下RDD,这里不讲老生常谈的弹性数据集之类的,引用大神Michael Armbrust的经典总结,what is rdd

  • Dependencies
  • Partitions (with optional locality info)
  • Compute Function: Partition => Iterator[T]

Dependencies是该RDD依赖的RDD,Partitions是该RDD要处理的数据分片,Compute是apply到该RDD的数据的方法(Spark是基于函数式编程思想),compute function本身是opaque,返回类型T也是opaque的,这里是不是有点volcano中mechanism与policy分离的味道。与之对应在RDD类中有compute方法,getPartitions方法和getDependencies方法。

PIC-1

Shuffle是分布式计算框架用来衔接上下游任务的数据重分布过程,spark shuffle存在于宽依赖的RDD之间,它是pull & file base的。上游Map Task最终会合并产出一个按照partition排序(key排序不强制)的data和index文件,由下游的Reduce Task去拉取各自需要处理的partition的数据,也就说上游产出多少个partition,下游就会起多少个task去处理,其中partition数量可以通过默认参数spark.sql.shuffle.partitions,spark.default.parallelism指定,或者自定义partitioner决定。这里会有两个问题:

  1. 如何划分上下游,也就是划分Stage?
  2. 某个Stage如何知道需要多少个map task去拉取上游stage的shuffle数据,和产出多少个partition数据给下游

如何划分

Shuffle划分stage,每个stage中一般会有一个RDD是ShuffleRDD(同一stage中可能多个窄依赖的rdd),用经典的WordCount例子来看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> val rdd1 = spark.sparkContext.textFile("file:///data/macduan/test_words")
scala> val rdd2 = rdd1.flatMap { _.split(" ") }
scala> val rdd3 = rdd2.map { (_, 1) }
scala> val rdd4 = rdd3.reduceByKey(_ + _)
scala> val rdd5 = rdd4.map { case (word, count) => (count, word) }
scala> val rdd6 = rdd5.reduceByKey(_ + _)
scala> rdd6.toDebugString
res1: String =
(2) ShuffledRDD[16] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[15] at map at <console>:25 []
| ShuffledRDD[14] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[13] at map at <console>:25 []
| MapPartitionsRDD[12] at flatMap at <console>:25 []
| file:///data/macduan/test_words MapPartitionsRDD[11] at textFile at <console>:23 []
| file:///data/macduan/test_words HadoopRDD[10] at textFile at <console>:23 []

通过rdd6.toDebugString发现有3个stage,以中间的stage为例子,它包含MapPartitionsRDD和一个ShuffledRDD,按照DAGScheduler划分stage的逻辑(后续文章详解)MapPartitionsRDD的parent rdd是ShuffledRDD,此时该Stage的实例中成员rdd实际就是MapPartitionRDD,而该rdd的firstParent就是ShuffledRDD。

抽象出来就是,比如 ... MapPartitionsRDD <- ShuffledRDD <- MapPartitionsRDD <- ShuffledRDD <- MapPartitionsRDD ... 划分stage的边界就是ShuffledRDD,最终会被划分为,

  • stage1 – [… MapPartitionsRDD]
  • stage2 – [ShuffledRDD <- MapPartitionsRDD]
  • stage3 – [ShuffledRDD <- MapPartitionsRDD …]

三个stage,以中间那个stage2 [ShuffledRDD <- MapPartitionsRDD]为例子,它是一个ShuffleMapStage,从下面的代码的visit函数可以看出它被划分和create的边界就是stage3的ShuffledRDD

1
2
3
4
5
6
7
8
// DAGScheduler.scala
def visit(rdd: RDD[_]) {
...
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
...

Task, Parition数量

从下面这两个函数可以直接看出一个map stage被创建时,map task数量和产出的reduce partition数量,分别是该stage的rdd的分区数量和partitioner的分区数量。从ShuffledRDD类也可以看出其dependency就是一个ShuffleDependency,ShuffleDependency中有partitioner,而且在其实例化时会调用registerShuffle方法,partitioner是决定这个stage写的数据的分区数量,该数量也是下游stage的task数量。

1
2
3
4
def createShuffleMapStage
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length, shuffleDep.partitioner.numPartitions)

def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int)

那么问题来了,stage2自身的task数量是怎么知道的呢(也就是上游stage的写的分区的数量),上面分析是其rdd的partition数量,那这个rdd的partition数量又是怎么来的呢?

在createShuffleMapStage中rdd是shuffleDep.rdd也就是stage2的MapPartitionsRDD,numTasks是rdd的partitions的length,也就是MapPartitionsRDD.getPartitions的length,从下面代码可以看到是其第一个parent的partition数量,往上追溯那就是stage1的ShuffledRDD的partitions数量,也就是其partitioner的分区数量,这就对应上了上游stage的写的数据的分区数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
...
override def getDependencies: Seq[Dependency[_]] = {
...
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
...
override val partitioner = Some(part)
...
override def getPartitions: Array[Partition] = {
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
...
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
...

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev) {
...
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
...

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
...
val shuffleId: Int = _rdd.context.newShuffleId()
...
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
...

再探Task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Stage.scala
private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite)
extends Logging {

// RDD.scala
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

Spark的每个Job除了触发这个job提交的finalRDD(例如上面的rdd6如果调用了collect()那么它就是这个finalRDD)中的task是ResultTask,其它stage中的task都是ShuffleMapTask。
ShuffleMapTask中有一个成员是partition,就是这个task需要处理的rdd的一个partition,在DAGScheduler.submitMissingTasks中有一段代码亦可看出来,上面的stage1的某个ShuffleMapTask就是处理MapPartitionsRDD的一个partition。

1
2
3
4
5
6
partitions = stage.rdd.partitions
......
val part = partitions(id)
......
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, ......

ShuffleMapTask中最重要的方法是runTask,里面会反序列化得到RDD(task在跑在Executor上,由Driver分发的),从SparkEnv中拿到ShuffleManager获取writer等(这些在后续文章中会详细介绍),其中很关键的一行代码是
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
在本文的例子的stage1的ShuffleMapTask中,这里既包含读Shuffle数据也包含写处理完读取的shuffle数据后写入新的shuffle数据,其中的rdd就是上面提到的MapPartitionsRDD。
write的参数rdd.iterator(partition, context),这个iterator方法最终会调用MapPartitionsRDD的compute方法,

1
2
3
// MapPartitionsRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))

这里的firstParent就是MapPartitionsRDD依赖的ShuffledRDD,而ShuffledRDD的iterator方法最终会调用它的compute方法,

1
2
3
4
5
6
7
// ShuffledRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}

这个方法会通过ShuffleManager拿到Shuffle reader然后读取上游阶段的shuffle数据。到这里Shuffle怎么产生的,读和写发生哪里都比较清晰的介绍了。