A ShuffleMapStage (aka shuffle map stage, or simply map stage) is an intermediate stage in the execution DAG that produces data for a shuffle operation. It serves as an input for the stages that follow it in the DAG of stages. That is why it is also called a shuffle dependency’s map side (see ShuffleDependency).
ShuffleMapStages usually contain multiple pipelined operations, e.g. map and filter, before a shuffle operation.
Caution | FIXME: Show the example and the logs + figures |
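For example, in the Spark shell (a sketch, assuming a running spark-shell with the default sc), map and filter below are pipelined into a single ShuffleMapStage and reduceByKey marks the shuffle boundary:

scala> val pairs = sc.parallelize(0 to 99).map(n => (n % 10, n)).filter(_._2 % 2 == 0) // (1)
scala> pairs.reduceByKey(_ + _).count // (2)

1. map and filter are pipelined into one ShuffleMapStage.
2. reduceByKey introduces the shuffle; count triggers the job.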
A single ShuffleMapStage can be part of many jobs — refer to the section ShuffleMapStage sharing.
A ShuffleMapStage is a stage with a ShuffleDependency - the shuffle that it is part of - and its outputLocs and numAvailableOutputs track how many map outputs are ready.
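A minimal sketch of that bookkeeping, using a hypothetical ShuffleMapStageSketch class (the names outputLocs and numAvailableOutputs come from the text above, and isAvailable mirrors ShuffleMapStage’s availability check; everything else is illustrative):

// Hypothetical sketch: track which partitions have their map output ready.
class ShuffleMapStageSketch(numPartitions: Int) {
  // outputLocs-style registry: one slot per partition, None until a map task finishes
  private val outputLocs = Array.fill[Option[String]](numPartitions)(None)

  // numAvailableOutputs: how many map outputs are ready
  def numAvailableOutputs: Int = outputLocs.count(_.isDefined)

  // the stage is complete once every partition has a registered output location
  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  // record that a map task for the given partition completed at a location
  def addOutputLoc(partition: Int, location: String): Unit =
    outputLocs(partition) = Some(location)
}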
Note | ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage for Adaptive Query Planning. |
When executed, ShuffleMapStages save map output files that can later be fetched by reduce tasks.
Caution | FIXME: Figure with ShuffleMapStages saving files |
The number of partitions of an RDD is exactly the number of tasks in a ShuffleMapStage.
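For example (a sketch in the Spark shell; the tasks show up in the web UI’s stage details):

scala> val pairs = sc.parallelize(0 to 99, 4).map((_, 1)) // (1)
scala> pairs.reduceByKey(_ + _).count // (2)

1. An RDD with 4 partitions.
2. The ShuffleMapStage computing pairs runs exactly 4 tasks, one per partition.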
The output locations (outputLocs) of a ShuffleMapStage are the same as used by its ShuffleDependency. Output locations can be missing, i.e. partitions have not been cached or are lost.
ShuffleMapStages are registered with DAGScheduler, which tracks the mapping of shuffles (by their ids from SparkContext) to the corresponding ShuffleMapStages that compute them, stored in shuffleToMapStage.
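A minimal sketch of that registry (with hypothetical stand-in types; the real shuffleToMapStage field is internal to DAGScheduler):

import scala.collection.mutable

// hypothetical stand-ins for Spark's internal types, to keep the sketch self-contained
case class ShuffleDependencySketch(shuffleId: Int)
case class StageSketch(stageId: Int, shuffleDep: ShuffleDependencySketch)

object RegistrySketch {
  // shuffleToMapStage: shuffle id -> the ShuffleMapStage that computes it
  private val shuffleToMapStage = mutable.HashMap.empty[Int, StageSketch]
  private var nextStageId = 0

  // getShuffleMapStage-style lookup: reuse the stage if the shuffle id is known,
  // otherwise create and register a new one (cf. newOrUsedShuffleStage)
  def getOrCreateStage(dep: ShuffleDependencySketch): StageSketch =
    shuffleToMapStage.getOrElseUpdate(dep.shuffleId, {
      val stage = StageSketch(nextStageId, dep)
      nextStageId += 1
      stage
    })
}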
A new ShuffleMapStage is created from an input ShuffleDependency and a job’s id (in DAGScheduler#newOrUsedShuffleStage).
FIXME: Where’s shuffleToMapStage used?
- getShuffleMapStage - see Stage sharing
- getAncestorShuffleDependencies
- cleanupStateForJobAndIndependentStages
- handleExecutorLost
When there is no ShuffleMapStage for a shuffle id (of a ShuffleDependency), one is created together with stages for the ancestor shuffle dependencies of the RDD (of the ShuffleDependency), and the shuffles are registered with MapOutputTrackerMaster.
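A rough sketch of that lineage walk (hypothetical types; the real logic lives in DAGScheduler.getAncestorShuffleDependencies):

import scala.collection.mutable

// hypothetical stand-ins for RDD lineage: an RDD has dependencies,
// some of which are shuffle dependencies carrying a shuffle id
sealed trait DepSketch { def rdd: RddSketch }
final case class NarrowDepSketch(rdd: RddSketch) extends DepSketch
final case class ShuffleDepSketch(rdd: RddSketch, shuffleId: Int) extends DepSketch
final class RddSketch(val deps: Seq[DepSketch])

// collect every ancestor shuffle dependency of an RDD, so a ShuffleMapStage
// can be created and registered for each shuffle id that has none yet
def ancestorShuffleDeps(rdd: RddSketch): Seq[ShuffleDepSketch] = {
  val found = mutable.ArrayBuffer.empty[ShuffleDepSketch]
  val visited = mutable.Set.empty[RddSketch]
  def visit(r: RddSketch): Unit =
    if (visited.add(r)) r.deps.foreach {
      case s: ShuffleDepSketch => found += s; visit(s.rdd)
      case n: NarrowDepSketch  => visit(n.rdd)
    }
  visit(rdd)
  found.toSeq
}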
FIXME: Where is ShuffleMapStage used?
- newShuffleMapStage - the proper way to create shuffle map stages (with the additional setup steps)
- getShuffleMapStage - see Stage sharing
Caution | FIXME |
ShuffleMapStages can be shared across multiple jobs if these jobs reuse the same RDDs.
When a ShuffleMapStage is submitted to DAGScheduler to execute, getShuffleMapStage is called (as part of handleMapStageSubmitted, while newResultStage - note the new part - is used for handleJobSubmitted).
scala> val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey() // (1)
scala> rdd.count // (2)
scala> rdd.count // (3)
1. Shuffle at sortByKey().
2. Submits a job with two stages, both of which are executed.
3. Intentionally repeats the last action: it submits a new job with the same two stages, one of which is shared with the first job as it has already been computed.
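On the second count, the shuffle map stage’s outputs are already available, so DAGScheduler does not resubmit its tasks; in the web UI the second job should list that stage under Skipped Stages.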