1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.ui.jobs
19
20import java.net.URLEncoder
21import java.util.Date
22import javax.servlet.http.HttpServletRequest
23
24import scala.collection.mutable.HashSet
25import scala.xml.{Elem, Node, Unparsed}
26
27import org.apache.commons.lang3.StringEscapeUtils
28
29import org.apache.spark.SparkConf
30import org.apache.spark.executor.TaskMetrics
31import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
32import org.apache.spark.ui._
33import org.apache.spark.ui.exec.ExecutorsListener
34import org.apache.spark.ui.jobs.UIData._
35import org.apache.spark.util.{Distribution, Utils}
36
37/** Page showing statistics and task list for a given stage */
38private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
39  import StagePage._
40
  // Listener shortcuts taken from the parent StagesTab. render() reads stage/task data from
  // progressListener (stageIdToData), the stage DAG from operationGraphListener, and hands
  // executorsListener to the paged task table.
  private val progressListener = parent.progressListener
  private val operationGraphListener = parent.operationGraphListener
  private val executorsListener = parent.executorsListener
44
45  private val TIMELINE_LEGEND = {
46    <div class="legend-area">
47      <svg>
48        {
49          val legendPairs = List(("scheduler-delay-proportion", "Scheduler Delay"),
50            ("deserialization-time-proportion", "Task Deserialization Time"),
51            ("shuffle-read-time-proportion", "Shuffle Read Time"),
52            ("executor-runtime-proportion", "Executor Computing Time"),
53            ("shuffle-write-time-proportion", "Shuffle Write Time"),
54            ("serialization-time-proportion", "Result Serialization Time"),
55            ("getting-result-time-proportion", "Getting Result Time"))
56
57          legendPairs.zipWithIndex.map {
58            case ((classAttr, name), index) =>
59              <rect x={5 + (index / 3) * 210 + "px"} y={10 + (index % 3) * 15 + "px"}
60                width="10px" height="10px" class={classAttr}></rect>
61                <text x={25 + (index / 3) * 210 + "px"}
62                  y={20 + (index % 3) * 15 + "px"}>{name}</text>
63          }
64        }
65      </svg>
66    </div>
67  }
68
69  // TODO: We should consider increasing the number of this parameter over time
70  // if we find that it's okay.
71  private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
72
73  private def getLocalitySummaryString(stageData: StageUIData): String = {
74    val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
75    val localityCounts = localities.groupBy(identity).mapValues(_.size)
76    val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
77      val localityName = locality match {
78        case TaskLocality.PROCESS_LOCAL => "Process local"
79        case TaskLocality.NODE_LOCAL => "Node local"
80        case TaskLocality.RACK_LOCAL => "Rack local"
81        case TaskLocality.ANY => "Any"
82      }
83      s"$localityName: $count"
84    }
85    localityNamesAndCounts.sorted.mkString("; ")
86  }
87
88  def render(request: HttpServletRequest): Seq[Node] = {
89    progressListener.synchronized {
90      val parameterId = request.getParameter("id")
91      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
92
93      val parameterAttempt = request.getParameter("attempt")
94      require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
95
96      val parameterTaskPage = request.getParameter("task.page")
97      val parameterTaskSortColumn = request.getParameter("task.sort")
98      val parameterTaskSortDesc = request.getParameter("task.desc")
99      val parameterTaskPageSize = request.getParameter("task.pageSize")
100      val parameterTaskPrevPageSize = request.getParameter("task.prevPageSize")
101
102      val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1)
103      val taskSortColumn = Option(parameterTaskSortColumn).map { sortColumn =>
104        UIUtils.decodeURLParameter(sortColumn)
105      }.getOrElse("Index")
106      val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false)
107      val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100)
108      val taskPrevPageSize = Option(parameterTaskPrevPageSize).map(_.toInt).getOrElse(taskPageSize)
109
110      val stageId = parameterId.toInt
111      val stageAttemptId = parameterAttempt.toInt
112      val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId))
113
114      val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
115      if (stageDataOption.isEmpty) {
116        val content =
117          <div id="no-info">
118            <p>No information to display for Stage {stageId} (Attempt {stageAttemptId})</p>
119          </div>
120        return UIUtils.headerSparkPage(stageHeader, content, parent)
121
122      }
123      if (stageDataOption.get.taskData.isEmpty) {
124        val content =
125          <div>
126            <h4>Summary Metrics</h4> No tasks have started yet
127            <h4>Tasks</h4> No tasks have started yet
128          </div>
129        return UIUtils.headerSparkPage(stageHeader, content, parent)
130      }
131
132      val stageData = stageDataOption.get
133      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
134      val numCompleted = stageData.numCompleteTasks
135      val totalTasks = stageData.numActiveTasks +
136        stageData.numCompleteTasks + stageData.numFailedTasks
137      val totalTasksNumStr = if (totalTasks == tasks.size) {
138        s"$totalTasks"
139      } else {
140        s"$totalTasks, showing ${tasks.size}"
141      }
142
143      val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
144      val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
145      val hasAccumulators = externalAccumulables.size > 0
146
147      val summary =
148        <div>
149          <ul class="unstyled">
150            <li>
151              <strong>Total Time Across All Tasks: </strong>
152              {UIUtils.formatDuration(stageData.executorRunTime)}
153            </li>
154            <li>
155              <strong>Locality Level Summary: </strong>
156              {getLocalitySummaryString(stageData)}
157            </li>
158            {if (stageData.hasInput) {
159              <li>
160                <strong>Input Size / Records: </strong>
161                {s"${Utils.bytesToString(stageData.inputBytes)} / ${stageData.inputRecords}"}
162              </li>
163            }}
164            {if (stageData.hasOutput) {
165              <li>
166                <strong>Output: </strong>
167                {s"${Utils.bytesToString(stageData.outputBytes)} / ${stageData.outputRecords}"}
168              </li>
169            }}
170            {if (stageData.hasShuffleRead) {
171              <li>
172                <strong>Shuffle Read: </strong>
173                {s"${Utils.bytesToString(stageData.shuffleReadTotalBytes)} / " +
174                 s"${stageData.shuffleReadRecords}"}
175              </li>
176            }}
177            {if (stageData.hasShuffleWrite) {
178              <li>
179                <strong>Shuffle Write: </strong>
180                 {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " +
181                 s"${stageData.shuffleWriteRecords}"}
182              </li>
183            }}
184            {if (stageData.hasBytesSpilled) {
185              <li>
186                <strong>Shuffle Spill (Memory): </strong>
187                {Utils.bytesToString(stageData.memoryBytesSpilled)}
188              </li>
189              <li>
190                <strong>Shuffle Spill (Disk): </strong>
191                {Utils.bytesToString(stageData.diskBytesSpilled)}
192              </li>
193            }}
194          </ul>
195        </div>
196
197      val showAdditionalMetrics =
198        <div>
199          <span class="expand-additional-metrics">
200            <span class="expand-additional-metrics-arrow arrow-closed"></span>
201            <a>Show Additional Metrics</a>
202          </span>
203          <div class="additional-metrics collapsed">
204            <ul>
205              <li>
206                  <input type="checkbox" id="select-all-metrics"/>
207                  <span class="additional-metric-title"><em>(De)select All</em></span>
208              </li>
209              <li>
210                <span data-toggle="tooltip"
211                      title={ToolTips.SCHEDULER_DELAY} data-placement="right">
212                  <input type="checkbox" name={TaskDetailsClassNames.SCHEDULER_DELAY}/>
213                  <span class="additional-metric-title">Scheduler Delay</span>
214                </span>
215              </li>
216              <li>
217                <span data-toggle="tooltip"
218                      title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
219                  <input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
220                  <span class="additional-metric-title">Task Deserialization Time</span>
221                </span>
222              </li>
223              {if (stageData.hasShuffleRead) {
224                <li>
225                  <span data-toggle="tooltip"
226                        title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
227                    <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}/>
228                    <span class="additional-metric-title">Shuffle Read Blocked Time</span>
229                  </span>
230                </li>
231                <li>
232                  <span data-toggle="tooltip"
233                        title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
234                    <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}/>
235                    <span class="additional-metric-title">Shuffle Remote Reads</span>
236                  </span>
237                </li>
238              }}
239              <li>
240                <span data-toggle="tooltip"
241                      title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
242                  <input type="checkbox" name={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}/>
243                  <span class="additional-metric-title">Result Serialization Time</span>
244                </span>
245              </li>
246              <li>
247                <span data-toggle="tooltip"
248                      title={ToolTips.GETTING_RESULT_TIME} data-placement="right">
249                  <input type="checkbox" name={TaskDetailsClassNames.GETTING_RESULT_TIME}/>
250                  <span class="additional-metric-title">Getting Result Time</span>
251                </span>
252              </li>
253              <li>
254                <span data-toggle="tooltip"
255                      title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
256                  <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
257                  <span class="additional-metric-title">Peak Execution Memory</span>
258                </span>
259              </li>
260            </ul>
261          </div>
262        </div>
263
264      val dagViz = UIUtils.showDagVizForStage(
265        stageId, operationGraphListener.getOperationGraphForStage(stageId))
266
267      val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
268      def accumulableRow(acc: AccumulableInfo): Seq[Node] = {
269        (acc.name, acc.value) match {
270          case (Some(name), Some(value)) => <tr><td>{name}</td><td>{value}</td></tr>
271          case _ => Seq.empty[Node]
272        }
273      }
274      val accumulableTable = UIUtils.listingTable(
275        accumulableHeaders,
276        accumulableRow,
277        externalAccumulables.toSeq)
278
279      val page: Int = {
280        // If the user has changed to a larger page size, then go to page 1 in order to avoid
281        // IndexOutOfBoundsException.
282        if (taskPageSize <= taskPrevPageSize) {
283          taskPage
284        } else {
285          1
286        }
287      }
288      val currentTime = System.currentTimeMillis()
289      val (taskTable, taskTableHTML) = try {
290        val _taskTable = new TaskPagedTable(
291          parent.conf,
292          UIUtils.prependBaseUri(parent.basePath) +
293            s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
294          tasks,
295          hasAccumulators,
296          stageData.hasInput,
297          stageData.hasOutput,
298          stageData.hasShuffleRead,
299          stageData.hasShuffleWrite,
300          stageData.hasBytesSpilled,
301          currentTime,
302          pageSize = taskPageSize,
303          sortColumn = taskSortColumn,
304          desc = taskSortDesc,
305          executorsListener = executorsListener
306        )
307        (_taskTable, _taskTable.table(page))
308      } catch {
309        case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
310          val errorMessage =
311            <div class="alert alert-error">
312              <p>Error while rendering stage table:</p>
313              <pre>
314                {Utils.exceptionString(e)}
315              </pre>
316            </div>
317          (null, errorMessage)
318      }
319
320      val jsForScrollingDownToTaskTable =
321        <script>
322          {Unparsed {
323            """
324              |$(function() {
325              |  if (/.*&task.sort=.*$/.test(location.search)) {
326              |    var topOffset = $("#tasks-section").offset().top;
327              |    $("html,body").animate({scrollTop: topOffset}, 200);
328              |  }
329              |});
330            """.stripMargin
331           }
332          }
333        </script>
334
335      val taskIdsInPage = if (taskTable == null) Set.empty[Long]
336        else taskTable.dataSource.slicedTaskIds
337
338      // Excludes tasks which failed and have incomplete metrics
339      val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined)
340
341      val summaryTable: Option[Seq[Node]] =
342        if (validTasks.size == 0) {
343          None
344        }
345        else {
346          def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] =
347            Distribution(data).get.getQuantiles()
348          def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
349            getDistributionQuantiles(times).map { millis =>
350              <td>{UIUtils.formatDuration(millis.toLong)}</td>
351            }
352          }
353          def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = {
354            getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
355          }
356
357          val deserializationTimes = validTasks.map { taskUIData: TaskUIData =>
358            taskUIData.metrics.get.executorDeserializeTime.toDouble
359          }
360          val deserializationQuantiles =
361            <td>
362              <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
363                    data-placement="right">
364                Task Deserialization Time
365              </span>
366            </td> +: getFormattedTimeQuantiles(deserializationTimes)
367
368          val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
369            taskUIData.metrics.get.executorRunTime.toDouble
370          }
371          val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)
372
373          val gcTimes = validTasks.map { taskUIData: TaskUIData =>
374            taskUIData.metrics.get.jvmGCTime.toDouble
375          }
376          val gcQuantiles =
377            <td>
378              <span data-toggle="tooltip"
379                  title={ToolTips.GC_TIME} data-placement="right">GC Time
380              </span>
381            </td> +: getFormattedTimeQuantiles(gcTimes)
382
383          val serializationTimes = validTasks.map { taskUIData: TaskUIData =>
384            taskUIData.metrics.get.resultSerializationTime.toDouble
385          }
386          val serializationQuantiles =
387            <td>
388              <span data-toggle="tooltip"
389                    title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
390                Result Serialization Time
391              </span>
392            </td> +: getFormattedTimeQuantiles(serializationTimes)
393
394          val gettingResultTimes = validTasks.map { taskUIData: TaskUIData =>
395            getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
396          }
397          val gettingResultQuantiles =
398            <td>
399              <span data-toggle="tooltip"
400                  title={ToolTips.GETTING_RESULT_TIME} data-placement="right">
401                Getting Result Time
402              </span>
403            </td> +:
404            getFormattedTimeQuantiles(gettingResultTimes)
405
406          val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData =>
407            taskUIData.metrics.get.peakExecutionMemory.toDouble
408          }
409          val peakExecutionMemoryQuantiles = {
410            <td>
411              <span data-toggle="tooltip"
412                    title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
413                Peak Execution Memory
414              </span>
415            </td> +: getFormattedSizeQuantiles(peakExecutionMemory)
416          }
417
418          // The scheduler delay includes the network delay to send the task to the worker
419          // machine and to send back the result (but not the time to fetch the task result,
420          // if it needed to be fetched from the block manager on the worker).
421          val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
422            getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble
423          }
424          val schedulerDelayTitle = <td><span data-toggle="tooltip"
425            title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
426          val schedulerDelayQuantiles = schedulerDelayTitle +:
427            getFormattedTimeQuantiles(schedulerDelays)
428          def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double])
429            : Seq[Elem] = {
430            val recordDist = getDistributionQuantiles(records).iterator
431            getDistributionQuantiles(data).map(d =>
432              <td>{s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}</td>
433            )
434          }
435
436          val inputSizes = validTasks.map { taskUIData: TaskUIData =>
437            taskUIData.metrics.get.inputMetrics.bytesRead.toDouble
438          }
439
440          val inputRecords = validTasks.map { taskUIData: TaskUIData =>
441            taskUIData.metrics.get.inputMetrics.recordsRead.toDouble
442          }
443
444          val inputQuantiles = <td>Input Size / Records</td> +:
445            getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
446
447          val outputSizes = validTasks.map { taskUIData: TaskUIData =>
448            taskUIData.metrics.get.outputMetrics.bytesWritten.toDouble
449          }
450
451          val outputRecords = validTasks.map { taskUIData: TaskUIData =>
452            taskUIData.metrics.get.outputMetrics.recordsWritten.toDouble
453          }
454
455          val outputQuantiles = <td>Output Size / Records</td> +:
456            getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
457
458          val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
459            taskUIData.metrics.get.shuffleReadMetrics.fetchWaitTime.toDouble
460          }
461          val shuffleReadBlockedQuantiles =
462            <td>
463              <span data-toggle="tooltip"
464                    title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
465                Shuffle Read Blocked Time
466              </span>
467            </td> +:
468            getFormattedTimeQuantiles(shuffleReadBlockedTimes)
469
470          val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
471            taskUIData.metrics.get.shuffleReadMetrics.totalBytesRead.toDouble
472          }
473          val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
474            taskUIData.metrics.get.shuffleReadMetrics.recordsRead.toDouble
475          }
476          val shuffleReadTotalQuantiles =
477            <td>
478              <span data-toggle="tooltip"
479                    title={ToolTips.SHUFFLE_READ} data-placement="right">
480                Shuffle Read Size / Records
481              </span>
482            </td> +:
483            getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
484
485          val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
486            taskUIData.metrics.get.shuffleReadMetrics.remoteBytesRead.toDouble
487          }
488          val shuffleReadRemoteQuantiles =
489            <td>
490              <span data-toggle="tooltip"
491                    title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
492                Shuffle Remote Reads
493              </span>
494            </td> +:
495            getFormattedSizeQuantiles(shuffleReadRemoteSizes)
496
497          val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
498            taskUIData.metrics.get.shuffleWriteMetrics.bytesWritten.toDouble
499          }
500
501          val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
502            taskUIData.metrics.get.shuffleWriteMetrics.recordsWritten.toDouble
503          }
504
505          val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
506            getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
507
508          val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
509            taskUIData.metrics.get.memoryBytesSpilled.toDouble
510          }
511          val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
512            getFormattedSizeQuantiles(memoryBytesSpilledSizes)
513
514          val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
515            taskUIData.metrics.get.diskBytesSpilled.toDouble
516          }
517          val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
518            getFormattedSizeQuantiles(diskBytesSpilledSizes)
519
520          val listings: Seq[Seq[Node]] = Seq(
521            <tr>{serviceQuantiles}</tr>,
522            <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
523            <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
524              {deserializationQuantiles}
525            </tr>
526            <tr>{gcQuantiles}</tr>,
527            <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
528              {serializationQuantiles}
529            </tr>,
530            <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
531            <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
532              {peakExecutionMemoryQuantiles}
533            </tr>,
534            if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
535            if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
536            if (stageData.hasShuffleRead) {
537              <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
538                {shuffleReadBlockedQuantiles}
539              </tr>
540              <tr>{shuffleReadTotalQuantiles}</tr>
541              <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
542                {shuffleReadRemoteQuantiles}
543              </tr>
544            } else {
545              Nil
546            },
547            if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
548            if (stageData.hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
549            if (stageData.hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
550
551          val quantileHeaders = Seq("Metric", "Min", "25th percentile",
552            "Median", "75th percentile", "Max")
553          // The summary table does not use CSS to stripe rows, which doesn't work with hidden
554          // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows).
555          Some(UIUtils.listingTable(
556            quantileHeaders,
557            identity[Seq[Node]],
558            listings,
559            fixedWidth = true,
560            id = Some("task-summary-table"),
561            stripeRowsWithCss = false))
562        }
563
564      val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
565
566      val maybeAccumulableTable: Seq[Node] =
567        if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
568
569      val aggMetrics =
570        <span class="collapse-aggregated-metrics collapse-table"
571              onClick="collapseTable('collapse-aggregated-metrics','aggregated-metrics')">
572          <h4>
573            <span class="collapse-table-arrow arrow-open"></span>
574            <a>Aggregated Metrics by Executor</a>
575          </h4>
576        </span>
577        <div class="aggregated-metrics collapsible-table">
578          {executorTable.toNodeSeq}
579        </div>
580
581      val content =
582        summary ++
583        dagViz ++
584        showAdditionalMetrics ++
585        makeTimeline(
586          // Only show the tasks in the table
587          stageData.taskData.values.toSeq.filter(t => taskIdsInPage.contains(t.taskInfo.taskId)),
588          currentTime) ++
589        <h4>Summary Metrics for <a href="#tasks-section">{numCompleted} Completed Tasks</a></h4> ++
590        <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
591        aggMetrics ++
592        maybeAccumulableTable ++
593        <h4 id="tasks-section">Tasks ({totalTasksNumStr})</h4> ++
594          taskTableHTML ++ jsForScrollingDownToTaskTable
595      UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
596    }
597  }
598
599  def makeTimeline(tasks: Seq[TaskUIData], currentTime: Long): Seq[Node] = {
600    val executorsSet = new HashSet[(String, String)]
601    var minLaunchTime = Long.MaxValue
602    var maxFinishTime = Long.MinValue
603
604    val executorsArrayStr =
605      tasks.sortBy(-_.taskInfo.launchTime).take(MAX_TIMELINE_TASKS).map { taskUIData =>
606        val taskInfo = taskUIData.taskInfo
607        val executorId = taskInfo.executorId
608        val host = taskInfo.host
609        executorsSet += ((executorId, host))
610
611        val launchTime = taskInfo.launchTime
612        val finishTime = if (!taskInfo.running) taskInfo.finishTime else currentTime
613        val totalExecutionTime = finishTime - launchTime
614        minLaunchTime = launchTime.min(minLaunchTime)
615        maxFinishTime = finishTime.max(maxFinishTime)
616
617        def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100
618
619        val metricsOpt = taskUIData.metrics
620        val shuffleReadTime =
621          metricsOpt.map(_.shuffleReadMetrics.fetchWaitTime).getOrElse(0L)
622        val shuffleReadTimeProportion = toProportion(shuffleReadTime)
623        val shuffleWriteTime =
624          (metricsOpt.map(_.shuffleWriteMetrics.writeTime).getOrElse(0L) / 1e6).toLong
625        val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)
626
627        val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
628        val serializationTimeProportion = toProportion(serializationTime)
629        val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
630        val deserializationTimeProportion = toProportion(deserializationTime)
631        val gettingResultTime = getGettingResultTime(taskUIData.taskInfo, currentTime)
632        val gettingResultTimeProportion = toProportion(gettingResultTime)
633        val schedulerDelay =
634          metricsOpt.map(getSchedulerDelay(taskInfo, _, currentTime)).getOrElse(0L)
635        val schedulerDelayProportion = toProportion(schedulerDelay)
636
637        val executorOverhead = serializationTime + deserializationTime
638        val executorRunTime = if (taskInfo.running) {
639          totalExecutionTime - executorOverhead - gettingResultTime
640        } else {
641          metricsOpt.map(_.executorRunTime).getOrElse(
642            totalExecutionTime - executorOverhead - gettingResultTime)
643        }
644        val executorComputingTime = executorRunTime - shuffleReadTime - shuffleWriteTime
645        val executorComputingTimeProportion =
646          math.max(100 - schedulerDelayProportion - shuffleReadTimeProportion -
647            shuffleWriteTimeProportion - serializationTimeProportion -
648            deserializationTimeProportion - gettingResultTimeProportion, 0)
649
650        val schedulerDelayProportionPos = 0
651        val deserializationTimeProportionPos =
652          schedulerDelayProportionPos + schedulerDelayProportion
653        val shuffleReadTimeProportionPos =
654          deserializationTimeProportionPos + deserializationTimeProportion
655        val executorRuntimeProportionPos =
656          shuffleReadTimeProportionPos + shuffleReadTimeProportion
657        val shuffleWriteTimeProportionPos =
658          executorRuntimeProportionPos + executorComputingTimeProportion
659        val serializationTimeProportionPos =
660          shuffleWriteTimeProportionPos + shuffleWriteTimeProportion
661        val gettingResultTimeProportionPos =
662          serializationTimeProportionPos + serializationTimeProportion
663
664        val index = taskInfo.index
665        val attempt = taskInfo.attemptNumber
666
667        val svgTag =
668          if (totalExecutionTime == 0) {
669            // SPARK-8705: Avoid invalid attribute error in JavaScript if execution time is 0
670            """<svg class="task-assignment-timeline-duration-bar"></svg>"""
671          } else {
672           s"""<svg class="task-assignment-timeline-duration-bar">
673                 |<rect class="scheduler-delay-proportion"
674                   |x="$schedulerDelayProportionPos%" y="0px" height="26px"
675                   |width="$schedulerDelayProportion%"></rect>
676                 |<rect class="deserialization-time-proportion"
677                   |x="$deserializationTimeProportionPos%" y="0px" height="26px"
678                   |width="$deserializationTimeProportion%"></rect>
679                 |<rect class="shuffle-read-time-proportion"
680                   |x="$shuffleReadTimeProportionPos%" y="0px" height="26px"
681                   |width="$shuffleReadTimeProportion%"></rect>
682                 |<rect class="executor-runtime-proportion"
683                   |x="$executorRuntimeProportionPos%" y="0px" height="26px"
684                   |width="$executorComputingTimeProportion%"></rect>
685                 |<rect class="shuffle-write-time-proportion"
686                   |x="$shuffleWriteTimeProportionPos%" y="0px" height="26px"
687                   |width="$shuffleWriteTimeProportion%"></rect>
688                 |<rect class="serialization-time-proportion"
689                   |x="$serializationTimeProportionPos%" y="0px" height="26px"
690                   |width="$serializationTimeProportion%"></rect>
691                 |<rect class="getting-result-time-proportion"
692                   |x="$gettingResultTimeProportionPos%" y="0px" height="26px"
693                   |width="$gettingResultTimeProportion%"></rect></svg>""".stripMargin
694          }
695        val timelineObject =
696          s"""
697             |{
698               |'className': 'task task-assignment-timeline-object',
699               |'group': '$executorId',
700               |'content': '<div class="task-assignment-timeline-content"
701                 |data-toggle="tooltip" data-placement="top"
702                 |data-html="true" data-container="body"
703                 |data-title="${s"Task " + index + " (attempt " + attempt + ")"}<br>
704                 |Status: ${taskInfo.status}<br>
705                 |Launch Time: ${UIUtils.formatDate(new Date(launchTime))}
706                 |${
707                     if (!taskInfo.running) {
708                       s"""<br>Finish Time: ${UIUtils.formatDate(new Date(finishTime))}"""
709                     } else {
710                        ""
711                      }
712                   }
713                 |<br>Scheduler Delay: $schedulerDelay ms
714                 |<br>Task Deserialization Time: ${UIUtils.formatDuration(deserializationTime)}
715                 |<br>Shuffle Read Time: ${UIUtils.formatDuration(shuffleReadTime)}
716                 |<br>Executor Computing Time: ${UIUtils.formatDuration(executorComputingTime)}
717                 |<br>Shuffle Write Time: ${UIUtils.formatDuration(shuffleWriteTime)}
718                 |<br>Result Serialization Time: ${UIUtils.formatDuration(serializationTime)}
719                 |<br>Getting Result Time: ${UIUtils.formatDuration(gettingResultTime)}">
720                 |$svgTag',
721               |'start': new Date($launchTime),
722               |'end': new Date($finishTime)
723             |}
724           |""".stripMargin.replaceAll("""[\r\n]+""", " ")
725        timelineObject
726      }.mkString("[", ",", "]")
727
728    val groupArrayStr = executorsSet.map {
729      case (executorId, host) =>
730        s"""
731            {
732              'id': '$executorId',
733              'content': '$executorId / $host',
734            }
735          """
736    }.mkString("[", ",", "]")
737
738    <span class="expand-task-assignment-timeline">
739      <span class="expand-task-assignment-timeline-arrow arrow-closed"></span>
740      <a>Event Timeline</a>
741    </span> ++
742    <div id="task-assignment-timeline" class="collapsed">
743      {
744        if (MAX_TIMELINE_TASKS < tasks.size) {
745          <strong>
746            This stage has more than the maximum number of tasks that can be shown in the
747            visualization! Only the most recent {MAX_TIMELINE_TASKS} tasks
748            (of {tasks.size} total) are shown.
749          </strong>
750        } else {
751          Seq.empty
752        }
753      }
754      <div class="control-panel">
755        <div id="task-assignment-timeline-zoom-lock">
756          <input type="checkbox"></input>
757          <span>Enable zooming</span>
758        </div>
759      </div>
760      {TIMELINE_LEGEND}
761    </div> ++
762    <script type="text/javascript">
763      {Unparsed(s"drawTaskAssignmentTimeline(" +
764      s"$groupArrayStr, $executorsArrayStr, $minLaunchTime, $maxFinishTime, " +
765        s"${UIUtils.getTimeZoneOffset()})")}
766    </script>
767  }
768
769}
770
private[ui] object StagePage {

  /**
   * How long the driver has spent fetching this task's result.
   *
   * @param info        the task whose result-fetching time is measured
   * @param currentTime "now", used as the end point while the result is still being fetched
   * @return 0 if the task never entered the getting-result phase; otherwise the time from
   *         `info.gettingResultTime` to the task's finish time (or to `currentTime` if the
   *         task has not finished yet)
   */
  private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
    if (!info.gettingResult) {
      0L
    } else {
      // A finished task has a fixed end point; one still fetching is measured up to now.
      val fetchEnd = if (info.finished) info.finishTime else currentTime
      fetchEnd - info.gettingResultTime
    }
  }

  /**
   * Portion of a finished task's wall-clock time not explained by its metrics: time not spent
   * running, (de)serializing, or fetching the result.
   *
   * @param info        the task
   * @param metrics     the task's metrics (run time, deserialize time, serialization time)
   * @param currentTime forwarded to [[getGettingResultTime]]
   * @return the non-negative unaccounted time, or 0 while the task is still running
   */
  private[ui] def getSchedulerDelay(
      info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
    if (!info.finished) {
      // The task is still running and the metrics like executorRunTime are not available.
      0L
    } else {
      val wallClock = info.finishTime - info.launchTime
      val accountedFor = metrics.executorRunTime +
        metrics.executorDeserializeTime +
        metrics.resultSerializationTime +
        getGettingResultTime(info, currentTime)
      math.max(0L, wallClock - accountedFor)
    }
  }
}
801
/**
 * Contents of the "Input Size / Records" cell of a task row.
 *
 * @param inputSortable bytes read by the task; the sort key for this column (0 without metrics)
 * @param inputReadable display text of the form "size / records"
 */
