1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.ui.jobs
19
20import java.net.URLEncoder
21import java.util.Date
22import javax.servlet.http.HttpServletRequest
23
24import scala.collection.JavaConverters._
25import scala.xml._
26
27import org.apache.commons.lang3.StringEscapeUtils
28
29import org.apache.spark.scheduler.StageInfo
30import org.apache.spark.ui._
31import org.apache.spark.ui.jobs.UIData.StageUIData
32import org.apache.spark.util.Utils
33
/**
 * Reads the paging and sorting parameters of one stage table from the HTTP request and renders
 * the table through [[StagePagedTable]].
 *
 * All request parameters owned by this table are prefixed with `stageTag` (`<stageTag>.page`,
 * `<stageTag>.sort`, `<stageTag>.desc`, `<stageTag>.pageSize`, `<stageTag>.prevPageSize`).
 * Parameters that do not start with `stageTag` are carried over unchanged into the links the
 * table generates.
 *
 * A malformed numeric or boolean parameter does not abort construction: the corresponding public
 * value falls back to its default, and the parse failure is reported in the error box that
 * `toNodeSeq` renders, the same way an unknown sort column is reported.
 *
 * @param request HTTP request the parameters are read from
 * @param stages stages to list
 * @param tableHeaderID HTML id used as the fragment (`#...`) of the generated links
 * @param stageTag prefix of the request parameters that belong to this table
 * @param basePath base path of the UI, passed on to [[StagePagedTable]]
 * @param subPath path segment of the page hosting this table
 * @param progressListener listener the per-stage UI data is looked up in
 * @param isFairScheduler whether the "Pool Name" column is shown
 * @param killEnabled whether a kill link is rendered for each stage
 * @param isFailedStage whether the "Failure Reason" column is shown
 */
private[ui] class StageTableBase(
    request: HttpServletRequest,
    stages: Seq[StageInfo],
    tableHeaderID: String,
    stageTag: String,
    basePath: String,
    subPath: String,
    progressListener: JobProgressListener,
    isFairScheduler: Boolean,
    killEnabled: Boolean,
    isFailedStage: Boolean) {
  val allParameters = request.getParameterMap().asScala.toMap
  // "name=value" pairs (first value only) of every parameter that is not owned by this table.
  val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
    .map(para => para._1 + "=" + para._2(0))

  val parameterStagePage = request.getParameter(stageTag + ".page")
  val parameterStageSortColumn = request.getParameter(stageTag + ".sort")
  val parameterStageSortDesc = request.getParameter(stageTag + ".desc")
  val parameterStagePageSize = request.getParameter(stageTag + ".pageSize")
  val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize")

  /**
   * Parses an optional raw request parameter. The outer `Try` captures a malformed value, the
   * inner `Option` is empty when the parameter is absent (`raw` is null).
   */
  private def parseParameter[T](raw: String)(parse: String => T): scala.util.Try[Option[T]] =
    scala.util.Try(Option(raw).map(parse))

  // Parse failures are kept here rather than thrown: they are rethrown inside the `try` of
  // `toNodeSeq`, so that they end up in the rendered error box instead of failing construction.
  private val parsedStagePage = parseParameter(parameterStagePage)(_.toInt)
  private val parsedStageSortDesc = parseParameter(parameterStageSortDesc)(_.toBoolean)
  private val parsedStagePageSize = parseParameter(parameterStagePageSize)(_.toInt)
  private val parsedStagePrevPageSize = parseParameter(parameterStagePrevPageSize)(_.toInt)

  val stagePage = parsedStagePage.toOption.flatten.getOrElse(1)
  val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn =>
    UIUtils.decodeURLParameter(sortColumn)
  }.getOrElse("Stage Id")
  val stageSortDesc = parsedStageSortDesc.toOption.flatten.getOrElse(
    // New stages should be shown above old stages by default.
    stageSortColumn == "Stage Id"
  )
  val stagePageSize = parsedStagePageSize.toOption.flatten.getOrElse(100)
  val stagePrevPageSize = parsedStagePrevPageSize.toOption.flatten
    .getOrElse(stagePageSize)

  val page: Int = {
    // If the user has changed to a larger page size, then go to page 1 in order to avoid
    // IndexOutOfBoundsException.
    if (stagePageSize <= stagePrevPageSize) {
      stagePage
    } else {
      1
    }
  }
  val currentTime = System.currentTimeMillis()

  val toNodeSeq = try {
    // Rethrow any parameter parse failure here. NumberFormatException is an
    // IllegalArgumentException, so it is handled by the catch clause below.
    Seq[scala.util.Try[_]](
      parsedStagePage,
      parsedStageSortDesc,
      parsedStagePageSize,
      parsedStagePrevPageSize
    ).foreach(_.get)

    new StagePagedTable(
      stages,
      tableHeaderID,
      stageTag,
      basePath,
      subPath,
      progressListener,
      isFairScheduler,
      killEnabled,
      currentTime,
      stagePageSize,
      stageSortColumn,
      stageSortDesc,
      isFailedStage,
      parameterOtherTable
    ).table(page)
  } catch {
    case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
      <div class="alert alert-error">
        <p>Error while rendering stage table:</p>
        <pre>
          {Utils.exceptionString(e)}
        </pre>
      </div>
  }
}
105
/**
 * The values shown in (and sorted by) one row of the stage table. Each sortable quantity is kept
 * both in raw form, used for ordering, and as the preformatted string that is rendered.
 *
 * @param stageInfo the stage this row describes
 * @param stageData UI data of the stage attempt; `None` makes the table render the
 *                  "No data available for this stage" row
 * @param stageId id of the stage
 * @param attemptId attempt id of the stage
 * @param schedulingPool name of the scheduling pool, shown in the "Pool Name" column
 * @param descriptionOption optional description of the stage
 * @param submissionTime raw submission time; `StageDataSource` stores 0 when it is unknown
 * @param formattedSubmissionTime submission time as displayed
 * @param duration raw duration; `StageDataSource` stores -1 when it is unknown
 * @param formattedDuration duration as displayed
 * @param inputRead raw input size used for sorting
 * @param inputReadWithUnit input size as displayed
 * @param outputWrite raw output size used for sorting
 * @param outputWriteWithUnit output size as displayed
 * @param shuffleRead raw shuffle read size used for sorting
 * @param shuffleReadWithUnit shuffle read size as displayed
 * @param shuffleWrite raw shuffle write size used for sorting
 * @param shuffleWriteWithUnit shuffle write size as displayed
 */
