/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ui.jobs

import java.net.URLEncoder
import java.util.{Date, Locale}
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.HashSet
import scala.xml.{Elem, Node, Unparsed}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.SparkConf
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
import org.apache.spark.ui._
import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Distribution, Utils}

/**
 * Page showing statistics and task list for a given stage.
 *
 * The page is addressed by the `id` and `attempt` request parameters and is assembled from the
 * data held by the listeners of the parent [[StagesTab]].
 */
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
  import StagePage._

  private val progressListener = parent.progressListener
  private val operationGraphListener = parent.operationGraphListener
  private val executorsListener = parent.executorsListener

  /**
   * Legend of the event timeline: one coloured square plus label per time category. The entries
   * are laid out in columns of three (column = index / 3, row = index % 3).
   */
  private val TIMELINE_LEGEND = {
    <div class="legend-area">
      <svg>
        {
          val legendPairs = List(("scheduler-delay-proportion", "Scheduler Delay"),
            ("deserialization-time-proportion", "Task Deserialization Time"),
            ("shuffle-read-time-proportion", "Shuffle Read Time"),
            ("executor-runtime-proportion", "Executor Computing Time"),
            ("shuffle-write-time-proportion", "Shuffle Write Time"),
            ("serialization-time-proportion", "Result Serialization Time"),
            ("getting-result-time-proportion", "Getting Result Time"))

          legendPairs.zipWithIndex.map {
            case ((classAttr, name), index) =>
              <rect x={5 + (index / 3) * 210 + "px"} y={10 + (index % 3) * 15 + "px"}
                width="10px" height="10px" class={classAttr}></rect>
                <text x={25 + (index / 3) * 210 + "px"}
                  y={20 + (index % 3) * 15 + "px"}>{name}</text>
          }
        }
      </svg>
    </div>
  }

  // TODO: We should consider increasing the number of this parameter over time
  // if we find that it's okay.
  /** Upper bound on the number of tasks drawn in the event timeline (default 1000). */
  private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)

  /**
   * Builds the "Locality Level Summary" text: one `"<level name>: <task count>"` entry per
   * locality level that occurs among the stage's tasks, sorted alphabetically and joined
   * with "; ".
   *
   * @param stageData UI data of the stage whose tasks are summarised
   * @return the summary string (empty when the stage has no task data)
   */
  private def getLocalitySummaryString(stageData: StageUIData): String = {
    val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
    val localityCounts = localities.groupBy(identity).mapValues(_.size)
    val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
      val localityName = locality match {
        case TaskLocality.PROCESS_LOCAL => "Process local"
        case TaskLocality.NODE_LOCAL => "Node local"
        case TaskLocality.RACK_LOCAL => "Rack local"
        case TaskLocality.ANY => "Any"
        case other =>
          // Any level that is not listed above used to raise a MatchError and break the whole
          // page. Derive a label from the enum name instead, in the same style as the labels
          // above (a level named NO_PREF would be shown as "No pref").
          other.toString.toLowerCase(Locale.ROOT).replace('_', ' ').capitalize
      }
      s"$localityName: $count"
    }
    localityNamesAndCounts.sorted.mkString("; ")
  }

  /**
   * Renders the stage page.
   *
   * Request parameters read here:
   *  - `id`, `attempt` (required): stage id and stage attempt id
   *  - `task.page`, `task.sort`, `task.desc`, `task.pageSize`, `task.prevPageSize` (optional):
   *    paging and sorting of the task table; defaults are page 1, column "Index", ascending,
   *    page size 100, and previous page size = current page size.
   *
   * The whole method runs while holding the monitor of `progressListener`.
   *
   * @throws IllegalArgumentException if `id` or `attempt` is missing or empty (via `require`).
   *         The numeric parameters are converted with `toInt`, so a non-numeric value makes
   *         the conversion throw as well.
   */
  def render(request: HttpServletRequest): Seq[Node] = {
    progressListener.synchronized {
      val parameterId = request.getParameter("id")
      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")

      val parameterAttempt = request.getParameter("attempt")
      require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")

      val parameterTaskPage = request.getParameter("task.page")
      val parameterTaskSortColumn = request.getParameter("task.sort")
      val parameterTaskSortDesc = request.getParameter("task.desc")
      val parameterTaskPageSize = request.getParameter("task.pageSize")
      val parameterTaskPrevPageSize = request.getParameter("task.prevPageSize")

      val taskPage = Option(parameterTaskPage).map(_.toInt).getOrElse(1)
      val taskSortColumn = Option(parameterTaskSortColumn).map { sortColumn =>
        UIUtils.decodeURLParameter(sortColumn)
      }.getOrElse("Index")
      val taskSortDesc = Option(parameterTaskSortDesc).map(_.toBoolean).getOrElse(false)
      val taskPageSize = Option(parameterTaskPageSize).map(_.toInt).getOrElse(100)
      val taskPrevPageSize = Option(parameterTaskPrevPageSize).map(_.toInt).getOrElse(taskPageSize)

      val stageId = parameterId.toInt
      val stageAttemptId = parameterAttempt.toInt
      val stageDataOption = progressListener.stageIdToData.get((stageId, stageAttemptId))

      val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
      if (stageDataOption.isEmpty) {
        val content =
          <div id="no-info">
            <p>No information to display for Stage {stageId} (Attempt {stageAttemptId})</p>
          </div>
        return UIUtils.headerSparkPage(stageHeader, content, parent)

      }
      if (stageDataOption.get.taskData.isEmpty) {
        val content =
          <div>
            <h4>Summary Metrics</h4> No tasks have started yet
            <h4>Tasks</h4> No tasks have started yet
          </div>
        return UIUtils.headerSparkPage(stageHeader, content, parent)
      }

      val stageData = stageDataOption.get
      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
      val numCompleted = stageData.numCompleteTasks
      val totalTasks = stageData.numActiveTasks +
        stageData.numCompleteTasks + stageData.numFailedTasks
      val totalTasksNumStr = if (totalTasks == tasks.size) {
        s"$totalTasks"
      } else {
        s"$totalTasks, showing ${tasks.size}"
      }

      // stageData is the entry that was just looked up for (stageId, stageAttemptId), so there
      // is no need to query the listener map a second time.
      val allAccumulables = stageData.accumulables
      val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
      val hasAccumulators = externalAccumulables.nonEmpty

      val summary =
        <div>
          <ul class="unstyled">
            <li>
              <strong>Total Time Across All Tasks: </strong>
              {UIUtils.formatDuration(stageData.executorRunTime)}
            </li>
            <li>
              <strong>Locality Level Summary: </strong>
              {getLocalitySummaryString(stageData)}
            </li>
            {if (stageData.hasInput) {
              <li>
                <strong>Input Size / Records: </strong>
                {s"${Utils.bytesToString(stageData.inputBytes)} / ${stageData.inputRecords}"}
              </li>
            }}
            {if (stageData.hasOutput) {
              <li>
                <strong>Output: </strong>
                {s"${Utils.bytesToString(stageData.outputBytes)} / ${stageData.outputRecords}"}
              </li>
            }}
            {if (stageData.hasShuffleRead) {
              <li>
                <strong>Shuffle Read: </strong>
                {s"${Utils.bytesToString(stageData.shuffleReadTotalBytes)} / " +
                 s"${stageData.shuffleReadRecords}"}
              </li>
            }}
            {if (stageData.hasShuffleWrite) {
              <li>
                <strong>Shuffle Write: </strong>
                 {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " +
                 s"${stageData.shuffleWriteRecords}"}
              </li>
            }}
            {if (stageData.hasBytesSpilled) {
              <li>
                <strong>Shuffle Spill (Memory): </strong>
                {Utils.bytesToString(stageData.memoryBytesSpilled)}
              </li>
              <li>
                <strong>Shuffle Spill (Disk): </strong>
                {Utils.bytesToString(stageData.diskBytesSpilled)}
              </li>
            }}
          </ul>
        </div>

      // Check boxes that toggle the optional metric rows/columns; each check box is named after
      // the CSS class (TaskDetailsClassNames) carried by the elements it controls.
      val showAdditionalMetrics =
        <div>
          <span class="expand-additional-metrics">
            <span class="expand-additional-metrics-arrow arrow-closed"></span>
            <a>Show Additional Metrics</a>
          </span>
          <div class="additional-metrics collapsed">
            <ul>
              <li>
                  <input type="checkbox" id="select-all-metrics"/>
                  <span class="additional-metric-title"><em>(De)select All</em></span>
              </li>
              <li>
                <span data-toggle="tooltip"
                      title={ToolTips.SCHEDULER_DELAY} data-placement="right">
                  <input type="checkbox" name={TaskDetailsClassNames.SCHEDULER_DELAY}/>
                  <span class="additional-metric-title">Scheduler Delay</span>
                </span>
              </li>
              <li>
                <span data-toggle="tooltip"
                      title={ToolTips.TASK_DESERIALIZATION_TIME} data-placement="right">
                  <input type="checkbox" name={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}/>
                  <span class="additional-metric-title">Task Deserialization Time</span>
                </span>
              </li>
              {if (stageData.hasShuffleRead) {
                <li>
                  <span data-toggle="tooltip"
                        title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
                    <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}/>
                    <span class="additional-metric-title">Shuffle Read Blocked Time</span>
                  </span>
                </li>
                <li>
                  <span data-toggle="tooltip"
                        title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
                    <input type="checkbox" name={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}/>
                    <span class="additional-metric-title">Shuffle Remote Reads</span>
                  </span>
                </li>
              }}
              <li>
                <span data-toggle="tooltip"
                      title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
                  <input type="checkbox" name={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}/>
                  <span class="additional-metric-title">Result Serialization Time</span>
                </span>
              </li>
              <li>
                <span data-toggle="tooltip"
                      title={ToolTips.GETTING_RESULT_TIME} data-placement="right">
                  <input type="checkbox" name={TaskDetailsClassNames.GETTING_RESULT_TIME}/>
                  <span class="additional-metric-title">Getting Result Time</span>
                </span>
              </li>
              <li>
                <span data-toggle="tooltip"
                      title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
                  <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
                  <span class="additional-metric-title">Peak Execution Memory</span>
                </span>
              </li>
            </ul>
          </div>
        </div>

      val dagViz = UIUtils.showDagVizForStage(
        stageId, operationGraphListener.getOperationGraphForStage(stageId))

      val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
      // Accumulables without a name or without a value produce no row.
      def accumulableRow(acc: AccumulableInfo): Seq[Node] = {
        (acc.name, acc.value) match {
          case (Some(name), Some(value)) => <tr><td>{name}</td><td>{value}</td></tr>
          case _ => Seq.empty[Node]
        }
      }
      val accumulableTable = UIUtils.listingTable(
        accumulableHeaders,
        accumulableRow,
        externalAccumulables.toSeq)

      val page: Int = {
        // If the user has changed to a larger page size, then go to page 1 in order to avoid
        // IndexOutOfBoundsException.
        if (taskPageSize <= taskPrevPageSize) {
          taskPage
        } else {
          1
        }
      }
      val currentTime = System.currentTimeMillis()
      // taskTable is None when the table could not be built or rendered; in that case
      // taskTableHTML holds an error box instead of the table.
      val (taskTable, taskTableHTML): (Option[TaskPagedTable], Seq[Node]) = try {
        val _taskTable = new TaskPagedTable(
          parent.conf,
          UIUtils.prependBaseUri(parent.basePath) +
            s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
          tasks,
          hasAccumulators,
          stageData.hasInput,
          stageData.hasOutput,
          stageData.hasShuffleRead,
          stageData.hasShuffleWrite,
          stageData.hasBytesSpilled,
          currentTime,
          pageSize = taskPageSize,
          sortColumn = taskSortColumn,
          desc = taskSortDesc,
          executorsListener = executorsListener
        )
        (Some(_taskTable), _taskTable.table(page))
      } catch {
        case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
          val errorMessage =
            <div class="alert alert-error">
              <p>Error while rendering stage table:</p>
              <pre>
                {Utils.exceptionString(e)}
              </pre>
            </div>
          (None, errorMessage)
      }

      // When the URL carries a task.sort parameter (i.e. the user re-sorted the table), scroll
      // straight down to the task table after the page has loaded.
      val jsForScrollingDownToTaskTable =
        <script>
          {Unparsed {
            """
              |$(function() {
              |  if (/.*&task.sort=.*$/.test(location.search)) {
              |    var topOffset = $("#tasks-section").offset().top;
              |    $("html,body").animate({scrollTop: topOffset}, 200);
              |  }
              |});
            """.stripMargin
           }
          }
        </script>

      // Ids of the tasks on the currently displayed table page; empty if the table failed.
      val taskIdsInPage = taskTable.map(_.dataSource.slicedTaskIds).getOrElse(Set.empty[Long])

      // Excludes tasks which failed and have incomplete metrics
      val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined)

      val summaryTable: Option[Seq[Node]] =
        if (validTasks.isEmpty) {
          None
        } else {
          // validTasks is non-empty in this branch and every entry has metrics defined (see the
          // filter above), which is what the `.get` calls below rely on.
          def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] =
            Distribution(data).get.getQuantiles()
          def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
            getDistributionQuantiles(times).map { millis =>
              <td>{UIUtils.formatDuration(millis.toLong)}</td>
            }
          }
          def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = {
            getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
          }

          val deserializationTimes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.executorDeserializeTime.toDouble
          }
          val deserializationQuantiles =
            <td>
              <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
                    data-placement="right">
                Task Deserialization Time
              </span>
            </td> +: getFormattedTimeQuantiles(deserializationTimes)

          val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.executorRunTime.toDouble
          }
          val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)

          val gcTimes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.jvmGCTime.toDouble
          }
          val gcQuantiles =
            <td>
              <span data-toggle="tooltip"
                  title={ToolTips.GC_TIME} data-placement="right">GC Time
              </span>
            </td> +: getFormattedTimeQuantiles(gcTimes)

          val serializationTimes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.resultSerializationTime.toDouble
          }
          val serializationQuantiles =
            <td>
              <span data-toggle="tooltip"
                    title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
                Result Serialization Time
              </span>
            </td> +: getFormattedTimeQuantiles(serializationTimes)

          val gettingResultTimes = validTasks.map { taskUIData: TaskUIData =>
            getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
          }
          val gettingResultQuantiles =
            <td>
              <span data-toggle="tooltip"
                  title={ToolTips.GETTING_RESULT_TIME} data-placement="right">
                Getting Result Time
              </span>
            </td> +:
            getFormattedTimeQuantiles(gettingResultTimes)

          val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.peakExecutionMemory.toDouble
          }
          val peakExecutionMemoryQuantiles = {
            <td>
              <span data-toggle="tooltip"
                    title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
                Peak Execution Memory
              </span>
            </td> +: getFormattedSizeQuantiles(peakExecutionMemory)
          }

          // The scheduler delay includes the network delay to send the task to the worker
          // machine and to send back the result (but not the time to fetch the task result,
          // if it needed to be fetched from the block manager on the worker).
          val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
            getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble
          }
          val schedulerDelayTitle = <td><span data-toggle="tooltip"
            title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
          val schedulerDelayQuantiles = schedulerDelayTitle +:
            getFormattedTimeQuantiles(schedulerDelays)
          // Formats each size quantile next to the record-count quantile at the same position.
          def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double])
            : Seq[Elem] = {
            val recordDist = getDistributionQuantiles(records).iterator
            getDistributionQuantiles(data).map(d =>
              <td>{s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}</td>
            )
          }

          val inputSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.inputMetrics.bytesRead.toDouble
          }

          val inputRecords = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.inputMetrics.recordsRead.toDouble
          }

          val inputQuantiles = <td>Input Size / Records</td> +:
            getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)

          val outputSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.outputMetrics.bytesWritten.toDouble
          }

          val outputRecords = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.outputMetrics.recordsWritten.toDouble
          }

          val outputQuantiles = <td>Output Size / Records</td> +:
            getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)

          val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.shuffleReadMetrics.fetchWaitTime.toDouble
          }
          val shuffleReadBlockedQuantiles =
            <td>
              <span data-toggle="tooltip"
                    title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right">
                Shuffle Read Blocked Time
              </span>
            </td> +:
            getFormattedTimeQuantiles(shuffleReadBlockedTimes)

          val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.shuffleReadMetrics.totalBytesRead.toDouble
          }
          val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.shuffleReadMetrics.recordsRead.toDouble
          }
          val shuffleReadTotalQuantiles =
            <td>
              <span data-toggle="tooltip"
                    title={ToolTips.SHUFFLE_READ} data-placement="right">
                Shuffle Read Size / Records
              </span>
            </td> +:
            getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)

          val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.shuffleReadMetrics.remoteBytesRead.toDouble
          }
          val shuffleReadRemoteQuantiles =
            <td>
              <span data-toggle="tooltip"
                    title={ToolTips.SHUFFLE_READ_REMOTE_SIZE} data-placement="right">
                Shuffle Remote Reads
              </span>
            </td> +:
            getFormattedSizeQuantiles(shuffleReadRemoteSizes)

          val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.shuffleWriteMetrics.bytesWritten.toDouble
          }

          val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.shuffleWriteMetrics.recordsWritten.toDouble
          }

          val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
            getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)

          val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.memoryBytesSpilled.toDouble
          }
          val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
            getFormattedSizeQuantiles(memoryBytesSpilledSizes)

          val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
            taskUIData.metrics.get.diskBytesSpilled.toDouble
          }
          val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
            getFormattedSizeQuantiles(diskBytesSpilledSizes)

          // One entry per table row (or group of rows). The deserialization row and the GC row
          // are now separate entries: they used to be two adjacent XML literals without a comma
          // in between, which silently merged them into a single entry.
          val listings: Seq[Seq[Node]] = Seq(
            <tr>{serviceQuantiles}</tr>,
            <tr class={TaskDetailsClassNames.SCHEDULER_DELAY}>{schedulerDelayQuantiles}</tr>,
            <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
              {deserializationQuantiles}
            </tr>,
            <tr>{gcQuantiles}</tr>,
            <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
              {serializationQuantiles}
            </tr>,
            <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
            <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
              {peakExecutionMemoryQuantiles}
            </tr>,
            if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
            if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
            if (stageData.hasShuffleRead) {
              <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
                {shuffleReadBlockedQuantiles}
              </tr>
              <tr>{shuffleReadTotalQuantiles}</tr>
              <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
                {shuffleReadRemoteQuantiles}
              </tr>
            } else {
              Nil
            },
            if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil,
            if (stageData.hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
            if (stageData.hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)

          val quantileHeaders = Seq("Metric", "Min", "25th percentile",
            "Median", "75th percentile", "Max")
          // The summary table does not use CSS to stripe rows, which doesn't work with hidden
          // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows).
          Some(UIUtils.listingTable(
            quantileHeaders,
            identity[Seq[Node]],
            listings,
            fixedWidth = true,
            id = Some("task-summary-table"),
            stripeRowsWithCss = false))
        }

      val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)

      val maybeAccumulableTable: Seq[Node] =
        if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()

      val aggMetrics =
        <span class="collapse-aggregated-metrics collapse-table"
              onClick="collapseTable('collapse-aggregated-metrics','aggregated-metrics')">
          <h4>
            <span class="collapse-table-arrow arrow-open"></span>
            <a>Aggregated Metrics by Executor</a>
          </h4>
        </span>
        <div class="aggregated-metrics collapsible-table">
          {executorTable.toNodeSeq}
        </div>

      val content =
        summary ++
        dagViz ++
        showAdditionalMetrics ++
        makeTimeline(
          // Only show the tasks in the table
          stageData.taskData.values.toSeq.filter(t => taskIdsInPage.contains(t.taskInfo.taskId)),
          currentTime) ++
        <h4>Summary Metrics for <a href="#tasks-section">{numCompleted} Completed Tasks</a></h4> ++
        <div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
        aggMetrics ++
        maybeAccumulableTable ++
        <h4 id="tasks-section">Tasks ({totalTasksNumStr})</h4> ++
          taskTableHTML ++ jsForScrollingDownToTaskTable
      UIUtils.headerSparkPage(stageHeader, content, parent, showVisualization = true)
    }
  }

  /**
   * Builds the collapsible "Event Timeline" section: the legend, the zoom control and a script
   * that calls the JavaScript function `drawTaskAssignmentTimeline` with one group per
   * (executor id, host) pair and one item per task.
   *
   * Only the `MAX_TIMELINE_TASKS` most recently launched tasks are drawn; a warning is shown
   * when `tasks` holds more than that.
   *
   * @param tasks tasks to draw
   * @param currentTime time in milliseconds used as the end time of tasks that are still running
   */
  def makeTimeline(tasks: Seq[TaskUIData], currentTime: Long): Seq[Node] = {
    val executorsSet = new HashSet[(String, String)]
    var minLaunchTime = Long.MaxValue
    var maxFinishTime = Long.MinValue

    val executorsArrayStr =
      tasks.sortBy(-_.taskInfo.launchTime).take(MAX_TIMELINE_TASKS).map { taskUIData =>
        val taskInfo = taskUIData.taskInfo
        val executorId = taskInfo.executorId
        val host = taskInfo.host
        executorsSet += ((executorId, host))

        val launchTime = taskInfo.launchTime
        val finishTime = if (!taskInfo.running) taskInfo.finishTime else currentTime
        val totalExecutionTime = finishTime - launchTime
        minLaunchTime = launchTime.min(minLaunchTime)
        maxFinishTime = finishTime.max(maxFinishTime)

        // Share of the task's wall-clock time, in percent. Not meaningful when
        // totalExecutionTime is 0; the SVG below is left empty in that case.
        def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100

        val metricsOpt = taskUIData.metrics
        val shuffleReadTime =
          metricsOpt.map(_.shuffleReadMetrics.fetchWaitTime).getOrElse(0L)
        val shuffleReadTimeProportion = toProportion(shuffleReadTime)
        // writeTime is divided by 1e6 so that it is on the same scale as the other times.
        val shuffleWriteTime =
          (metricsOpt.map(_.shuffleWriteMetrics.writeTime).getOrElse(0L) / 1e6).toLong
        val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)

        val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
        val serializationTimeProportion = toProportion(serializationTime)
        val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
        val deserializationTimeProportion = toProportion(deserializationTime)
        val gettingResultTime = getGettingResultTime(taskUIData.taskInfo, currentTime)
        val gettingResultTimeProportion = toProportion(gettingResultTime)
        val schedulerDelay =
          metricsOpt.map(getSchedulerDelay(taskInfo, _, currentTime)).getOrElse(0L)
        val schedulerDelayProportion = toProportion(schedulerDelay)

        val executorOverhead = serializationTime + deserializationTime
        val executorRunTime = if (taskInfo.running) {
          totalExecutionTime - executorOverhead - gettingResultTime
        } else {
          metricsOpt.map(_.executorRunTime).getOrElse(
            totalExecutionTime - executorOverhead - gettingResultTime)
        }
        val executorComputingTime = executorRunTime - shuffleReadTime - shuffleWriteTime
        // Computing time takes whatever is left of the 100% after all other categories,
        // clamped at 0.
        val executorComputingTimeProportion =
          math.max(100 - schedulerDelayProportion - shuffleReadTimeProportion -
            shuffleWriteTimeProportion - serializationTimeProportion -
            deserializationTimeProportion - gettingResultTimeProportion, 0)

        // Each bar segment starts where the previous one ends.
        val schedulerDelayProportionPos = 0
        val deserializationTimeProportionPos =
          schedulerDelayProportionPos + schedulerDelayProportion
        val shuffleReadTimeProportionPos =
          deserializationTimeProportionPos + deserializationTimeProportion
        val executorRuntimeProportionPos =
          shuffleReadTimeProportionPos + shuffleReadTimeProportion
        val shuffleWriteTimeProportionPos =
          executorRuntimeProportionPos + executorComputingTimeProportion
        val serializationTimeProportionPos =
          shuffleWriteTimeProportionPos + shuffleWriteTimeProportion
        val gettingResultTimeProportionPos =
          serializationTimeProportionPos + serializationTimeProportion

        val index = taskInfo.index
        val attempt = taskInfo.attemptNumber

        val svgTag =
          if (totalExecutionTime == 0) {
            // SPARK-8705: Avoid invalid attribute error in JavaScript if execution time is 0
            """<svg class="task-assignment-timeline-duration-bar"></svg>"""
          } else {
           s"""<svg class="task-assignment-timeline-duration-bar">
                 |<rect class="scheduler-delay-proportion"
                   |x="$schedulerDelayProportionPos%" y="0px" height="26px"
                   |width="$schedulerDelayProportion%"></rect>
                 |<rect class="deserialization-time-proportion"
                   |x="$deserializationTimeProportionPos%" y="0px" height="26px"
                   |width="$deserializationTimeProportion%"></rect>
                 |<rect class="shuffle-read-time-proportion"
                   |x="$shuffleReadTimeProportionPos%" y="0px" height="26px"
                   |width="$shuffleReadTimeProportion%"></rect>
                 |<rect class="executor-runtime-proportion"
                   |x="$executorRuntimeProportionPos%" y="0px" height="26px"
                   |width="$executorComputingTimeProportion%"></rect>
                 |<rect class="shuffle-write-time-proportion"
                   |x="$shuffleWriteTimeProportionPos%" y="0px" height="26px"
                   |width="$shuffleWriteTimeProportion%"></rect>
                 |<rect class="serialization-time-proportion"
                   |x="$serializationTimeProportionPos%" y="0px" height="26px"
                   |width="$serializationTimeProportion%"></rect>
                 |<rect class="getting-result-time-proportion"
                   |x="$gettingResultTimeProportionPos%" y="0px" height="26px"
                   |width="$gettingResultTimeProportion%"></rect></svg>""".stripMargin
          }
        // JavaScript object literal describing one timeline item; line breaks are replaced by
        // single spaces so that the quoted 'content' string stays on one line.
        val timelineObject =
          s"""
             |{
               |'className': 'task task-assignment-timeline-object',
               |'group': '$executorId',
               |'content': '<div class="task-assignment-timeline-content"
                 |data-toggle="tooltip" data-placement="top"
                 |data-html="true" data-container="body"
                 |data-title="${s"Task " + index + " (attempt " + attempt + ")"}<br>
                 |Status: ${taskInfo.status}<br>
                 |Launch Time: ${UIUtils.formatDate(new Date(launchTime))}
                 |${
                     if (!taskInfo.running) {
                       s"""<br>Finish Time: ${UIUtils.formatDate(new Date(finishTime))}"""
                     } else {
                        ""
                      }
                   }
                 |<br>Scheduler Delay: $schedulerDelay ms
                 |<br>Task Deserialization Time: ${UIUtils.formatDuration(deserializationTime)}
                 |<br>Shuffle Read Time: ${UIUtils.formatDuration(shuffleReadTime)}
                 |<br>Executor Computing Time: ${UIUtils.formatDuration(executorComputingTime)}
                 |<br>Shuffle Write Time: ${UIUtils.formatDuration(shuffleWriteTime)}
                 |<br>Result Serialization Time: ${UIUtils.formatDuration(serializationTime)}
                 |<br>Getting Result Time: ${UIUtils.formatDuration(gettingResultTime)}">
                 |$svgTag',
               |'start': new Date($launchTime),
               |'end': new Date($finishTime)
             |}
           |""".stripMargin.replaceAll("""[\r\n]+""", " ")
        timelineObject
      }.mkString("[", ",", "]")

    val groupArrayStr = executorsSet.map {
      case (executorId, host) =>
        s"""
            {
              'id': '$executorId',
              'content': '$executorId / $host',
            }
          """
    }.mkString("[", ",", "]")

    <span class="expand-task-assignment-timeline">
      <span class="expand-task-assignment-timeline-arrow arrow-closed"></span>
      <a>Event Timeline</a>
    </span> ++
    <div id="task-assignment-timeline" class="collapsed">
      {
        if (MAX_TIMELINE_TASKS < tasks.size) {
          <strong>
            This stage has more than the maximum number of tasks that can be shown in the
            visualization! Only the most recent {MAX_TIMELINE_TASKS} tasks
            (of {tasks.size} total) are shown.
          </strong>
        } else {
          Seq.empty
        }
      }
      <div class="control-panel">
        <div id="task-assignment-timeline-zoom-lock">
          <input type="checkbox"></input>
          <span>Enable zooming</span>
        </div>
      </div>
      {TIMELINE_LEGEND}
    </div> ++
    <script type="text/javascript">
      {Unparsed(s"drawTaskAssignmentTimeline(" +
      s"$groupArrayStr, $executorsArrayStr, $minLaunchTime, $maxFinishTime, " +
        s"${UIUtils.getTimeZoneOffset()})")}
    </script>
  }

}