private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String)
803
/**
 * Contents of the "Output Size / Records" cell of a task row.
 *
 * @param outputSortable bytes written by the task; the sort key for this column
 * @param outputReadable display text of the form "size / records"
 */
private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String)
805
/**
 * Contents of the three shuffle-read cells of a task row.
 *
 * @param shuffleReadBlockedTimeSortable shuffle fetch wait time; sort key for
 *                                       "Shuffle Read Blocked Time"
 * @param shuffleReadBlockedTimeReadable formatted fetch wait time
 * @param shuffleReadSortable            total shuffle bytes read; sort key for
 *                                       "Shuffle Read Size / Records"
 * @param shuffleReadReadable            display text of the form "size / records"
 * @param shuffleReadRemoteSortable      shuffle bytes read remotely; sort key for
 *                                       "Shuffle Remote Reads"
 * @param shuffleReadRemoteReadable      formatted remote bytes read
 */
private[ui] case class TaskTableRowShuffleReadData(
    shuffleReadBlockedTimeSortable: Long,
    shuffleReadBlockedTimeReadable: String,
    shuffleReadSortable: Long,
    shuffleReadReadable: String,
    shuffleReadRemoteSortable: Long,
    shuffleReadRemoteReadable: String)
813
/**
 * Contents of the two shuffle-write cells of a task row.
 *
 * @param writeTimeSortable    raw shuffle write time from the task metrics; sort key for
 *                             "Write Time" (the display form divides it by 1000 * 1000,
 *                             presumably converting nanoseconds to milliseconds)
 * @param writeTimeReadable    formatted write time; empty when it rounds down to zero
 * @param shuffleWriteSortable shuffle bytes written; sort key for "Shuffle Write Size / Records"
 * @param shuffleWriteReadable display text of the form "size / records"
 */