private[ui] class StageTableRowData(
    val stageInfo: StageInfo,
    val stageData: Option[StageUIData],
    val stageId: Int,
    val attemptId: Int,
    val schedulingPool: String,
    val descriptionOption: Option[String],
    val submissionTime: Long,
    val formattedSubmissionTime: String,
    val duration: Long,
    val formattedDuration: String,
    val inputRead: Long,
    val inputReadWithUnit: String,
    val outputWrite: Long,
    val outputWriteWithUnit: String,
    val shuffleRead: Long,
    val shuffleReadWithUnit: String,
    val shuffleWrite: Long,
    val shuffleWriteWithUnit: String)
125
/**
 * Row data for a stage that has no UI data: only the stage identity is filled in. `stageData`
 * is `None`, every string is empty, every size and the submission time are 0, and the duration
 * is -1.
 */
private[ui] class MissingStageTableRowData(
    stageInfo: StageInfo,
    stageId: Int,
    attemptId: Int) extends StageTableRowData(
  stageInfo = stageInfo,
  stageData = None,
  stageId = stageId,
  attemptId = attemptId,
  schedulingPool = "",
  descriptionOption = None,
  submissionTime = 0,
  formattedSubmissionTime = "",
  duration = -1,
  formattedDuration = "",
  inputRead = 0,
  inputReadWithUnit = "",
  outputWrite = 0,
  outputWriteWithUnit = "",
  shuffleRead = 0,
  shuffleReadWithUnit = "",
  shuffleWrite = 0,
  shuffleWriteWithUnit = "")
131
/**
 * Paged, sortable HTML table listing stages (ongoing and recently finished ones).
 *
 * @param stages stages to list
 * @param tableHeaderId HTML id used as the fragment (`#...`) of every generated link
 * @param stageTag prefix of the request parameters owned by this table; also used for the table id
 * @param basePath base path of the UI; passed through `UIUtils.prependBaseUri` for every link
 * @param subPath path segment of the page hosting this table
 * @param listener listener the per-stage UI data is looked up in
 * @param isFairScheduler whether the "Pool Name" column is shown
 * @param killEnabled whether a kill link is rendered for each stage
 * @param currentTime time used as the finish time of stages that have not completed
 * @param pageSize number of rows per page
 * @param sortColumn title of the column to sort by; must be one of the sortable headers
 * @param desc whether the sort order is descending
 * @param isFailedStage whether the "Failure Reason" column is shown
 * @param parameterOtherTable "name=value" request parameters of other tables on the same page,
 *                            carried over into every generated link
 */