/** Time computations shared by the stage page and its task table. */
private[ui] object StagePage {
  /**
   * Time the task has spent fetching its result.
   *
   * @param info task whose timing is inspected
   * @param currentTime time used as the end point while the fetch is still in progress
   * @return `finishTime - gettingResultTime` for a finished task, `currentTime -
   *         gettingResultTime` for an unfinished one, or 0 when `info.gettingResult` is false
   */
  private[ui] def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
    if (info.gettingResult) {
      if (info.finished) {
        info.finishTime - info.gettingResultTime
      } else {
        // The task is still fetching the result.
        currentTime - info.gettingResultTime
      }
    } else {
      0L
    }
  }

  /**
   * Scheduler delay of a task: the part of its total time (launch to finish) that is not
   * covered by executor run time, task deserialization, result serialization or fetching the
   * result. Never negative.
   *
   * @param info task whose timing is inspected
   * @param metrics metrics reported for the task
   * @param currentTime passed on to [[getGettingResultTime]]
   * @return the delay for a finished task, 0 for a task that has not finished
   */
  private[ui] def getSchedulerDelay(
      info: TaskInfo, metrics: TaskMetricsUIData, currentTime: Long): Long = {
    if (info.finished) {
      val totalExecutionTime = info.finishTime - info.launchTime
      val executorOverhead = (metrics.executorDeserializeTime +
        metrics.resultSerializationTime)
      math.max(
        0,
        totalExecutionTime - metrics.executorRunTime - executorOverhead -
          getGettingResultTime(info, currentTime))
    } else {
      // The task is still running and the metrics like executorRunTime are not available.
      0L
    }
  }
}

