/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.status.api.v1

import java.util.{Arrays, Date, List => JList}
import javax.ws.rs.{GET, Produces, QueryParam}
import javax.ws.rs.core.MediaType

import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
import org.apache.spark.ui.jobs.UIData.{InputMetricsUIData => InternalInputMetrics,
  OutputMetricsUIData => InternalOutputMetrics,
  ShuffleReadMetricsUIData => InternalShuffleReadMetrics,
  ShuffleWriteMetricsUIData => InternalShuffleWriteMetrics,
  TaskMetricsUIData => InternalTaskMetrics}
import org.apache.spark.util.Distribution

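/**
 * Serves the stage list for the status REST API (roughly, GET
 * /applications/[app-id]/stages with an optional ?status= filter); the exact
 * route wiring lives elsewhere in the v1 package.
 */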
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class AllStagesResource(ui: SparkUI) {

  @GET
  def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
    val listener = ui.jobProgressListener
    val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
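    // No ?status= filter means "all statuses", so default to every StageStatus value.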
    val adjStatuses = {
      if (statuses.isEmpty()) {
        Arrays.asList(StageStatus.values(): _*)
      } else {
        statuses
      }
    }
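    // Pair each stage's static StageInfo with the listener's per-attempt UI data,
    // keeping only the stages whose status was requested. Stages without UI data
    // (e.g. no longer tracked by the listener) are silently skipped.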
    for {
      (status, stageList) <- stageAndStatus
      stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
      stageUiData: StageUIData <- listener.synchronized {
        listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
      }
    } yield {
      AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
    }
  }
}

private[v1] object AllStagesResource {
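  /**
   * Converts the listener's internal per-stage bookkeeping into the public API's StageData.
   * When includeDetails is true, per-task data and per-executor summaries are populated;
   * otherwise those (potentially large) maps are left as None.
   */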
  def stageUiToStageData(
      status: StageStatus,
      stageInfo: StageInfo,
      stageUiData: StageUIData,
      includeDetails: Boolean): StageData = {

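    // launchTime is unset (<= 0) for tasks that have not started, so filter those out
    // before taking the minimum.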
    val taskLaunchTimes = stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)

    val firstTaskLaunchedTime: Option[Date] =
      if (taskLaunchTimes.nonEmpty) {
        Some(new Date(taskLaunchTimes.min))
      } else {
        None
      }

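    // Task-level and executor-level breakdowns can be large, so they are only
    // materialized when the caller explicitly asks for details.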
    val taskData = if (includeDetails) {
      Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) })
    } else {
      None
    }
    val executorSummary = if (includeDetails) {
      Some(stageUiData.executorSummary.map { case (k, summary) =>
        k -> new ExecutorStageSummary(
          taskTime = summary.taskTime,
          failedTasks = summary.failedTasks,
          succeededTasks = summary.succeededTasks,
          inputBytes = summary.inputBytes,
          outputBytes = summary.outputBytes,
          shuffleRead = summary.shuffleRead,
          shuffleWrite = summary.shuffleWrite,
          memoryBytesSpilled = summary.memoryBytesSpilled,
          diskBytesSpilled = summary.diskBytesSpilled
        )
      })
    } else {
      None
    }

    val accumulableInfo = stageUiData.accumulables.values.map { convertAccumulableInfo }.toSeq

    new StageData(
      status = status,
      stageId = stageInfo.stageId,
      attemptId = stageInfo.attemptId,
      numActiveTasks = stageUiData.numActiveTasks,
      numCompleteTasks = stageUiData.numCompleteTasks,
      numFailedTasks = stageUiData.numFailedTasks,
      executorRunTime = stageUiData.executorRunTime,
      executorCpuTime = stageUiData.executorCpuTime,
      submissionTime = stageInfo.submissionTime.map(new Date(_)),
      firstTaskLaunchedTime = firstTaskLaunchedTime,
      completionTime = stageInfo.completionTime.map(new Date(_)),
      inputBytes = stageUiData.inputBytes,
      inputRecords = stageUiData.inputRecords,
      outputBytes = stageUiData.outputBytes,
      outputRecords = stageUiData.outputRecords,
      shuffleReadBytes = stageUiData.shuffleReadTotalBytes,
      shuffleReadRecords = stageUiData.shuffleReadRecords,
      shuffleWriteBytes = stageUiData.shuffleWriteBytes,
      shuffleWriteRecords = stageUiData.shuffleWriteRecords,
      memoryBytesSpilled = stageUiData.memoryBytesSpilled,
      diskBytesSpilled = stageUiData.diskBytesSpilled,
      schedulingPool = stageUiData.schedulingPool,
      name = stageInfo.name,
      details = stageInfo.details,
      accumulatorUpdates = accumulableInfo,
      tasks = taskData,
      executorSummary = executorSummary
    )
  }

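  /**
   * Snapshots the listener's stage collections, grouped by status. Completed and
   * failed stages are reversed so the most recently finished stages come first.
   */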
  def stagesAndStatus(ui: SparkUI): Seq[(StageStatus, Seq[StageInfo])] = {
    val listener = ui.jobProgressListener
    listener.synchronized {
      Seq(
        StageStatus.ACTIVE -> listener.activeStages.values.toSeq,
        StageStatus.COMPLETE -> listener.completedStages.reverse.toSeq,
        StageStatus.FAILED -> listener.failedStages.reverse.toSeq,
        StageStatus.PENDING -> listener.pendingStages.values.toSeq
      )
    }
  }

  def convertTaskData(uiData: TaskUIData): TaskData = {
    new TaskData(
      taskId = uiData.taskInfo.taskId,
      index = uiData.taskInfo.index,
      attempt = uiData.taskInfo.attemptNumber,
      launchTime = new Date(uiData.taskInfo.launchTime),
      executorId = uiData.taskInfo.executorId,
      host = uiData.taskInfo.host,
      taskLocality = uiData.taskInfo.taskLocality.toString(),
      speculative = uiData.taskInfo.speculative,
      accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
      errorMessage = uiData.errorMessage,
      taskMetrics = uiData.metrics.map { convertUiTaskMetrics }
    )
  }

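  /**
   * Computes the distribution of each task metric at the requested quantiles,
   * across all tasks in a stage that have reported metrics.
   */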
  def taskMetricDistributions(
      allTaskData: Iterable[TaskUIData],
      quantiles: Array[Double]): TaskMetricDistributions = {

    val rawMetrics = allTaskData.flatMap { _.metrics }.toSeq

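    // Note: Distribution() returns None for empty input, so the .get below assumes
    // at least one task in this stage has reported metrics.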
    def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)

    // We need to do a lot of similar munging of nested metrics here. For each one, we want to
    // (a) extract the values for the nested metrics, (b) make a distribution for each metric,
    // (c) put the distribution into the right field of our return type, and (d) only return
    // a result if the option is defined for any of the tasks. MetricHelper is a small utility
    // that makes it easier to deal with all of the nested options. Mostly it lets us just
    // implement one "build" method, which builds the quantiles for each field.
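    // Each block below follows the same pattern: getSubmetrics pulls one nested metrics
    // type out of a task's metrics, and build computes the quantiles for each of its fields.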

    val inputMetrics: InputMetricDistributions =
      new MetricHelper[InternalInputMetrics, InputMetricDistributions](rawMetrics, quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalInputMetrics = raw.inputMetrics

        def build: InputMetricDistributions = new InputMetricDistributions(
          bytesRead = submetricQuantiles(_.bytesRead),
          recordsRead = submetricQuantiles(_.recordsRead)
        )
      }.build

    val outputMetrics: OutputMetricDistributions =
      new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalOutputMetrics = raw.outputMetrics

        def build: OutputMetricDistributions = new OutputMetricDistributions(
          bytesWritten = submetricQuantiles(_.bytesWritten),
          recordsWritten = submetricQuantiles(_.recordsWritten)
        )
      }.build

    val shuffleReadMetrics: ShuffleReadMetricDistributions =
      new MetricHelper[InternalShuffleReadMetrics, ShuffleReadMetricDistributions](rawMetrics,
        quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleReadMetrics =
          raw.shuffleReadMetrics

        def build: ShuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
          readBytes = submetricQuantiles(_.totalBytesRead),
          readRecords = submetricQuantiles(_.recordsRead),
          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
          totalBlocksFetched = submetricQuantiles(_.totalBlocksFetched),
          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
        )
      }.build

    val shuffleWriteMetrics: ShuffleWriteMetricDistributions =
      new MetricHelper[InternalShuffleWriteMetrics, ShuffleWriteMetricDistributions](rawMetrics,
        quantiles) {
        def getSubmetrics(raw: InternalTaskMetrics): InternalShuffleWriteMetrics =
          raw.shuffleWriteMetrics

        def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
          writeBytes = submetricQuantiles(_.bytesWritten),
          writeRecords = submetricQuantiles(_.recordsWritten),
          writeTime = submetricQuantiles(_.writeTime)
        )
      }.build

    new TaskMetricDistributions(
      quantiles = quantiles,
      executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
      executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
      executorRunTime = metricQuantiles(_.executorRunTime),
      executorCpuTime = metricQuantiles(_.executorCpuTime),
      resultSize = metricQuantiles(_.resultSize),
      jvmGcTime = metricQuantiles(_.jvmGCTime),
      resultSerializationTime = metricQuantiles(_.resultSerializationTime),
      memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
      diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
      inputMetrics = inputMetrics,
      outputMetrics = outputMetrics,
      shuffleReadMetrics = shuffleReadMetrics,
      shuffleWriteMetrics = shuffleWriteMetrics
    )
  }

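  // The v1 API type uses nulls rather than Options for missing names and values,
  // hence the orNull conversions below.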
  def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
    new AccumulableInfo(
      acc.id, acc.name.orNull, acc.update.map(_.toString), acc.value.map(_.toString).orNull)
  }

  def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
    new TaskMetrics(
      executorDeserializeTime = internal.executorDeserializeTime,
      executorDeserializeCpuTime = internal.executorDeserializeCpuTime,
      executorRunTime = internal.executorRunTime,
      executorCpuTime = internal.executorCpuTime,
      resultSize = internal.resultSize,
      jvmGcTime = internal.jvmGCTime,
      resultSerializationTime = internal.resultSerializationTime,
      memoryBytesSpilled = internal.memoryBytesSpilled,
      diskBytesSpilled = internal.diskBytesSpilled,
      inputMetrics = convertInputMetrics(internal.inputMetrics),
      outputMetrics = convertOutputMetrics(internal.outputMetrics),
      shuffleReadMetrics = convertShuffleReadMetrics(internal.shuffleReadMetrics),
      shuffleWriteMetrics = convertShuffleWriteMetrics(internal.shuffleWriteMetrics)
    )
  }

  def convertInputMetrics(internal: InternalInputMetrics): InputMetrics = {
    new InputMetrics(
      bytesRead = internal.bytesRead,
      recordsRead = internal.recordsRead
    )
  }

  def convertOutputMetrics(internal: InternalOutputMetrics): OutputMetrics = {
    new OutputMetrics(
      bytesWritten = internal.bytesWritten,
      recordsWritten = internal.recordsWritten
    )
  }

  def convertShuffleReadMetrics(internal: InternalShuffleReadMetrics): ShuffleReadMetrics = {
    new ShuffleReadMetrics(
      remoteBlocksFetched = internal.remoteBlocksFetched,
      localBlocksFetched = internal.localBlocksFetched,
      fetchWaitTime = internal.fetchWaitTime,
      remoteBytesRead = internal.remoteBytesRead,
      localBytesRead = internal.localBytesRead,
      recordsRead = internal.recordsRead
    )
  }

  def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
    new ShuffleWriteMetrics(
      bytesWritten = internal.bytesWritten,
      writeTime = internal.writeTime,
      recordsWritten = internal.recordsWritten
    )
  }
}

/**
 * Helper for computing quantile distributions from nested metric types. Subclasses
 * provide getSubmetrics to extract one nested metrics object per task and build to
 * assemble the resulting distributions; submetricQuantiles does the shared work of
 * computing the quantiles for a single field.
 */
private[v1] abstract class MetricHelper[I, O](
    rawMetrics: Seq[InternalTaskMetrics],
    quantiles: Array[Double]) {

  def getSubmetrics(raw: InternalTaskMetrics): I

  def build: O

  val data: Seq[I] = rawMetrics.map(getSubmetrics)

  /** Applies the given function to all submetrics and returns the requested quantiles. */
  def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
    Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
  }
}