1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17package org.apache.spark.status.api.v1
18
19import javax.ws.rs._
20import javax.ws.rs.core.MediaType
21
22import org.apache.spark.SparkException
23import org.apache.spark.scheduler.StageInfo
24import org.apache.spark.status.api.v1.StageStatus._
25import org.apache.spark.status.api.v1.TaskSorting._
26import org.apache.spark.ui.SparkUI
27import org.apache.spark.ui.jobs.JobProgressListener
28import org.apache.spark.ui.jobs.UIData.StageUIData
29
/**
 * REST resource exposing data about a single stage (identified by the `stageId` path
 * parameter) and about individual attempts of that stage.
 *
 * Stage data is looked up through `ui.jobProgressListener`; unknown stages or attempts are
 * reported to the client with a [[NotFoundException]].
 */
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class OneStageResource(ui: SparkUI) {

  /**
   * Returns detailed data for every known attempt of the stage.
   *
   * @param stageId the `stageId` path parameter
   * @return one [[StageData]] (with details included) per attempt
   * @throws NotFoundException if no attempt of the stage is known
   */
  @GET
  @Path("")
  def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
    withStage(stageId) { stageAttempts =>
      stageAttempts.map { stage =>
        AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
          includeDetails = true)
      }
    }
  }

  /**
   * Returns detailed data for one attempt of the stage.
   *
   * @param stageId the `stageId` path parameter
   * @param stageAttemptId the attempt id; the path template only accepts digits
   * @throws NotFoundException if the stage or the attempt is unknown
   */
  @GET
  @Path("/{stageAttemptId: \\d+}")
  def oneAttemptData(
      @PathParam("stageId") stageId: Int,
      @PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
        includeDetails = true)
    }
  }

  /**
   * Returns distributions of task metrics for one stage attempt at the requested quantiles.
   *
   * @param stageId the `stageId` path parameter
   * @param stageAttemptId the attempt id
   * @param quantileString comma-separated list of doubles; defaults to
   *                       "0.05,0.25,0.5,0.75,0.95"
   * @throws NotFoundException if the stage or the attempt is unknown (checked before the
   *                           quantiles are parsed)
   * @throws BadParameterException if any entry of `quantileString` is not a parseable double
   */
  @GET
  @Path("/{stageAttemptId: \\d+}/taskSummary")
  def taskSummary(
      @PathParam("stageId") stageId: Int,
      @PathParam("stageAttemptId") stageAttemptId: Int,
      @DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String)
  : TaskMetricDistributions = {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      // NOTE(review): values are only checked for parseability, not for lying in [0, 1];
      // confirm how taskMetricDistributions handles out-of-range quantiles.
      val quantiles = quantileString.split(",").map { s =>
        try {
          s.toDouble
        } catch {
          case _: NumberFormatException =>
            throw new BadParameterException("quantiles", "double", s)
        }
      }
      AllStagesResource.taskMetricDistributions(stage.ui.taskData.values, quantiles)
    }
  }

  /**
   * Returns one page of the tasks of a stage attempt, sorted by `sortBy`.
   *
   * @param stageId the `stageId` path parameter
   * @param stageAttemptId the attempt id
   * @param offset index of the first task to return (default 0)
   * @param length maximum number of tasks to return (default 20)
   * @param sortBy sort key (default `ID`)
   * @throws NotFoundException if the stage or the attempt is unknown
   */
  @GET
  @Path("/{stageAttemptId: \\d+}/taskList")
  def taskList(
      @PathParam("stageId") stageId: Int,
      @PathParam("stageAttemptId") stageAttemptId: Int,
      @DefaultValue("0") @QueryParam("offset") offset: Int,
      @DefaultValue("20") @QueryParam("length") length: Int,
      @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
    withStageAttempt(stageId, stageAttemptId) { stage =>
      val tasks = stage.ui.taskData.values.map { AllStagesResource.convertTaskData }.toIndexedSeq
        .sorted(OneStageResource.ordering(sortBy))
      // `offset + length` in Int arithmetic overflows for large requests (e.g. a client asking
      // for length = Int.MaxValue), turning the end index negative so `slice` returned nothing.
      // Compute it in Long and clamp into the range `slice` accepts.
      val until = math.max(0L, math.min(offset.toLong + length, Int.MaxValue.toLong)).toInt
      tasks.slice(offset, until)
    }
  }

  /** One attempt of a stage: its status, its scheduler [[StageInfo]] and its UI data. */
  private case class StageStatusInfoUi(status: StageStatus, info: StageInfo, ui: StageUIData)

  /**
   * Looks up all attempts of `stageId` and passes them to `f`.
   *
   * @throws NotFoundException if the stage has no known attempts
   */
  private def withStage[T](stageId: Int)(f: Seq[StageStatusInfoUi] => T): T = {
    val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
    if (stageAttempts.isEmpty) {
      throw new NotFoundException("unknown stage: " + stageId)
    } else {
      f(stageAttempts)
    }
  }

  /**
   * Collects every attempt of `stageId` from the listener's active, completed, failed and
   * pending stage lists (in that order), while holding the listener's lock.
   *
   * NOTE(review): the lock is released before callers read the returned `StageUIData`
   * (e.g. `taskData`). If the listener keeps mutating those objects, callers may see
   * partially-updated data. Confirm whether this is acceptable.
   *
   * @throws SparkException if a listed attempt has no entry in `stageIdToData`, which
   *                        indicates an internal inconsistency
   */
  private def findStageStatusUIData(
      listener: JobProgressListener,
      stageId: Int): Seq[StageStatusInfoUi] = {
    listener.synchronized {
      def getStatusInfoUi(status: StageStatus, infos: Seq[StageInfo]): Seq[StageStatusInfoUi] = {
        infos.filter { _.stageId == stageId }.map { info =>
          val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId),
            // this is an internal error -- we should always have uiData
            throw new SparkException(
              s"no stage ui data found for stage: ${info.stageId}:${info.attemptId}")
          )
          StageStatusInfoUi(status, info, ui)
        }
      }
      getStatusInfoUi(ACTIVE, listener.activeStages.values.toSeq) ++
        getStatusInfoUi(COMPLETE, listener.completedStages) ++
        getStatusInfoUi(FAILED, listener.failedStages) ++
        getStatusInfoUi(PENDING, listener.pendingStages.values.toSeq)
    }
  }

  /**
   * Looks up one attempt of a stage and passes it to `f`.
   *
   * If several entries share the attempt id, the first one found wins: active, then
   * completed, then failed, then pending.
   *
   * @throws NotFoundException if the stage is unknown, or if it has no attempt with
   *                           `stageAttemptId`; in the latter case the message lists the
   *                           attempts that were found
   */
  private def withStageAttempt[T](
      stageId: Int,
      stageAttemptId: Int)
      (f: StageStatusInfoUi => T): T = {
    withStage(stageId) { attempts =>
      val oneAttempt = attempts.find { stage => stage.info.attemptId == stageAttemptId }
      oneAttempt match {
        case Some(stage) =>
          f(stage)
        case None =>
          val stageAttempts = attempts.map { _.info.attemptId }
          throw new NotFoundException(s"unknown attempt for stage $stageId.  " +
            s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
      }
    }
  }
}
139
object OneStageResource {
  /**
   * Builds the ordering used to sort the task list of a stage attempt.
   *
   * Tasks without metrics are given a run time of -1L. Assuming recorded run times are
   * non-negative, they therefore sort first for `INCREASING_RUNTIME` and last for
   * `DECREASING_RUNTIME`.
   *
   * @param taskSorting the requested sort key
   * @return an ordering over [[TaskData]] for that key
   */
  def ordering(taskSorting: TaskSorting): Ordering[TaskData] = {
    def runtime(td: TaskData): Long = td.taskMetrics.map { _.executorRunTime }.getOrElse(-1L)
    // Pick the key extractor once, rather than re-matching on `taskSorting` for every
    // element compared during the sort.
    val extractor: TaskData => Long = taskSorting match {
      case ID => td => td.taskId
      case INCREASING_RUNTIME => td => runtime(td)
      case DECREASING_RUNTIME => td => -runtime(td)
    }
    Ordering.by(extractor)
  }
}
151