
ShuffleMapStage — Intermediate Stage in Job

A ShuffleMapStage (aka shuffle map stage, or simply map stage) is an intermediate stage in the execution DAG that produces data for a shuffle operation. Its output is the input for the stages that follow it in the DAG of stages. That is why it is also called a shuffle dependency's map side (see ShuffleDependency).

ShuffleMapStages usually contain multiple pipelined operations, e.g. map and filter, that are executed before the shuffle operation.
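To see why such operations pipeline, here is a plain-Scala sketch (not Spark code) of how map and filter compose lazily into a single pass over a partition's data, which is why they can run together inside one ShuffleMapStage without materializing intermediate results:

```scala
// Plain-Scala sketch (not Spark's API): map and filter on an iterator compose
// lazily into a single pass -- no intermediate collection is built, mirroring
// how narrow transformations pipeline within one ShuffleMapStage.
val partition = Iterator(0, 1, 2, 3, 4, 5)        // stand-in for one partition
val pipelined = partition.map(_ * 2).filter(_ % 3 == 0)  // still lazy here
val result = pipelined.toList                      // forces the single pass
```

Only the final `toList` traverses the data; the map and filter steps are applied element by element in one sweep.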

Caution
FIXME: Show the example and the logs + figures

A single ShuffleMapStage can be part of many jobs — refer to the section ShuffleMapStage sharing.

A ShuffleMapStage is a stage with a ShuffleDependency (the shuffle that it is part of); its outputLocs and numAvailableOutputs track how many map outputs are ready.
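That bookkeeping can be pictured with a simplified, hypothetical model. The class and method names below are illustrative only, not Spark's actual API:

```scala
// Hypothetical, simplified model of a stage's map-output bookkeeping.
// StageOutputLocs and its methods are illustrative, not Spark's actual types.
class StageOutputLocs(numPartitions: Int) {
  // one optional map-output location per partition (None = output missing)
  private val outputLocs = Array.fill[Option[String]](numPartitions)(None)

  def addOutputLoc(partition: Int, location: String): Unit =
    outputLocs(partition) = Some(location)

  // e.g. when an executor is lost, its outputs become missing again
  def removeOutputLoc(partition: Int): Unit =
    outputLocs(partition) = None

  // how many map outputs are ready
  def numAvailableOutputs: Int = outputLocs.count(_.isDefined)

  // the stage is complete only when every partition has an output location
  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  def findMissingPartitions(): Seq[Int] =
    (0 until numPartitions).filter(outputLocs(_).isEmpty)
}
```

The key idea is that availability is tracked per partition, so a stage can be partially complete and only the missing partitions need recomputation.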

Note
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage for Adaptive Query Planning.

When executed, ShuffleMapStages save map output files that can later be fetched by reduce tasks.

Caution
FIXME Figure with ShuffleMapStages saving files

The number of partitions of the RDD a ShuffleMapStage computes is exactly the number of tasks in that ShuffleMapStage.
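As a minimal sketch (illustrative names only, not Spark's task types), launching a stage amounts to creating one shuffle map task per partition:

```scala
// Illustrative only: a ShuffleMapStage launches exactly one task
// per partition of the RDD it computes.
val rddPartitions = Vector("part-0", "part-1", "part-2", "part-3")
val tasks = rddPartitions.indices.map(p => s"ShuffleMapTask(partitionId=$p)")
```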

The output locations (outputLocs) of a ShuffleMapStage are the same as used by its ShuffleDependency. Output locations can be missing, i.e. when partitions have not been cached yet or have been lost.

ShuffleMapStages are registered with DAGScheduler, which tracks the mapping of shuffles (by their ids from SparkContext) to the ShuffleMapStages that compute them in the shuffleToMapStage internal registry.

A new ShuffleMapStage is created from an input ShuffleDependency and a job’s id (in DAGScheduler#newOrUsedShuffleStage).

FIXME: Where’s shuffleToMapStage used?

  • getShuffleMapStage - see Stage sharing

  • getAncestorShuffleDependencies

  • cleanupStateForJobAndIndependentStages

  • handleExecutorLost

When there is no ShuffleMapStage for a shuffle id (of a ShuffleDependency), one is created together with ShuffleMapStages for the ancestor shuffle dependencies of the RDD (of the ShuffleDependency), which are registered with MapOutputTrackerMaster.
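The create-when-missing (and reuse-when-present) behaviour can be sketched as follows. This is a hypothetical, simplified model: StageSketch and the registry shape are assumptions for illustration, not Spark's actual types.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the look-up-or-create pattern behind
// getShuffleMapStage. StageSketch is an illustrative stand-in, not Spark's type.
final case class StageSketch(shuffleId: Int, jobIds: mutable.Set[Int])

// stand-in for DAGScheduler's shuffleToMapStage registry (shuffle id -> stage)
val shuffleToMapStage = mutable.Map.empty[Int, StageSketch]

def getShuffleMapStage(shuffleId: Int, jobId: Int): StageSketch =
  shuffleToMapStage.get(shuffleId) match {
    case Some(stage) =>
      stage.jobIds += jobId   // reuse the existing stage; just register the job
      stage
    case None =>
      val stage = StageSketch(shuffleId, mutable.Set(jobId))  // create on first use
      shuffleToMapStage(shuffleId) = stage
      stage
  }
```

Because the registry is keyed by shuffle id, a second job asking for the same shuffle gets the very same stage back, which is the mechanism behind the ShuffleMapStage sharing described below.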

FIXME Where is ShuffleMapStage used?

  • newShuffleMapStage - the proper way to create shuffle map stages (with the additional setup steps)

  • MapStageSubmitted

  • getShuffleMapStage - see Stage sharing

Caution

FIXME

  • What’s ShuffleMapStage.outputLocs and MapStatus?

  • newShuffleMapStage

ShuffleMapStage Sharing

ShuffleMapStages can be shared across multiple jobs, if these jobs reuse the same RDDs.

When a ShuffleMapStage is submitted to DAGScheduler for execution, getShuffleMapStage is called as part of handleMapStageSubmitted, whereas handleJobSubmitted calls newResultStage (note the new part) for the final stage.

scala> val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey()  // (1)

scala> rdd.count  // (2)

scala> rdd.count  // (3)

  1. Shuffle at sortByKey()

  2. Submits a job with two stages, both of which are executed

  3. Intentionally repeats the last action; it submits a new job with two stages, one of which is shared as already computed

Figure 1. Skipped Stages are already-computed ShuffleMapStages