private[ui] case class TaskTableRowShuffleWriteData(
    writeTimeSortable: Long,
    writeTimeReadable: String,
    shuffleWriteSortable: Long,
    shuffleWriteReadable: String)
819
/**
 * Contents of the two shuffle-spill cells of a task row.
 *
 * @param memoryBytesSpilledSortable bytes spilled from memory; sort key for
 *                                   "Shuffle Spill (Memory)"
 * @param memoryBytesSpilledReadable formatted memory spill size
 * @param diskBytesSpilledSortable   bytes spilled to disk; sort key for "Shuffle Spill (Disk)"
 * @param diskBytesSpilledReadable   formatted disk spill size
 */
private[ui] case class TaskTableRowBytesSpilledData(
    memoryBytesSpilledSortable: Long,
    memoryBytesSpilledReadable: String,
    diskBytesSpilledSortable: Long,
    diskBytesSpilledReadable: String)
825
826/**
827 * Contains all data that needs for sorting and generating HTML. Using this one rather than
828 * TaskUIData to avoid creating duplicate contents during sorting the data.
829 */
830private[ui] class TaskTableRowData(
831    val index: Int,
832    val taskId: Long,
833    val attempt: Int,
834    val speculative: Boolean,
835    val status: String,
836    val taskLocality: String,
837    val executorIdAndHost: String,
838    val launchTime: Long,
839    val duration: Long,
840    val formatDuration: String,
841    val schedulerDelay: Long,
842    val taskDeserializationTime: Long,
843    val gcTime: Long,
844    val serializationTime: Long,
845    val gettingResultTime: Long,
846    val peakExecutionMemoryUsed: Long,
847    val accumulators: Option[String], // HTML
848    val input: Option[TaskTableRowInputData],
849    val output: Option[TaskTableRowOutputData],
850    val shuffleRead: Option[TaskTableRowShuffleReadData],
851    val shuffleWrite: Option[TaskTableRowShuffleWriteData],
852    val bytesSpilled: Option[TaskTableRowBytesSpilledData],
853    val error: String,
854    val logs: Map[String, String])
855
/**
 * Paged data source behind the stage page's task table.
 *
 * All tasks are converted to [[TaskTableRowData]] once, at construction time, and sorted by
 * `sortColumn` (reversed when `desc` is true). The `has*` flags decide which optional column
 * groups are filled in. Requesting a sort on a disabled group, or on an unknown column, throws
 * an `IllegalArgumentException` from the constructor.
 */
