/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite

/**
 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 * They occur right before each shuffle operation, and might contain multiple pipelined operations
 * before that (e.g. map and filter). When executed, they save map output files that can later be
 * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
 * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 *
 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
 *
 * Invariant: `numAvailableOutputs` equals the number of partitions whose entry in `outputLocs`
 * is non-empty. Every method in this class that mutates `outputLocs` updates the counter so the
 * two never drift apart (checked by the assertion in `findMissingPartitions`).
 */
private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _])
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

  /** Map-stage jobs that were submitted to compute this stage; new jobs are prepended. */
  private[this] var _mapStageJobs: List[ActiveJob] = Nil

  /** Cached count of partitions whose `outputLocs` entry is non-empty. */
  private[this] var _numAvailableOutputs: Int = 0

  /**
   * List of [[MapStatus]] for each partition. The index of the array is the map partition id,
   * and each value in the array is the list of possible [[MapStatus]] for a partition
   * (a single task might run multiple times). New statuses are prepended, so the head of each
   * list is the most recently added one.
   */
  private[this] val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)

  override def toString: String = s"ShuffleMapStage $id"

  /**
   * Returns the list of active jobs,
   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).
   */
  def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

  /** Adds the job to the active job list. */
  def addActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = job :: _mapStageJobs
  }

  /** Removes the job (every occurrence of it) from the active job list. */
  def removeActiveJob(job: ActiveJob): Unit = {
    _mapStageJobs = _mapStageJobs.filter(_ != job)
  }

  /**
   * Number of partitions that have shuffle outputs.
   * When this reaches [[numPartitions]], this map stage is ready.
   * This should be kept consistent with `outputLocs.count(_.nonEmpty)`.
   */
  def numAvailableOutputs: Int = _numAvailableOutputs

  /**
   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.
   * This should be the same as `!outputLocs.contains(Nil)`.
   */
  def isAvailable: Boolean = _numAvailableOutputs == numPartitions

  /**
   * Returns the sequence of partition ids that are missing (i.e. needs to be computed).
   *
   * Also asserts that the number found by scanning `outputLocs` agrees with the cached
   * `numAvailableOutputs` counter.
   */
  override def findMissingPartitions(): Seq[Int] = {
    // Named `partition` rather than `id` so it does not shadow the stage id.
    val missing = (0 until numPartitions).filter(partition => outputLocs(partition).isEmpty)
    assert(missing.size == numPartitions - _numAvailableOutputs,
      s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
    missing
  }

  /**
   * Records `status` as a (newest) output location for `partition`. If the partition previously
   * had no outputs, it is now counted as available.
   */
  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList.isEmpty) {
      _numAvailableOutputs += 1
    }
  }

  /**
   * Drops every [[MapStatus]] of `partition` whose location is `bmAddress`. If that leaves the
   * partition with no outputs, it is no longer counted as available.
   */
  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
    val prevList = outputLocs(partition)
    val newList = prevList.filterNot(_.location == bmAddress)
    outputLocs(partition) = newList
    // Only a transition from "has outputs" to "has none" changes availability.
    if (prevList.nonEmpty && newList.isEmpty) {
      _numAvailableOutputs -= 1
    }
  }

  /**
   * Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
   * value contains only one (i.e. the first, which is the most recently added) [[MapStatus]].
   * If there is no entry for the partition, that position is filled with null.
   */
  def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
    outputLocs.map(_.headOption.orNull)
  }

  /**
   * Removes all shuffle outputs associated with this executor. Note that this will also remove
   * outputs which are served by an external shuffle server (if one exists), as they are still
   * registered with this execId.
   *
   * Logs a message if at least one partition lost its last output as a result.
   */
  def removeOutputsOnExecutor(execId: String): Unit = {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_.location.executorId == execId)
      outputLocs(partition) = newList
      // Only a transition from "has outputs" to "has none" changes availability.
      if (prevList.nonEmpty && newList.isEmpty) {
        becameUnavailable = true
        _numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable) {
      logInfo(s"$this is now unavailable on executor $execId " +
        s"(${_numAvailableOutputs}/$numPartitions, $isAvailable)")
    }
  }
}