private[ui] class StagePagedTable(
    stages: Seq[StageInfo],
    tableHeaderId: String,
    stageTag: String,
    basePath: String,
    subPath: String,
    listener: JobProgressListener,
    isFairScheduler: Boolean,
    killEnabled: Boolean,
    currentTime: Long,
    pageSize: Int,
    sortColumn: String,
    desc: Boolean,
    isFailedStage: Boolean,
    parameterOtherTable: Iterable[String]) extends PagedTable[StageTableRowData] {

  override def tableId: String = stageTag + "-table"

  override def tableCssClass: String =
    "table table-bordered table-condensed table-striped " +
      "table-head-clickable table-cell-width-limited"

  override def pageSizeFormField: String = stageTag + ".pageSize"

  override def prevPageSizeFormField: String = stageTag + ".prevPageSize"

  override def pageNumberFormField: String = stageTag + ".page"

  // Common prefix of every link generated by this table: the page URL followed by the
  // parameters of the other tables.
  val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
    parameterOtherTable.mkString("&")

  override val dataSource = new StageDataSource(
    stages,
    listener,
    currentTime,
    pageSize,
    sortColumn,
    desc
  )

  /** Link to the given page number, keeping the current sort column, order and page size. */
  override def pageLink(page: Int): String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
    parameterPath +
      s"&$pageNumberFormField=$page" +
      s"&$stageTag.sort=$encodedSortColumn" +
      s"&$stageTag.desc=$desc" +
      s"&$pageSizeFormField=$pageSize" +
      s"#$tableHeaderId"
  }

  /** Path used by the "go" button form; carries the current sort column and order. */
  override def goButtonFormPath: String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
    s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc#$tableHeaderId"
  }

  /**
   * Link a sortable column header points at. `descParam` is set only for the column the table
   * is currently sorted by, where clicking the header flips the sort order.
   */
  private def stageHeaderLink(header: String, descParam: Option[Boolean]): String = {
    parameterPath +
      s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
      descParam.map(d => s"&$stageTag.desc=$d").getOrElse("") +
      s"&$stageTag.pageSize=$pageSize" +
      s"#$tableHeaderId"
  }

  /**
   * Builds the table header.
   *
   * @throws IllegalArgumentException if `sortColumn` is not the title of a sortable column
   */
  override def headers: Seq[Node] = {
    // stageHeadersAndCssClasses has three parts: header title, tooltip information, and sortable.
    // The tooltip information could be None, which indicates it does not have a tooltip.
    // Otherwise, it has two parts: tooltip text, and position (true for left, false for default).
    val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] =
      Seq(("Stage Id", None, true)) ++
      {if (isFairScheduler) {Seq(("Pool Name", None, true))} else Seq.empty} ++
      Seq(
        ("Description", None, true), ("Submitted", None, true), ("Duration", None, true),
        ("Tasks: Succeeded/Total", None, false),
        ("Input", Some((ToolTips.INPUT, false)), true),
        ("Output", Some((ToolTips.OUTPUT, false)), true),
        ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true),
        ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true)
      ) ++
      {if (isFailedStage) {Seq(("Failure Reason", None, false))} else Seq.empty}

    val sortableTitles = stageHeadersAndCssClasses.collect { case (title, _, true) => title }
    if (!sortableTitles.contains(sortColumn)) {
      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
    }

    val headerRow: Seq[Node] = {
      stageHeadersAndCssClasses.map { case (header, tooltip, sortable) =>
        val headerSpan: Node = tooltip match {
          case Some((title, true)) =>
            /* Place the shuffle write tooltip on the left (rather than the default position
            of on top) because the shuffle write column is the last column on the right side and
            the tooltip is wider than the column, so it doesn't fit on top. */
            <span data-toggle="tooltip" data-placement="left" title={title}>
              {header}
            </span>
          case Some((title, false)) =>
            <span data-toggle="tooltip" title={title}>
              {header}
            </span>
          case None =>
            Text(header)
        }

        if (header == sortColumn) {
          // Clicking the header of the current sort column reverses the sort order.
          val headerLink = stageHeaderLink(header, Some(!desc))
          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // DOWN or UP

          <th>
            <a href={headerLink}>
              {headerSpan}<span>
              &nbsp;{Unparsed(arrow)}
            </span>
            </a>
          </th>
        } else if (sortable) {
          val headerLink = stageHeaderLink(header, None)

          <th>
            <a href={headerLink}>
              {headerSpan}
            </a>
          </th>
        } else {
          <th>
            {headerSpan}
          </th>
        }
      }
    }
    <thead>{headerRow}</thead>
  }

  override def row(data: StageTableRowData): Seq[Node] = {
    <tr id={"stage-" + data.stageId + "-" + data.attemptId}>
      {rowContent(data)}
    </tr>
  }

  /** Cells of one row, in the same order as the columns produced by `headers`. */
  private def rowContent(data: StageTableRowData): Seq[Node] = {
    data.stageData match {
      case None => missingStageRow(data.stageId)
      case Some(stageData) =>
        val info = data.stageInfo

        val stageIdCell: Seq[Node] =
          if (data.attemptId > 0) {
            <td>{data.stageId} (retry {data.attemptId})</td>
          } else {
            <td>{data.stageId}</td>
          }

        val poolCell: Seq[Node] =
          if (isFairScheduler) {
            // The pool name ends up in a query string, so it has to be URL-encoded.
            val poolUri = "%s/stages/pool?poolname=%s".format(
              UIUtils.prependBaseUri(basePath), URLEncoder.encode(data.schedulingPool, "UTF-8"))
            <td>
              <a href={poolUri}>
                {data.schedulingPool}
              </a>
            </td>
          } else {
            Seq.empty
          }

        val failureReasonCell: Seq[Node] =
          if (isFailedStage) {
            failureReasonHtml(info)
          } else {
            Seq.empty
          }

        stageIdCell ++ poolCell ++
        <td>{makeDescription(info, data.descriptionOption)}</td>
        <td valign="middle">
          {data.formattedSubmissionTime}
        </td>
        <td>{data.formattedDuration}</td>
        <td class="progress-cell">
          {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
          completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
          skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)}
        </td>
        <td>{data.inputReadWithUnit}</td>
        <td>{data.outputWriteWithUnit}</td>
        <td>{data.shuffleReadWithUnit}</td>
        <td>{data.shuffleWriteWithUnit}</td> ++
        failureReasonCell
    }
  }

  /**
   * "Failure Reason" cell: the first line of the failure reason, plus an expandable block with
   * the full text when the reason spans several lines.
   */
  private def failureReasonHtml(s: StageInfo): Seq[Node] = {
    val failureReason = s.failureReason.getOrElse("")
    val isMultiline = failureReason.indexOf('\n') >= 0
    // Display the first line by default. The summary is embedded below as an XML text node,
    // which is escaped when the node is serialized, so it must not be HTML-escaped here too
    // (that would show entities such as "&lt;" literally).
    val failureReasonSummary =
      if (isMultiline) {
        failureReason.substring(0, failureReason.indexOf('\n'))
      } else {
        failureReason
      }
    val details: Seq[Node] = if (isMultiline) {
      // scalastyle:off
      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
        <div class="stacktrace-details collapsed">
          <pre>{failureReason}</pre>
        </div>
      // scalastyle:on
    } else {
      Seq.empty
    }
    <td valign="middle">{failureReasonSummary}{details}</td>
  }

  /**
   * Content of the "Description" cell: the optional description, the kill link (when enabled),
   * a link to the stage page and, when the stage has details, a collapsed details block.
   */
  private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = {
    val basePathUri = UIUtils.prependBaseUri(basePath)

    val killLink = if (killEnabled) {
      val confirm =
        s"if (window.confirm('Are you sure you want to kill stage ${s.stageId} ?')) " +
        "{ this.parentNode.submit(); return true; } else { return false; }"
      // SPARK-6846 this should be POST-only but YARN AM won't proxy POST
      /*
      val killLinkUri = s"$basePathUri/stages/stage/kill/"
      <form action={killLinkUri} method="POST" style="display:inline">
        <input type="hidden" name="id" value={s.stageId.toString}/>
        <a href="#" onclick={confirm} class="kill-link">(kill)</a>
      </form>
       */
      val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
      <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
    } else {
      Seq.empty
    }

    val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"
    val nameLink = <a href={nameLinkUri} class="name-link">{s.name}</a>

    // Only RDDs with at least one cached partition are linked from the details block.
    val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
    val details: Seq[Node] = if (s.details.nonEmpty) {
      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
      <div class="stage-details collapsed">
        {if (cachedRddInfos.nonEmpty) {
          Text("RDD: ") ++
          cachedRddInfos.map { i =>
            <a href={s"$basePathUri/storage/rdd?id=${i.id}"}>{i.name}</a>
          }
        }}
        <pre>{s.details}</pre>
      </div>
    } else {
      Seq.empty
    }

    val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri))
    <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
  }

  /**
   * Cells of a row for a stage without UI data. Emits one cell per column produced by
   * `headers`, including the optional "Pool Name" and "Failure Reason" columns.
   */
  protected def missingStageRow(stageId: Int): Seq[Node] = {
    <td>{stageId}</td> ++
    {if (isFairScheduler) {<td>-</td>} else Seq.empty} ++
    <td>No data available for this stage</td> ++ // Description
    <td></td> ++ // Submitted
    <td></td> ++ // Duration
    <td></td> ++ // Tasks: Succeeded/Total
    <td></td> ++ // Input
    <td></td> ++ // Output
    <td></td> ++ // Shuffle Read
    <td></td> ++ // Shuffle Write
    {if (isFailedStage) {<td></td>} else Seq.empty} // Failure Reason
  }
}
403
/**
 * Data source of the stage table: converts every stage into a [[StageTableRowData]] once, sorts
 * the rows by the requested column and serves slices of the sorted rows.
 *
 * @param stages stages to list
 * @param listener listener whose `stageIdToData` holds the UI data of each stage attempt
 * @param currentTime time used as the finish time of stages without a completion time
 * @param pageSize number of rows per page
 * @param sortColumn title of the column to sort by
 * @param desc whether the sort order is descending
 * @throws IllegalArgumentException on construction, if `sortColumn` is unknown or not sortable
 */
