/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.status.api.v1

import javax.ws.rs._
import javax.ws.rs.core.MediaType

import org.apache.spark.SparkException
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.status.api.v1.StageStatus._
import org.apache.spark.status.api.v1.TaskSorting._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.ui.jobs.UIData.StageUIData

/**
 * JSON resource exposing the data of a single stage: all of its attempts, one attempt, and the
 * task summary / task list of one attempt.
 *
 * The `stageId` path parameter is not declared by any `@Path` in this class; presumably it is
 * bound by the parent resource that dispatches here -- confirm against the caller.
 *
 * @param ui the UI whose `jobProgressListener` is queried for stage data
 */
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneStageResource(ui: SparkUI) {

  /**
   * Returns one [[StageData]] (with details) per known attempt of the stage.
   *
   * @throws NotFoundException if no attempt of `stageId` is known
   */
  @GET
  @Path("")
  def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
    withStage(stageId) { stageAttempts =>
      stageAttempts.map { stage =>
        AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
          includeDetails = true)
      }
    }
  }

  /**
   * Returns the [[StageData]] (with details) of a single stage attempt.
   *
   * @throws NotFoundException if the stage or the requested attempt is unknown
   */
  @GET
  @Path("/{stageAttemptId: \\d+}")
  def oneAttemptData(
      @PathParam("stageId") stageId: Int,
      @PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
        includeDetails = true)
    }
  }

  /**
   * Returns task metric distributions for one stage attempt at the requested quantiles.
   *
   * @param quantileString comma-separated list of doubles
   * @throws BadParameterException if any element of `quantileString` is not parseable as a double
   * @throws NotFoundException if the stage or the requested attempt is unknown
   */
  @GET
  @Path("/{stageAttemptId: \\d+}/taskSummary")
  def taskSummary(
      @PathParam("stageId") stageId: Int,
      @PathParam("stageAttemptId") stageAttemptId: Int,
      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
  : TaskMetricDistributions = {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      val quantiles = quantileString.split(",").map { s =>
        try {
          s.toDouble
        } catch {
          // Report the offending element rather than a raw parse failure.
          case _: NumberFormatException =>
            throw new BadParameterException("quantiles", "double", s)
        }
      }
      AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles)
    }
  }

  /**
   * Returns one page of the tasks of a stage attempt, sorted according to `sortBy`.
   *
   * The page is the half-open index range `[offset, offset + length)` of the sorted task list;
   * indices outside the list are ignored, as with `slice`.
   *
   * @throws NotFoundException if the stage or the requested attempt is unknown
   */
  @GET
  @Path("/{stageAttemptId: \\d+}/taskList")
  def taskList(
      @PathParam("stageId") stageId: Int,
      @PathParam("stageAttemptId") stageAttemptId: Int,
      @DefaultValue("0") @QueryParam("offset") offset: Int,
      @DefaultValue("20") @QueryParam("length") length: Int,
      @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      val tasks = stage.ui.taskData.values.map { AllStagesResource.convertTaskData }.toIndexedSeq
        .sorted(OneStageResource.ordering(sortBy))
      // Add in Long and clamp to the Int range: a plain Int `offset + length` wraps around for
      // large values (e.g. offset = 1, length = Int.MaxValue), which would turn the end index
      // negative and silently produce an empty page.
      val end = math.max(Int.MinValue.toLong,
        math.min(Int.MaxValue.toLong, offset.toLong + length)).toInt
      tasks.slice(offset, end)
    }
  }

  /** Bundles the status, scheduler info and UI data of one stage attempt. */
  private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData)

  /**
   * Looks up every attempt of `stageId` and applies `f` to them.
   *
   * @throws NotFoundException if no attempt of the stage is found
   */
  private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = {
    val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
    if (stageAttempts.isEmpty) {
      throw new NotFoundException(s"unknown stage: $stageId")
    } else {
      f(stageAttempts)
    }
  }

  /**
   * Collects all attempts of `stageId` from the listener's active, completed, failed and pending
   * stages (in that order), pairing each with its UI data. The whole lookup runs while holding
   * the listener's monitor.
   *
   * @throws SparkException if a matching stage has no entry in `listener.stageIdToData`
   */
  private def findStageStatusUIData(
      listener: JobProgressListener,
      stageId: Int): Seq[StageStatusInfoUi] = {
    listener.synchronized {
      def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = {
        infos.filter { _.stageId == stageId }.map { info =>
          val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId),
            // this is an internal error -- we should always have uiData
            throw new SparkException(
              s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}")
          )
          StageStatusInfoUi(status, info, ui)
        }
      }
      getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++
        getStatusInfoUi(COMPLETE, listener.completedStages) ++
        getStatusInfoUi(FAILED, listener.failedStages) ++
        getStatusInfoUi(PENDING, listener.pendingStages.values.toSeq)
    }
  }

  /**
   * Looks up one attempt of a stage and applies `f` to it.
   *
   * @throws NotFoundException if the stage is unknown, or if it has no attempt with id
   *                           `stageAttemptId` (the message lists the attempt ids found)
   */
  private def withStageAttempt[T](
      stageId: Int,
      stageAttemptId: Int)
      (f: StageStatusInfoUi => T): T = {
    withStage(stageId) { attempts =>
      attempts.find { stage => stage.info.attemptId == stageAttemptId } match {
        case Some(stage) =>
          f(stage)
        case None =>
          val stageAttempts = attempts.map { _.info.attemptId }
          throw new NotFoundException(s"unknown attempt for stage $stageId.  " +
            s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
      }
    }
  }
}

object OneStageResource {

  /**
   * Builds the [[Ordering]] of tasks for the given sort mode.
   *
   *  - `ID`: ascending task id
   *  - `INCREASING_RUNTIME`: ascending executor run time
   *  - `DECREASING_RUNTIME`: ascending negated executor run time
   *
   * A task without metrics is keyed as if its executor run time were `-1`.
   */
  def ordering(taskSorting: TaskSorting): Ordering[TaskData] = {
    def runTime(td: TaskData): Long = td.taskMetrics.map { _.executorRunTime }.getOrElse(-1L)

    // Resolve the sort mode once, instead of re-matching on every key extraction.
    val extractor: (TaskData => Long) = taskSorting match {
      case ID => td => td.taskId
      case INCREASING_RUNTIME => td => runTime(td)
      case DECREASING_RUNTIME => td => -runTime(td)
    }
    Ordering.by(extractor)
  }
}