private[ui] class TaskDataSource(
    tasks: Seq[TaskUIData],
    hasAccumulators: Boolean,
    hasInput: Boolean,
    hasOutput: Boolean,
    hasShuffleRead: Boolean,
    hasShuffleWrite: Boolean,
    hasBytesSpilled: Boolean,
    currentTime: Long,
    pageSize: Int,
    sortColumn: String,
    desc: Boolean,
    executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) {
  import StagePage._

  // Convert TaskUIData to TaskTableRowData once, so that sorting compares precomputed fields
  // instead of recreating the display contents on every comparison.
  private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))

  // Ids of the tasks on the most recently sliced page. Starts empty rather than null, so callers
  // reading it before `sliceData` has run get a usable set instead of a NullPointerException.
  private var _slicedTaskIds: Set[Long] = Set.empty

  override def dataSize: Int = data.size

  override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = {
    val page = data.slice(from, to)
    _slicedTaskIds = page.map(_.taskId).toSet
    page
  }

  /** Ids of the tasks returned by the latest `sliceData` call (empty before any call). */
  def slicedTaskIds: Set[Long] = _slicedTaskIds

  /** Builds the precomputed table row (sort keys plus display strings) for one task. */
  private def taskRow(taskData: TaskUIData): TaskTableRowData = {
    val info = taskData.taskInfo
    val metrics = taskData.metrics
    val isRunning = info.status == "RUNNING"
    // While a task is running its metrics like executorRunTime are not available yet,
    // so the elapsed time so far is shown instead.
    val duration =
      if (isRunning) info.timeRunning(currentTime)
      else metrics.map(_.executorRunTime).getOrElse(1L)
    val formatDuration =
      if (isRunning) UIUtils.formatDuration(duration)
      else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
    val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
    val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
    val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
    val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
    val gettingResultTime = getGettingResultTime(info, currentTime)

    // Only non-internal accumulators with both a name and an update are listed. They are
    // HTML-escaped here because the table later inserts this text without further escaping.
    val externalAccumulableReadable = info.accumulables
      .filterNot(_.internal)
      .flatMap { a =>
        (a.name, a.update) match {
          case (Some(name), Some(update)) => Some(StringEscapeUtils.escapeHtml4(s"$name: $update"))
          case _ => None
        }
      }
    val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L)

    val maybeInput = metrics.map(_.inputMetrics)
    val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
    val inputReadable = maybeInput.map(m => Utils.bytesToString(m.bytesRead)).getOrElse("")
    val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")

    val maybeOutput = metrics.map(_.outputMetrics)
    val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L)
    val outputReadable = maybeOutput.map(m => Utils.bytesToString(m.bytesWritten)).getOrElse("")
    val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")

    val maybeShuffleRead = metrics.map(_.shuffleReadMetrics)
    val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L)
    val shuffleReadBlockedTimeReadable =
      maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")

    val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead)
    val shuffleReadSortable = totalShuffleBytes.getOrElse(0L)
    val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
    val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")

    val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
    val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L)
    val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")

    val maybeShuffleWrite = metrics.map(_.shuffleWriteMetrics)
    val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
    val shuffleWriteReadable =
      maybeShuffleWrite.map(m => Utils.bytesToString(m.bytesWritten)).getOrElse("")
    val shuffleWriteRecords = maybeShuffleWrite.map(_.recordsWritten.toString).getOrElse("")

    val maybeWriteTime = maybeShuffleWrite.map(_.writeTime)
    val writeTimeSortable = maybeWriteTime.getOrElse(0L)
    // The raw write time is divided by 1000 * 1000 for display (presumably ns -> ms); a value
    // that rounds down to 0 ms is left blank.
    val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
      if (ms == 0) "" else UIUtils.formatDuration(ms)
    }.getOrElse("")

    val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
    val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L)
    val memoryBytesSpilledReadable =
      maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")

    val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
    val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L)
    val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")

    val input =
      if (hasInput) {
        Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords"))
      } else {
        None
      }

    val output =
      if (hasOutput) {
        Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords"))
      } else {
        None
      }

    val shuffleRead =
      if (hasShuffleRead) {
        Some(TaskTableRowShuffleReadData(
          shuffleReadBlockedTimeSortable,
          shuffleReadBlockedTimeReadable,
          shuffleReadSortable,
          s"$shuffleReadReadable / $shuffleReadRecords",
          shuffleReadRemoteSortable,
          shuffleReadRemoteReadable
        ))
      } else {
        None
      }

    val shuffleWrite =
      if (hasShuffleWrite) {
        Some(TaskTableRowShuffleWriteData(
          writeTimeSortable,
          writeTimeReadable,
          shuffleWriteSortable,
          s"$shuffleWriteReadable / $shuffleWriteRecords"
        ))
      } else {
        None
      }

    val bytesSpilled =
      if (hasBytesSpilled) {
        Some(TaskTableRowBytesSpilledData(
          memoryBytesSpilledSortable,
          memoryBytesSpilledReadable,
          diskBytesSpilledSortable,
          diskBytesSpilledReadable
        ))
      } else {
        None
      }

    val logs = executorsListener.executorToTaskSummary.get(info.executorId)
      .map(_.executorLogs).getOrElse(Map.empty)
    new TaskTableRowData(
      info.index,
      info.taskId,
      info.attemptNumber,
      info.speculative,
      info.status,
      info.taskLocality.toString,
      s"${info.executorId} / ${info.host}",
      info.launchTime,
      duration,
      formatDuration,
      schedulerDelay,
      taskDeserializationTime,
      gcTime,
      serializationTime,
      gettingResultTime,
      peakExecutionMemoryUsed,
      if (hasAccumulators) Some(externalAccumulableReadable.mkString("<br/>")) else None,
      input,
      output,
      shuffleRead,
      shuffleWrite,
      bytesSpilled,
      taskData.errorMessage.getOrElse(""),
      logs)
  }

  /**
   * Returns the row ordering for `sortColumn`, reversed when `desc` is true.
   *
   * @throws IllegalArgumentException if `sortColumn` is unknown, or belongs to an optional column
   *                                  group that is not shown for this table
   */
  private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
    // Optional column groups are populated (Some) for every row exactly when their `has*` flag
    // is set, so the `.get` calls below are safe once `present` has been checked.
    def ifPresent(present: Boolean, missing: String)(
        ord: => Ordering[TaskTableRowData]): Ordering[TaskTableRowData] = {
      if (present) {
        ord
      } else {
        throw new IllegalArgumentException(s"Cannot sort by $sortColumn because of no $missing")
      }
    }

    val ascending: Ordering[TaskTableRowData] = sortColumn match {
      case "Index" => Ordering.by(_.index)
      case "ID" => Ordering.by(_.taskId)
      case "Attempt" => Ordering.by(_.attempt)
      case "Status" => Ordering.by(_.status)
      case "Locality Level" => Ordering.by(_.taskLocality)
      case "Executor ID / Host" => Ordering.by(_.executorIdAndHost)
      case "Launch Time" => Ordering.by(_.launchTime)
      case "Duration" => Ordering.by(_.duration)
      case "Scheduler Delay" => Ordering.by(_.schedulerDelay)
      case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime)
      case "GC Time" => Ordering.by(_.gcTime)
      case "Result Serialization Time" => Ordering.by(_.serializationTime)
      case "Getting Result Time" => Ordering.by(_.gettingResultTime)
      case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed)
      case "Accumulators" =>
        ifPresent(hasAccumulators, "accumulators") {
          Ordering.by(_.accumulators.get)
        }
      case "Input Size / Records" =>
        ifPresent(hasInput, "inputs") {
          Ordering.by(_.input.get.inputSortable)
        }
      case "Output Size / Records" =>
        ifPresent(hasOutput, "outputs") {
          Ordering.by(_.output.get.outputSortable)
        }
      // ShuffleRead
      case "Shuffle Read Blocked Time" =>
        ifPresent(hasShuffleRead, "shuffle reads") {
          Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable)
        }
      case "Shuffle Read Size / Records" =>
        ifPresent(hasShuffleRead, "shuffle reads") {
          Ordering.by(_.shuffleRead.get.shuffleReadSortable)
        }
      case "Shuffle Remote Reads" =>
        ifPresent(hasShuffleRead, "shuffle reads") {
          Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable)
        }
      // ShuffleWrite
      case "Write Time" =>
        ifPresent(hasShuffleWrite, "shuffle writes") {
          Ordering.by(_.shuffleWrite.get.writeTimeSortable)
        }
      case "Shuffle Write Size / Records" =>
        ifPresent(hasShuffleWrite, "shuffle writes") {
          Ordering.by(_.shuffleWrite.get.shuffleWriteSortable)
        }
      // BytesSpilled
      case "Shuffle Spill (Memory)" =>
        ifPresent(hasBytesSpilled, "spills") {
          Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable)
        }
      case "Shuffle Spill (Disk)" =>
        ifPresent(hasBytesSpilled, "spills") {
          Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable)
        }
      case "Errors" => Ordering.by(_.error)
      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
    }
    if (desc) ascending.reverse else ascending
  }

}
1144
/**
 * Paged HTML table listing the tasks of a stage.
 *
 * Rows come from a [[TaskDataSource]] built from `data`. The `has*` flags decide which optional
 * column groups are rendered. `sortColumn` must be one of the rendered headers, otherwise an
 * `IllegalArgumentException` is thrown.
 */