/** Input cell of a task table row: a sortable value plus its display string. */
private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String)

/** Output cell of a task table row: a sortable value plus its display string. */
private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String)

/** Shuffle read cells of a task table row; each value comes as a sortable/readable pair. */
private[ui] case class TaskTableRowShuffleReadData(
    shuffleReadBlockedTimeSortable: Long,
    shuffleReadBlockedTimeReadable: String,
    shuffleReadSortable: Long,
    shuffleReadReadable: String,
    shuffleReadRemoteSortable: Long,
    shuffleReadRemoteReadable: String)

/** Shuffle write cells of a task table row; each value comes as a sortable/readable pair. */
private[ui] case class TaskTableRowShuffleWriteData(
    writeTimeSortable: Long,
    writeTimeReadable: String,
    shuffleWriteSortable: Long,
    shuffleWriteReadable: String)

/** Spill cells of a task table row; each value comes as a sortable/readable pair. */
private[ui] case class TaskTableRowBytesSpilledData(
    memoryBytesSpilledSortable: Long,
    memoryBytesSpilledReadable: String,
    diskBytesSpilledSortable: Long,
    diskBytesSpilledReadable: String)

/**
 * Contains all data that needs for sorting and generating HTML. Using this one rather than
 * TaskUIData to avoid creating duplicate contents during sorting the data.
829 */ 830private[ui] class TaskTableRowData( 831 val index: Int, 832 val taskId: Long, 833 val attempt: Int, 834 val speculative: Boolean, 835 val status: String, 836 val taskLocality: String, 837 val executorIdAndHost: String, 838 val launchTime: Long, 839 val duration: Long, 840 val formatDuration: String, 841 val schedulerDelay: Long, 842 val taskDeserializationTime: Long, 843 val gcTime: Long, 844 val serializationTime: Long, 845 val gettingResultTime: Long, 846 val peakExecutionMemoryUsed: Long, 847 val accumulators: Option[String], // HTML 848 val input: Option[TaskTableRowInputData], 849 val output: Option[TaskTableRowOutputData], 850 val shuffleRead: Option[TaskTableRowShuffleReadData], 851 val shuffleWrite: Option[TaskTableRowShuffleWriteData], 852 val bytesSpilled: Option[TaskTableRowBytesSpilledData], 853 val error: String, 854 val logs: Map[String, String]) 855 856private[ui] class TaskDataSource( 857 tasks: Seq[TaskUIData], 858 hasAccumulators: Boolean, 859 hasInput: Boolean, 860 hasOutput: Boolean, 861 hasShuffleRead: Boolean, 862 hasShuffleWrite: Boolean, 863 hasBytesSpilled: Boolean, 864 currentTime: Long, 865 pageSize: Int, 866 sortColumn: String, 867 desc: Boolean, 868 executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) { 869 import StagePage._ 870 871 // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table 872 // so that we can avoid creating duplicate contents during sorting the data 873 private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc)) 874 875 private var _slicedTaskIds: Set[Long] = null 876 877 override def dataSize: Int = data.size 878 879 override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = { 880 val r = data.slice(from, to) 881 _slicedTaskIds = r.map(_.taskId).toSet 882 r 883 } 884 885 def slicedTaskIds: Set[Long] = _slicedTaskIds 886 887 private def taskRow(taskData: TaskUIData): TaskTableRowData = { 888 val info = 
taskData.taskInfo 889 val metrics = taskData.metrics 890 val duration = if (info.status == "RUNNING") info.timeRunning(currentTime) 891 else metrics.map(_.executorRunTime).getOrElse(1L) 892 val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) 893 else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") 894 val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) 895 val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) 896 val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) 897 val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) 898 val gettingResultTime = getGettingResultTime(info, currentTime) 899 900 val externalAccumulableReadable = info.accumulables 901 .filterNot(_.internal) 902 .flatMap { a => 903 (a.name, a.update) match { 904 case (Some(name), Some(update)) => Some(StringEscapeUtils.escapeHtml4(s"$name: $update")) 905 case _ => None 906 } 907 } 908 val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L) 909 910 val maybeInput = metrics.map(_.inputMetrics) 911 val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L) 912 val inputReadable = maybeInput 913 .map(m => s"${Utils.bytesToString(m.bytesRead)}") 914 .getOrElse("") 915 val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("") 916 917 val maybeOutput = metrics.map(_.outputMetrics) 918 val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L) 919 val outputReadable = maybeOutput 920 .map(m => s"${Utils.bytesToString(m.bytesWritten)}") 921 .getOrElse("") 922 val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") 923 924 val maybeShuffleRead = metrics.map(_.shuffleReadMetrics) 925 val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L) 926 val shuffleReadBlockedTimeReadable = 927 maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") 928 
929 val totalShuffleBytes = maybeShuffleRead.map(_.totalBytesRead) 930 val shuffleReadSortable = totalShuffleBytes.getOrElse(0L) 931 val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("") 932 val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("") 933 934 val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead) 935 val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L) 936 val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("") 937 938 val maybeShuffleWrite = metrics.map(_.shuffleWriteMetrics) 939 val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L) 940 val shuffleWriteReadable = maybeShuffleWrite 941 .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("") 942 val shuffleWriteRecords = maybeShuffleWrite 943 .map(_.recordsWritten.toString).getOrElse("") 944 945 val maybeWriteTime = metrics.map(_.shuffleWriteMetrics.writeTime) 946 val writeTimeSortable = maybeWriteTime.getOrElse(0L) 947 val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => 948 if (ms == 0) "" else UIUtils.formatDuration(ms) 949 }.getOrElse("") 950 951 val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) 952 val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L) 953 val memoryBytesSpilledReadable = 954 maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") 955 956 val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) 957 val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L) 958 val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") 959 960 val input = 961 if (hasInput) { 962 Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords")) 963 } else { 964 None 965 } 966 967 val output = 968 if (hasOutput) { 969 Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords")) 970 } else { 971 None 972 } 973 974 val 
shuffleRead = 975 if (hasShuffleRead) { 976 Some(TaskTableRowShuffleReadData( 977 shuffleReadBlockedTimeSortable, 978 shuffleReadBlockedTimeReadable, 979 shuffleReadSortable, 980 s"$shuffleReadReadable / $shuffleReadRecords", 981 shuffleReadRemoteSortable, 982 shuffleReadRemoteReadable 983 )) 984 } else { 985 None 986 } 987 988 val shuffleWrite = 989 if (hasShuffleWrite) { 990 Some(TaskTableRowShuffleWriteData( 991 writeTimeSortable, 992 writeTimeReadable, 993 shuffleWriteSortable, 994 s"$shuffleWriteReadable / $shuffleWriteRecords" 995 )) 996 } else { 997 None 998 } 999 1000 val bytesSpilled = 1001 if (hasBytesSpilled) { 1002 Some(TaskTableRowBytesSpilledData( 1003 memoryBytesSpilledSortable, 1004 memoryBytesSpilledReadable, 1005 diskBytesSpilledSortable, 1006 diskBytesSpilledReadable 1007 )) 1008 } else { 1009 None 1010 } 1011 1012 val logs = executorsListener.executorToTaskSummary.get(info.executorId) 1013 .map(_.executorLogs).getOrElse(Map.empty) 1014 new TaskTableRowData( 1015 info.index, 1016 info.taskId, 1017 info.attemptNumber, 1018 info.speculative, 1019 info.status, 1020 info.taskLocality.toString, 1021 s"${info.executorId} / ${info.host}", 1022 info.launchTime, 1023 duration, 1024 formatDuration, 1025 schedulerDelay, 1026 taskDeserializationTime, 1027 gcTime, 1028 serializationTime, 1029 gettingResultTime, 1030 peakExecutionMemoryUsed, 1031 if (hasAccumulators) Some(externalAccumulableReadable.mkString("<br/>")) else None, 1032 input, 1033 output, 1034 shuffleRead, 1035 shuffleWrite, 1036 bytesSpilled, 1037 taskData.errorMessage.getOrElse(""), 1038 logs) 1039 } 1040 1041 /** 1042 * Return Ordering according to sortColumn and desc 1043 */ 1044 private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = { 1045 val ordering: Ordering[TaskTableRowData] = sortColumn match { 1046 case "Index" => Ordering.by(_.index) 1047 case "ID" => Ordering.by(_.taskId) 1048 case "Attempt" => Ordering.by(_.attempt) 1049 case "Status" => 
Ordering.by(_.status) 1050 case "Locality Level" => Ordering.by(_.taskLocality) 1051 case "Executor ID / Host" => Ordering.by(_.executorIdAndHost) 1052 case "Launch Time" => Ordering.by(_.launchTime) 1053 case "Duration" => Ordering.by(_.duration) 1054 case "Scheduler Delay" => Ordering.by(_.schedulerDelay) 1055 case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime) 1056 case "GC Time" => Ordering.by(_.gcTime) 1057 case "Result Serialization Time" => Ordering.by(_.serializationTime) 1058 case "Getting Result Time" => Ordering.by(_.gettingResultTime) 1059 case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed) 1060 case "Accumulators" => 1061 if (hasAccumulators) { 1062 Ordering.by(_.accumulators.get) 1063 } else { 1064 throw new IllegalArgumentException( 1065 "Cannot sort by Accumulators because of no accumulators") 1066 } 1067 case "Input Size / Records" => 1068 if (hasInput) { 1069 Ordering.by(_.input.get.inputSortable) 1070 } else { 1071 throw new IllegalArgumentException( 1072 "Cannot sort by Input Size / Records because of no inputs") 1073 } 1074 case "Output Size / Records" => 1075 if (hasOutput) { 1076 Ordering.by(_.output.get.outputSortable) 1077 } else { 1078 throw new IllegalArgumentException( 1079 "Cannot sort by Output Size / Records because of no outputs") 1080 } 1081 // ShuffleRead 1082 case "Shuffle Read Blocked Time" => 1083 if (hasShuffleRead) { 1084 Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable) 1085 } else { 1086 throw new IllegalArgumentException( 1087 "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads") 1088 } 1089 case "Shuffle Read Size / Records" => 1090 if (hasShuffleRead) { 1091 Ordering.by(_.shuffleRead.get.shuffleReadSortable) 1092 } else { 1093 throw new IllegalArgumentException( 1094 "Cannot sort by Shuffle Read Size / Records because of no shuffle reads") 1095 } 1096 case "Shuffle Remote Reads" => 1097 if (hasShuffleRead) { 1098 
Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable) 1099 } else { 1100 throw new IllegalArgumentException( 1101 "Cannot sort by Shuffle Remote Reads because of no shuffle reads") 1102 } 1103 // ShuffleWrite 1104 case "Write Time" => 1105 if (hasShuffleWrite) { 1106 Ordering.by(_.shuffleWrite.get.writeTimeSortable) 1107 } else { 1108 throw new IllegalArgumentException( 1109 "Cannot sort by Write Time because of no shuffle writes") 1110 } 1111 case "Shuffle Write Size / Records" => 1112 if (hasShuffleWrite) { 1113 Ordering.by(_.shuffleWrite.get.shuffleWriteSortable) 1114 } else { 1115 throw new IllegalArgumentException( 1116 "Cannot sort by Shuffle Write Size / Records because of no shuffle writes") 1117 } 1118 // BytesSpilled 1119 case "Shuffle Spill (Memory)" => 1120 if (hasBytesSpilled) { 1121 Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable) 1122 } else { 1123 throw new IllegalArgumentException( 1124 "Cannot sort by Shuffle Spill (Memory) because of no spills") 1125 } 1126 case "Shuffle Spill (Disk)" => 1127 if (hasBytesSpilled) { 1128 Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable) 1129 } else { 1130 throw new IllegalArgumentException( 1131 "Cannot sort by Shuffle Spill (Disk) because of no spills") 1132 } 1133 case "Errors" => Ordering.by(_.error) 1134 case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") 1135 } 1136 if (desc) { 1137 ordering.reverse 1138 } else { 1139 ordering 1140 } 1141 } 1142 1143} 1144 1145private[ui] class TaskPagedTable( 1146 conf: SparkConf, 1147 basePath: String, 1148 data: Seq[TaskUIData], 1149 hasAccumulators: Boolean, 1150 hasInput: Boolean, 1151 hasOutput: Boolean, 1152 hasShuffleRead: Boolean, 1153 hasShuffleWrite: Boolean, 1154 hasBytesSpilled: Boolean, 1155 currentTime: Long, 1156 pageSize: Int, 1157 sortColumn: String, 1158 desc: Boolean, 1159 executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] { 1160 1161 override def tableId: String = 
"task-table" 1162 1163 override def tableCssClass: String = 1164 "table table-bordered table-condensed table-striped table-head-clickable" 1165 1166 override def pageSizeFormField: String = "task.pageSize" 1167 1168 override def prevPageSizeFormField: String = "task.prevPageSize" 1169 1170 override def pageNumberFormField: String = "task.page" 1171 1172 override val dataSource: TaskDataSource = new TaskDataSource( 1173 data, 1174 hasAccumulators, 1175 hasInput, 1176 hasOutput, 1177 hasShuffleRead, 1178 hasShuffleWrite, 1179 hasBytesSpilled, 1180 currentTime, 1181 pageSize, 1182 sortColumn, 1183 desc, 1184 executorsListener) 1185 1186 override def pageLink(page: Int): String = { 1187 val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") 1188 basePath + 1189 s"&$pageNumberFormField=$page" + 1190 s"&task.sort=$encodedSortColumn" + 1191 s"&task.desc=$desc" + 1192 s"&$pageSizeFormField=$pageSize" 1193 } 1194 1195 override def goButtonFormPath: String = { 1196 val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") 1197 s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc" 1198 } 1199 1200 def headers: Seq[Node] = { 1201 val taskHeadersAndCssClasses: Seq[(String, String)] = 1202 Seq( 1203 ("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""), 1204 ("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""), 1205 ("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY), 1206 ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME), 1207 ("GC Time", ""), 1208 ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), 1209 ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME), 1210 ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++ 1211 {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ 1212 {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++ 1213 {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ 1214 {if 
(hasShuffleRead) { 1215 Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), 1216 ("Shuffle Read Size / Records", ""), 1217 ("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE)) 1218 } else { 1219 Nil 1220 }} ++ 1221 {if (hasShuffleWrite) { 1222 Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) 1223 } else { 1224 Nil 1225 }} ++ 1226 {if (hasBytesSpilled) { 1227 Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) 1228 } else { 1229 Nil 1230 }} ++ 1231 Seq(("Errors", "")) 1232 1233 if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) { 1234 throw new IllegalArgumentException(s"Unknown column: $sortColumn") 1235 } 1236 1237 val headerRow: Seq[Node] = { 1238 taskHeadersAndCssClasses.map { case (header, cssClass) => 1239 if (header == sortColumn) { 1240 val headerLink = Unparsed( 1241 basePath + 1242 s"&task.sort=${URLEncoder.encode(header, "UTF-8")}" + 1243 s"&task.desc=${!desc}" + 1244 s"&task.pageSize=$pageSize") 1245 val arrow = if (desc) "▾" else "▴" // UP or DOWN 1246 <th class={cssClass}> 1247 <a href={headerLink}> 1248 {header} 1249 <span> {Unparsed(arrow)}</span> 1250 </a> 1251 </th> 1252 } else { 1253 val headerLink = Unparsed( 1254 basePath + 1255 s"&task.sort=${URLEncoder.encode(header, "UTF-8")}" + 1256 s"&task.pageSize=$pageSize") 1257 <th class={cssClass}> 1258 <a href={headerLink}> 1259 {header} 1260 </a> 1261 </th> 1262 } 1263 } 1264 } 1265 <thead>{headerRow}</thead> 1266 } 1267 1268 def row(task: TaskTableRowData): Seq[Node] = { 1269 <tr> 1270 <td>{task.index}</td> 1271 <td>{task.taskId}</td> 1272 <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td> 1273 <td>{task.status}</td> 1274 <td>{task.taskLocality}</td> 1275 <td> 1276 <div style="float: left">{task.executorIdAndHost}</div> 1277 <div style="float: right"> 1278 { 1279 task.logs.map { 1280 case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div> 1281 } 1282 } 1283 
</div> 1284 </td> 1285 <td>{UIUtils.formatDate(new Date(task.launchTime))}</td> 1286 <td>{task.formatDuration}</td> 1287 <td class={TaskDetailsClassNames.SCHEDULER_DELAY}> 1288 {UIUtils.formatDuration(task.schedulerDelay)} 1289 </td> 1290 <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}> 1291 {UIUtils.formatDuration(task.taskDeserializationTime)} 1292 </td> 1293 <td> 1294 {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""} 1295 </td> 1296 <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}> 1297 {UIUtils.formatDuration(task.serializationTime)} 1298 </td> 1299 <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}> 1300 {UIUtils.formatDuration(task.gettingResultTime)} 1301 </td> 1302 <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}> 1303 {Utils.bytesToString(task.peakExecutionMemoryUsed)} 1304 </td> 1305 {if (task.accumulators.nonEmpty) { 1306 <td>{Unparsed(task.accumulators.get)}</td> 1307 }} 1308 {if (task.input.nonEmpty) { 1309 <td>{task.input.get.inputReadable}</td> 1310 }} 1311 {if (task.output.nonEmpty) { 1312 <td>{task.output.get.outputReadable}</td> 1313 }} 1314 {if (task.shuffleRead.nonEmpty) { 1315 <td class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> 1316 {task.shuffleRead.get.shuffleReadBlockedTimeReadable} 1317 </td> 1318 <td>{task.shuffleRead.get.shuffleReadReadable}</td> 1319 <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}> 1320 {task.shuffleRead.get.shuffleReadRemoteReadable} 1321 </td> 1322 }} 1323 {if (task.shuffleWrite.nonEmpty) { 1324 <td>{task.shuffleWrite.get.writeTimeReadable}</td> 1325 <td>{task.shuffleWrite.get.shuffleWriteReadable}</td> 1326 }} 1327 {if (task.bytesSpilled.nonEmpty) { 1328 <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td> 1329 <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td> 1330 }} 1331 {errorMessageCell(task.error)} 1332 </tr> 1333 } 1334 1335 private def errorMessageCell(error: String): Seq[Node] = { 1336 val isMultiline = error.indexOf('\n') >= 
0 1337 // Display the first line by default 1338 val errorSummary = StringEscapeUtils.escapeHtml4( 1339 if (isMultiline) { 1340 error.substring(0, error.indexOf('\n')) 1341 } else { 1342 error 1343 }) 1344 val details = if (isMultiline) { 1345 // scalastyle:off 1346 <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')" 1347 class="expand-details"> 1348 +details 1349 </span> ++ 1350 <div class="stacktrace-details collapsed"> 1351 <pre>{error}</pre> 1352 </div> 1353 // scalastyle:on 1354 } else { 1355 "" 1356 } 1357 <td>{errorSummary}{details}</td> 1358 } 1359} 1360