1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.scheduler
19
20import org.apache.spark.ShuffleDependency
21import org.apache.spark.rdd.RDD
22import org.apache.spark.storage.BlockManagerId
23import org.apache.spark.util.CallSite
24
25/**
26 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
27 * They occur right before each shuffle operation, and might contain multiple pipelined operations
28 * before that (e.g. map and filter). When executed, they save map output files that can later be
29 * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
30 * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
31 *
32 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
33 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
34 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
35 */
private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _])
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

  /** Map-stage jobs registered on this stage; the most recently added job is at the head. */
  private[this] var _mapStageJobs: List[ActiveJob] = Nil

  /**
   * Cached number of partitions whose entry in `outputLocs` is non-empty. Every method below
   * that mutates `outputLocs` also updates this counter, so `isAvailable` does not need to
   * scan the array.
   */
  private[this] var _numAvailableOutputs: Int = 0

  /**
   * List of [[MapStatus]] for each partition. The index of the array is the map partition id,
   * and each value in the array is the list of possible [[MapStatus]] for a partition
   * (a single task might run multiple times). The most recently added status is at the head.
   */
  private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)

  override def toString: String = "ShuffleMapStage " + id

  /**
   * Returns the list of active jobs,
   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
   */
  def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

  /** Adds the job to the active job list. */
  def addActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  /** Removes the job (every occurrence of it) from the active job list. */
  def removeActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  /**
   * Number of partitions that have shuffle outputs.
   * When this reaches [[numPartitions]], this map stage is ready.
   * This should be kept consistent with `outputLocs.count(_.nonEmpty)`.
   */
  def numAvailableOutputs: Int = _numAvailableOutputs

  /**
   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
   * This should be the same as `!outputLocs.contains(Nil)`.
   */
  def isAvailable: Boolean = _numAvailableOutputs == numPartitions

  /**
   * Returns the sequence of partition ids that are missing (i.e. need to be computed).
   * Also asserts that the cached `_numAvailableOutputs` counter agrees with `outputLocs`.
   */
  override def findMissingPartitions(): Seq[Int] = {
    // Named `partition` (not `id`) so it does not shadow the stage's own `id`.
    val missing = (0 until numPartitions).filter(partition => outputLocs(partition).isEmpty)
    val expected = numPartitions - _numAvailableOutputs
    assert(missing.size == expected, s"${missing.size} missing, expected $expected")
    missing
  }

  /**
   * Records `status` as an additional output location for map partition `partition`.
   * The new status is prepended, so it becomes the one reported by
   * `outputLocInMapOutputTrackerFormat`. The available-output counter is only incremented
   * when the partition previously had no outputs.
   */
  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList.isEmpty) {
      _numAvailableOutputs += 1
    }
  }

  /**
   * Removes every output status for `partition` whose location equals `bmAddress`.
   * The available-output counter is decremented only if this leaves the partition with no
   * remaining outputs (and it had some before).
   */
  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
    val prevList = outputLocs(partition)
    val newList = prevList.filterNot(_.location == bmAddress)
    outputLocs(partition) = newList
    if (prevList.nonEmpty && newList.isEmpty) {
      _numAvailableOutputs -= 1
    }
  }

  /**
   * Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
   * value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
   * that position is filled with null.
   */
  def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
    outputLocs.map(_.headOption.orNull)
  }

  /**
   * Removes all shuffle outputs associated with this executor. Note that this will also remove
   * outputs which are served by an external shuffle server (if one exists), as they are still
   * registered with this execId. Logs a message if at least one partition lost its last output.
   */
  def removeOutputsOnExecutor(execId: String): Unit = {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_.location.executorId == execId)
      outputLocs(partition) = newList
      // Only a transition from "has outputs" to "has none" changes the available count.
      if (prevList.nonEmpty && newList.isEmpty) {
        becameUnavailable = true
        _numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable) {
      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
        this, execId, _numAvailableOutputs, numPartitions, isAvailable))
    }
  }
}
144