private[ui] class TaskPagedTable(
    conf: SparkConf,
    basePath: String,
    data: Seq[TaskUIData],
    hasAccumulators: Boolean,
    hasInput: Boolean,
    hasOutput: Boolean,
    hasShuffleRead: Boolean,
    hasShuffleWrite: Boolean,
    hasBytesSpilled: Boolean,
    currentTime: Long,
    pageSize: Int,
    sortColumn: String,
    desc: Boolean,
    executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {

  override def tableId: String = "task-table"

  override def tableCssClass: String =
    "table table-bordered table-condensed table-striped table-head-clickable"

  override def pageSizeFormField: String = "task.pageSize"

  override def prevPageSizeFormField: String = "task.prevPageSize"

  override def pageNumberFormField: String = "task.page"

  override val dataSource: TaskDataSource = new TaskDataSource(
    data,
    hasAccumulators,
    hasInput,
    hasOutput,
    hasShuffleRead,
    hasShuffleWrite,
    hasBytesSpilled,
    currentTime,
    pageSize,
    sortColumn,
    desc,
    executorsListener)

  override def pageLink(page: Int): String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
    basePath +
      s"&$pageNumberFormField=$page" +
      s"&task.sort=$encodedSortColumn" +
      s"&task.desc=$desc" +
      s"&$pageSizeFormField=$pageSize"
  }

  override def goButtonFormPath: String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
    s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"
  }

  /**
   * Renders the clickable header row. Clicking the active sort column toggles its direction;
   * clicking any other column sorts by it.
   *
   * @throws IllegalArgumentException if `sortColumn` is not one of the rendered headers
   */
  def headers: Seq[Node] = {
    val taskHeadersAndCssClasses: Seq[(String, String)] =
      Seq(
        ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
        ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
        ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
        ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
        ("GC Time", ""),
        ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
        ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
        ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
        {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
        {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
        {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
        {if (hasShuffleRead) {
          Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
            ("Shuffle Read Size / Records", ""),
            ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
        } else {
          Nil
        }} ++
        {if (hasShuffleWrite) {
          Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
        } else {
          Nil
        }} ++
        {if (hasBytesSpilled) {
          Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
        } else {
          Nil
        }} ++
        Seq(("Errors", ""))

    if (!taskHeadersAndCssClasses.exists(_._1 == sortColumn)) {
      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
    }

    val headerRow: Seq[Node] = taskHeadersAndCssClasses.map { case (header, cssClass) =>
      val sortParam = s"&task.sort=${URLEncoder.encode(header, "UTF-8")}"
      val pageSizeParam = s"&$pageSizeFormField=$pageSize"
      if (header == sortColumn) {
        // The active column links to the same sort in the opposite direction.
        val headerLink = Unparsed(basePath + sortParam + s"&task.desc=${!desc}" + pageSizeParam)
        val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // DOWN or UP
        <th class={cssClass}>
          <a href={headerLink}>
            {header}
            <span>&nbsp;{Unparsed(arrow)}</span>
          </a>
        </th>
      } else {
        val headerLink = Unparsed(basePath + sortParam + pageSizeParam)
        <th class={cssClass}>
          <a href={headerLink}>
            {header}
          </a>
        </th>
      }
    }
    <thead>{headerRow}</thead>
  }

  /** Renders one task as a table row. Optional column groups are emitted only when present. */
  def row(task: TaskTableRowData): Seq[Node] = {
    <tr>
      <td>{task.index}</td>
      <td>{task.taskId}</td>
      <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td>
      <td>{task.status}</td>
      <td>{task.taskLocality}</td>
      <td>
        <div style="float: left">{task.executorIdAndHost}</div>
        <div style="float: right">
        {
          task.logs.map {
            case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
          }
        }
        </div>
      </td>
      <td>{UIUtils.formatDate(new Date(task.launchTime))}</td>
      <td>{task.formatDuration}</td>
      <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
        {UIUtils.formatDuration(task.schedulerDelay)}
      </td>
      <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
        {UIUtils.formatDuration(task.taskDeserializationTime)}
      </td>
      <td>
        {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""}
      </td>
      <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
        {UIUtils.formatDuration(task.serializationTime)}
      </td>
      <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
        {UIUtils.formatDuration(task.gettingResultTime)}
      </td>
      <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
        {Utils.bytesToString(task.peakExecutionMemoryUsed)}
      </td>
      {if (task.accumulators.nonEmpty) {
        // Already HTML-escaped when the row was built, so it is inserted unparsed.
        <td>{Unparsed(task.accumulators.get)}</td>
      }}
      {if (task.input.nonEmpty) {
        <td>{task.input.get.inputReadable}</td>
      }}
      {if (task.output.nonEmpty) {
        <td>{task.output.get.outputReadable}</td>
      }}
      {if (task.shuffleRead.nonEmpty) {
        <td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
          {task.shuffleRead.get.shuffleReadBlockedTimeReadable}
        </td>
        <td>{task.shuffleRead.get.shuffleReadReadable}</td>
        <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
          {task.shuffleRead.get.shuffleReadRemoteReadable}
        </td>
      }}
      {if (task.shuffleWrite.nonEmpty) {
        <td>{task.shuffleWrite.get.writeTimeReadable}</td>
        <td>{task.shuffleWrite.get.shuffleWriteReadable}</td>
      }}
      {if (task.bytesSpilled.nonEmpty) {
        <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td>
        <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td>
      }}
      {errorMessageCell(task.error)}
    </tr>
  }

  /**
   * Renders the error cell: the first line of `error` is always shown, and multi-line errors
   * get a "+details" toggle that reveals the full text.
   */
  private def errorMessageCell(error: String): Seq[Node] = {
    val newlineIndex = error.indexOf('\n')
    val isMultiline = newlineIndex >= 0
    // Display the first line by default. No manual HTML escaping here: strings embedded in Scala
    // XML literals are escaped on rendering, so escaping them beforehand would make characters
    // such as '<', '&' or '"' show up as literal entities ("&lt;", "&quot;") in the UI.
    val errorSummary = if (isMultiline) error.substring(0, newlineIndex) else error
    val details: Seq[Node] = if (isMultiline) {
      // scalastyle:off
      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
        <div class="stacktrace-details collapsed">
          <pre>{error}</pre>
        </div>
      // scalastyle:on
    } else {
      Seq.empty
    }
    <td>{errorSummary}{details}</td>
  }
}
1360