private[ui] class StageDataSource(
    stages: Seq[StageInfo],
    listener: JobProgressListener,
    currentTime: Long,
    pageSize: Int,
    sortColumn: String,
    desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) {
  // Convert StageInfo to StageTableRowData which contains the final contents to show in the table
  // so that we can avoid creating duplicate contents during sorting the data
  private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))

  override def dataSize: Int = data.size

  override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = data.slice(from, to)

  /**
   * Builds the row of one stage. A stage attempt without an entry in `listener.stageIdToData`
   * yields a [[MissingStageTableRowData]].
   */
  private def stageRow(s: StageInfo): StageTableRowData = {
    listener.stageIdToData.get((s.stageId, s.attemptId)) match {
      case None =>
        new MissingStageTableRowData(s, s.stageId, s.attemptId)

      case stageDataOption @ Some(stageData) =>
        val formattedSubmissionTime = s.submissionTime
          .map(t => UIUtils.formatDate(new Date(t)))
          .getOrElse("Unknown")
        val finishTime = s.completionTime.getOrElse(currentTime)

        // The submission time for a stage is misleading because it counts the time
        // the stage waits to be launched. (SPARK-10930)
        val taskLaunchTimes =
          stageData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
        val duration: Option[Long] =
          if (taskLaunchTimes.nonEmpty) {
            val startTime = taskLaunchTimes.min
            // Measure up to the current time when the finish time is not after the start time.
            val endTime = if (finishTime > startTime) finishTime else currentTime
            Some(endTime - startTime)
          } else {
            None
          }
        val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")

        // Sizes that are not positive are shown as an empty cell.
        def withUnit(bytes: Long): String = if (bytes > 0) Utils.bytesToString(bytes) else ""

        val inputRead = stageData.inputBytes
        val outputWrite = stageData.outputBytes
        val shuffleRead = stageData.shuffleReadTotalBytes
        val shuffleWrite = stageData.shuffleWriteBytes

        new StageTableRowData(
          s,
          stageDataOption,
          s.stageId,
          s.attemptId,
          stageData.schedulingPool,
          stageData.description,
          s.submissionTime.getOrElse(0L),
          formattedSubmissionTime,
          duration.getOrElse(-1L),
          formattedDuration,
          inputRead,
          withUnit(inputRead),
          outputWrite,
          withUnit(outputWrite),
          shuffleRead,
          withUnit(shuffleRead),
          shuffleWrite,
          withUnit(shuffleWrite)
        )
    }
  }

  /**
   * Return Ordering according to sortColumn and desc
   *
   * @throws IllegalArgumentException if `sortColumn` is unknown or not sortable
   */
  private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = {
    val ascending: Ordering[StageTableRowData] = sortColumn match {
      case "Stage Id" => Ordering.by(_.stageId)
      case "Pool Name" => Ordering.by(_.schedulingPool)
      case "Description" => Ordering.by(x => (x.descriptionOption, x.stageInfo.name))
      case "Submitted" => Ordering.by(_.submissionTime)
      case "Duration" => Ordering.by(_.duration)
      case "Input" => Ordering.by(_.inputRead)
      case "Output" => Ordering.by(_.outputWrite)
      case "Shuffle Read" => Ordering.by(_.shuffleRead)
      case "Shuffle Write" => Ordering.by(_.shuffleWrite)
      case "Tasks: Succeeded/Total" =>
        throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
    }
    if (desc) ascending.reverse else ascending
  }
}
515
516