/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.status.api.v1

import java.util.{Arrays, Date, List => JList}
import javax.ws.rs.{GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType

import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
// Internal (listener-side) metric types are renamed on import so they cannot be confused with
// the public API types of the same base name defined in this package.
import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics,
  OutputMetricsUIData => InternalOutputMetrics,
  ShuffleReadMetricsUIData => InternalShuffleReadMetrics,
  ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics,
  TaskMetricsUIData => InternalTaskMetrics}
import org.apache.spark.util.Distribution

/**
 * JAX-RS resource that serves, as JSON, summary data for all stages of the application
 * backed by the given [[SparkUI]].
 */
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllStagesResource(ui: SparkUI) {

  /**
   * Returns one [[StageData]] summary (without per-task details) for every known stage attempt
   * whose status is in `statuses`.
   *
   * @param statuses statuses requested via repeated `?status=` query parameters; when empty,
   *                 every [[StageStatus]] is included.
   */
  @GET
  def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
    val listener = ui.jobProgressListener
    val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
    // No explicit filter means "all statuses".
    val adjStatuses = {
      if (statuses.isEmpty()) {
        Arrays.asList(StageStatus.values(): _*)
      } else {
        statuses
      }
    }
    for {
      (status, stageList) <- stageAndStatus
      stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
      // stageIdToData.get returns an Option, so stage attempts whose UI data the listener no
      // longer holds are silently dropped from the result. The read is done under the
      // listener's lock to get a consistent snapshot of its mutable state.
      stageUiData: StageUIData <- listener.synchronized {
        listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
      }
    } yield {
      AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
    }
  }
}

/**
 * Conversion helpers from the listener's internal UI bookkeeping types to the public
 * REST API (v1) types in this package.
 */
private[v1] object AllStagesResource {

  /**
   * Builds the public [[StageData]] view of one stage attempt.
   *
   * @param status         status bucket this stage was taken from (see [[stagesAndStatus]])
   * @param stageInfo      scheduler-side static info (ids, name, details, timestamps)
   * @param stageUiData    listener-side aggregated counters and per-task/per-executor data
   * @param includeDetails when true, also populate per-task data and per-executor summaries;
   *                       when false those fields are `None`.
   */
  def stageUiToStageData(
      status: StageStatus,
      stageInfo: StageInfo,
      stageUiData: StageUIData,
      includeDetails: Boolean): StageData = {

    // launchTime values of 0 or below are filtered out before taking the minimum
    // (presumably they mark tasks that have not actually launched — TODO confirm).
    val taskLaunchTimes = stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)

    // Earliest launch time among launched tasks, or None if no task has launched yet.
    val firstTaskLaunchedTime: Option[Date] =
      if (taskLaunchTimes.nonEmpty) {
        Some(new Date(taskLaunchTimes.min))
      } else {
        None
      }

    // Per-task details are only materialized when explicitly requested.
    val taskData = if (includeDetails) {
      Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
    } else {
      None
    }
    // Per-executor summaries likewise are only materialized on request.
    val executorSummary = if (includeDetails) {
      Some(stageUiData.executorSummary.map { case (k, summary) =>
        k -> new ExecutorStageSummary(
          taskTime = summary.taskTime,
          failedTasks = summary.failedTasks,
          succeededTasks = summary.succeededTasks,
          inputBytes = summary.inputBytes,
          outputBytes = summary.outputBytes,
          shuffleRead = summary.shuffleRead,
          shuffleWrite = summary.shuffleWrite,
          memoryBytesSpilled = summary.memoryBytesSpilled,
          diskBytesSpilled = summary.diskBytesSpilled
        )
      })
    } else {
      None
    }

    val accumulableInfo = stageUiData.accumulables.values.map { convertAccumulableInfo }.toSeq

    new StageData(
      status = status,
      stageId = stageInfo.stageId,
      attemptId = stageInfo.attemptId,
      numActiveTasks = stageUiData.numActiveTasks,
      numCompleteTasks = stageUiData.numCompleteTasks,
      numFailedTasks = stageUiData.numFailedTasks,
      executorRunTime = stageUiData.executorRunTime,
      executorCpuTime = stageUiData.executorCpuTime,
      submissionTime = stageInfo.submissionTime.map(new Date(_)),
      firstTaskLaunchedTime,
      completionTime = stageInfo.completionTime.map(new Date(_)),
      inputBytes = stageUiData.inputBytes,
      inputRecords = stageUiData.inputRecords,
      outputBytes = stageUiData.outputBytes,
      outputRecords = stageUiData.outputRecords,
      shuffleReadBytes = stageUiData.shuffleReadTotalBytes,
      shuffleReadRecords = stageUiData.shuffleReadRecords,
      shuffleWriteBytes = stageUiData.shuffleWriteBytes,
      shuffleWriteRecords = stageUiData.shuffleWriteRecords,
      memoryBytesSpilled = stageUiData.memoryBytesSpilled,
      diskBytesSpilled = stageUiData.diskBytesSpilled,
      schedulingPool = stageUiData.schedulingPool,
      name = stageInfo.name,
      details = stageInfo.details,
      accumulatorUpdates = accumulableInfo,
      tasks = taskData,
      executorSummary = executorSummary
    )
  }

  /**
   * Snapshots the job-progress listener's stage collections, bucketed by [[StageStatus]].
   * The whole snapshot is taken under the listener's lock so the four buckets are mutually
   * consistent. Completed and failed stage lists are reversed (most recent first, presumably
   * to match the UI's ordering — verify against callers).
   */
  def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = {
    val listener = ui.jobProgressListener
    listener.synchronized {
      Seq(
        StageStatus.ACTIVE -> listener.activeStages.values.toSeq,
        StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq,
        StageStatus.FAILED -> listener.failedStages.reverse.toSeq,
        StageStatus.PENDING -> listener.pendingStages.values.toSeq
      )
    }
  }

  /**
   * Converts one internal [[TaskUIData]] record to the public [[TaskData]] type.
   * `taskMetrics` is `None` for tasks for which no metrics update has been recorded.
   */
  def convertTaskData(uiData: TaskUIData): TaskData = {
    new TaskData(
      taskId = uiData.taskInfo.taskId,
      index = uiData.taskInfo.index,
      attempt = uiData.taskInfo.attemptNumber,
      launchTime = new Date(uiData.taskInfo.launchTime),
      executorId = uiData.taskInfo.executorId,
      host = uiData.taskInfo.host,
      taskLocality = uiData.taskInfo.taskLocality.toString(),
      speculative = uiData.taskInfo.speculative,
      accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
      errorMessage = uiData.errorMessage,
      taskMetrics = uiData.metrics.map { convertUiTaskMetrics }
    )
  }

  /**
   * Computes, for each task metric, its distribution at the requested `quantiles` over all
   * tasks in `allTaskData` that have metrics (tasks without metrics are excluded via the
   * `flatMap` below).
   *
   * NOTE(review): `Distribution(...).get` throws if no task has metrics — callers presumably
   * guarantee a non-empty metrics set; confirm before reusing elsewhere.
   */
  def taskMetricDistributions(
      allTaskData: Iterable[TaskUIData],
      quantiles: Array[Double]): TaskMetricDistributions = {

    val rawMetrics = allTaskData.flatMap{_.metrics}.toSeq

    // Quantiles for one top-level metric extracted by `f` from each task's metrics.
    def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)

    // We need to do a lot of similar munging to nested metrics here.  For each one,
    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
    // (c) shove the distribution into the right field in our return type and (d) only return
    // a result if the option is defined for any of the tasks.  MetricHelper is a little util
    // to make it a little easier to deal w/ all of the nested options.  Mostly it lets us just
    // implement one "build" method, which just builds the quantiles for each field.

    val inputMetrics: InputMetricDistributions =
      new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = raw.inputMetrics

        def build: InputMetricDistributions = new InputMetricDistributions(
          bytesRead = submetricQuantiles(_.bytesRead),
          recordsRead = submetricQuantiles(_.recordsRead)
        )
      }.build

    val outputMetrics: OutputMetricDistributions =
      new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = raw.outputMetrics

        def build: OutputMetricDistributions = new OutputMetricDistributions(
          bytesWritten = submetricQuantiles(_.bytesWritten),
          recordsWritten = submetricQuantiles(_.recordsWritten)
        )
      }.build

    val shuffleReadMetrics: ShuffleReadMetricDistributions =
      new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
        quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleReadMetrics =
          raw.shuffleReadMetrics

        def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
          readBytes = submetricQuantiles(_.totalBytesRead),
          readRecords = submetricQuantiles(_.recordsRead),
          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
          totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
        )
      }.build

    val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
      new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
        quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleWriteMetrics =
          raw.shuffleWriteMetrics

        def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
          writeBytes = submetricQuantiles(_.bytesWritten),
          writeRecords = submetricQuantiles(_.recordsWritten),
          writeTime = submetricQuantiles(_.writeTime)
        )
      }.build

    new TaskMetricDistributions(
      quantiles = quantiles,
      executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
      executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
      executorRunTime = metricQuantiles(_.executorRunTime),
      executorCpuTime = metricQuantiles(_.executorCpuTime),
      resultSize = metricQuantiles(_.resultSize),
      jvmGcTime = metricQuantiles(_.jvmGCTime),
      resultSerializationTime = metricQuantiles(_.resultSerializationTime),
      memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
      diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
      inputMetrics = inputMetrics,
      outputMetrics = outputMetrics,
      shuffleReadMetrics = shuffleReadMetrics,
      shuffleWriteMetrics = shuffleWriteMetrics
    )
  }

  /**
   * Converts an internal accumulable to the public type. A missing name or value is exposed
   * as `null` (via `orNull`), while a missing update stays an `Option`; values are rendered
   * with `toString`.
   */
  def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
    new AccumulableInfo(
      acc.id, acc.name.orNull, acc.update.map(_.toString), acc.value.map(_.toString).orNull)
  }

  /** Field-by-field conversion of internal task metrics to the public [[TaskMetrics]] type. */
  def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
    new TaskMetrics(
      executorDeserializeTime = internal.executorDeserializeTime,
      executorDeserializeCpuTime = internal.executorDeserializeCpuTime,
      executorRunTime = internal.executorRunTime,
      executorCpuTime = internal.executorCpuTime,
      resultSize = internal.resultSize,
      jvmGcTime = internal.jvmGCTime,
      resultSerializationTime = internal.resultSerializationTime,
      memoryBytesSpilled = internal.memoryBytesSpilled,
      diskBytesSpilled = internal.diskBytesSpilled,
      inputMetrics = convertInputMetrics(internal.inputMetrics),
      outputMetrics = convertOutputMetrics(internal.outputMetrics),
      shuffleReadMetrics = convertShuffleReadMetrics(internal.shuffleReadMetrics),
      shuffleWriteMetrics = convertShuffleWriteMetrics(internal.shuffleWriteMetrics)
    )
  }

  /** Field-by-field conversion of internal input metrics to the public type. */
  def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
    new InputMetrics(
      bytesRead = internal.bytesRead,
      recordsRead = internal.recordsRead
    )
  }

  /** Field-by-field conversion of internal output metrics to the public type. */
  def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
    new OutputMetrics(
      bytesWritten = internal.bytesWritten,
      recordsWritten = internal.recordsWritten
    )
  }

  /** Field-by-field conversion of internal shuffle-read metrics to the public type. */
  def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
    new ShuffleReadMetrics(
      remoteBlocksFetched = internal.remoteBlocksFetched,
      localBlocksFetched = internal.localBlocksFetched,
      fetchWaitTime = internal.fetchWaitTime,
      remoteBytesRead = internal.remoteBytesRead,
      localBytesRead = internal.localBytesRead,
      recordsRead = internal.recordsRead
    )
  }

  /** Field-by-field conversion of internal shuffle-write metrics to the public type. */
  def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
    new ShuffleWriteMetrics(
      bytesWritten = internal.bytesWritten,
      writeTime = internal.writeTime,
      recordsWritten = internal.recordsWritten
    )
  }
}

/**
 * Helper for getting distributions from nested metric types.
 *
 * Subclasses implement [[getSubmetrics]] to pull one nested metrics object out of each task's
 * metrics, and [[build]] to assemble the per-field quantiles (via [[submetricQuantiles]]) into
 * the public distributions type `O`.
 *
 * @tparam I internal nested-metrics type extracted from each task
 * @tparam O public distributions type produced by `build`
 */
private[v1] abstract class MetricHelper[I, O](
    rawMetrics: Seq[InternalTaskMetrics],
    quantiles: Array[Double]) {

  def getSubmetrics(raw: InternalTaskMetrics): I

  def build: O

  // Extracted nested metrics for every task; evaluated eagerly at construction time.
  val data: Seq[I] = rawMetrics.map(getSubmetrics)

  /** applies the given function to all input metrics, and returns the quantiles */
  // NOTE(review): like metricQuantiles above, Distribution(...).get throws when `data` is
  // empty — safe only while callers pass a non-empty rawMetrics. Confirm before reuse.
  def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
    Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
  }
}