/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ui.jobs

import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest

import scala.collection.JavaConverters._
import scala.xml._

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/**
 * Renders one paginated, sortable table of stages.
 *
 * The paging/sorting state of this table is read from the request parameters prefixed with
 * `stageTag` (`.page`, `.sort`, `.desc`, `.pageSize`, `.prevPageSize`). All other request
 * parameters are carried along unchanged in the links this table generates, so that the state
 * of other tables on the same page is preserved.
 *
 * The rendered HTML is available as [[toNodeSeq]]. If the table cannot be rendered because of
 * an `IllegalArgumentException` or `IndexOutOfBoundsException` (e.g. an unknown sort column),
 * an error box containing the exception is rendered instead.
 */
private[ui] class StageTableBase(
    request: HttpServletRequest,
    stages: Seq[StageInfo],
    tableHeaderID: String,
    stageTag: String,
    basePath: String,
    subPath: String,
    progressListener: JobProgressListener,
    isFairScheduler: Boolean,
    killEnabled: Boolean,
    isFailedStage: Boolean) {
  val allParameters = request.getParameterMap().asScala.toMap

  // Parameters owned by the other tables on the page, as "name=value" query fragments. The
  // servlet container has already URL-decoded them, so they must be re-encoded before being
  // put back into a URL; otherwise values containing '&', '#', '%' or '=' corrupt the link.
  val parameterOtherTable = allParameters.filterNot(_._1.startsWith(stageTag))
    .map { case (name, values) =>
      URLEncoder.encode(name, "UTF-8") + "=" + URLEncoder.encode(values(0), "UTF-8")
    }

  val parameterStagePage = request.getParameter(stageTag + ".page")
  val parameterStageSortColumn = request.getParameter(stageTag + ".sort")
  val parameterStageSortDesc = request.getParameter(stageTag + ".desc")
  val parameterStagePageSize = request.getParameter(stageTag + ".pageSize")
  val parameterStagePrevPageSize = request.getParameter(stageTag + ".prevPageSize")

  val stagePage = Option(parameterStagePage).map(_.toInt).getOrElse(1)
  val stageSortColumn = Option(parameterStageSortColumn).map { sortColumn =>
    UIUtils.decodeURLParameter(sortColumn)
  }.getOrElse("Stage Id")
  // New stages should be shown above old stages by default.
  val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean)
    .getOrElse(stageSortColumn == "Stage Id")
  val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100)
  val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt)
    .getOrElse(stagePageSize)

  val page: Int = {
    // If the user has changed to a larger page size, then go to page 1 in order to avoid
    // IndexOutOfBoundsException.
    if (stagePageSize <= stagePrevPageSize) {
      stagePage
    } else {
      1
    }
  }
  val currentTime = System.currentTimeMillis()

  val toNodeSeq = try {
    new StagePagedTable(
      stages,
      tableHeaderID,
      stageTag,
      basePath,
      subPath,
      progressListener,
      isFairScheduler,
      killEnabled,
      currentTime,
      stagePageSize,
      stageSortColumn,
      stageSortDesc,
      isFailedStage,
      parameterOtherTable
    ).table(page)
  } catch {
    case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
      <div class="alert alert-error">
        <p>Error while rendering stage table:</p>
        <pre>
          {Utils.exceptionString(e)}
        </pre>
      </div>
  }
}

/**
 * Pre-computed contents of one stage table row.
 *
 * Each numeric field (`submissionTime`, `duration`, `inputRead`, ...) is what the row is sorted
 * by, and its string counterpart (`formattedSubmissionTime`, `formattedDuration`,
 * `inputReadWithUnit`, ...) is what is displayed, so that display strings are built only once
 * per stage rather than repeatedly during sorting.
 */
private[ui] class StageTableRowData(
    val stageInfo: StageInfo,
    val stageData: Option[StageUIData],
    val stageId: Int,
    val attemptId: Int,
    val schedulingPool: String,
    val descriptionOption: Option[String],
    val submissionTime: Long,
    val formattedSubmissionTime: String,
    val duration: Long,
    val formattedDuration: String,
    val inputRead: Long,
    val inputReadWithUnit: String,
    val outputWrite: Long,
    val outputWriteWithUnit: String,
    val shuffleRead: Long,
    val shuffleReadWithUnit: String,
    val shuffleWrite: Long,
    val shuffleWriteWithUnit: String)

/**
 * Placeholder row for a stage whose [[StageUIData]] is not available from the listener.
 * `stageData` is `None`, so the table renders it with `missingStageRow`.
 */
private[ui] class MissingStageTableRowData(
    stageInfo: StageInfo,
    stageId: Int,
    attemptId: Int) extends StageTableRowData(
  stageInfo, None, stageId, attemptId, "", None, 0, "", -1, "", 0, "", 0, "", 0, "", 0, "")

/**
 * Paged, sortable HTML table of stages.
 *
 * @param parameterOtherTable already URL-encoded "name=value" query fragments belonging to the
 *                            other tables on the same page; they are appended to every link so
 *                            that those tables keep their paging/sorting state.
 */
private[ui] class StagePagedTable(
    stages: Seq[StageInfo],
    tableHeaderId: String,
    stageTag: String,
    basePath: String,
    subPath: String,
    listener: JobProgressListener,
    isFairScheduler: Boolean,
    killEnabled: Boolean,
    currentTime: Long,
    pageSize: Int,
    sortColumn: String,
    desc: Boolean,
    isFailedStage: Boolean,
    parameterOtherTable: Iterable[String]) extends PagedTable[StageTableRowData] {

  override def tableId: String = stageTag + "-table"

  override def tableCssClass: String =
    "table table-bordered table-condensed table-striped " +
      "table-head-clickable table-cell-width-limited"

  override def pageSizeFormField: String = stageTag + ".pageSize"

  override def prevPageSizeFormField: String = stageTag + ".prevPageSize"

  override def pageNumberFormField: String = stageTag + ".page"

  val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
    parameterOtherTable.mkString("&")

  override val dataSource = new StageDataSource(
    stages,
    listener,
    currentTime,
    pageSize,
    sortColumn,
    desc
  )

  override def pageLink(page: Int): String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
    parameterPath +
      s"&$pageNumberFormField=$page" +
      s"&$stageTag.sort=$encodedSortColumn" +
      s"&$stageTag.desc=$desc" +
      s"&$pageSizeFormField=$pageSize" +
      s"#$tableHeaderId"
  }

  override def goButtonFormPath: String = {
    val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
    s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc#$tableHeaderId"
  }

  /**
   * Builds the table header row.
   *
   * @throws IllegalArgumentException if `sortColumn` is not one of the sortable headers shown
   *                                  for this table's configuration.
   */
  override def headers: Seq[Node] = {
    // Each entry is (header title, optional tooltip, sortable). A tooltip is
    // (tooltip text, placement) where placement is true for "left" and false for the default.
    val stageHeadersAndCssClasses: Seq[(String, Option[(String, Boolean)], Boolean)] =
      Seq(("Stage Id", None, true)) ++
      {if (isFairScheduler) Seq(("Pool Name", None, true)) else Seq.empty} ++
      Seq(
        ("Description", None, true), ("Submitted", None, true), ("Duration", None, true),
        ("Tasks: Succeeded/Total", None, false),
        ("Input", Some((ToolTips.INPUT, false)), true),
        ("Output", Some((ToolTips.OUTPUT, false)), true),
        ("Shuffle Read", Some((ToolTips.SHUFFLE_READ, false)), true),
        ("Shuffle Write", Some((ToolTips.SHUFFLE_WRITE, true)), true)
      ) ++
      {if (isFailedStage) Seq(("Failure Reason", None, false)) else Seq.empty}

    val isKnownSortColumn = stageHeadersAndCssClasses.exists { case (header, _, sortable) =>
      sortable && header == sortColumn
    }
    if (!isKnownSortColumn) {
      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
    }

    val headerRow: Seq[Node] = stageHeadersAndCssClasses.map { case (header, tooltip, sortable) =>
      val headerSpan: Node = tooltip match {
        case Some((title, true)) =>
          // Place the shuffle write tooltip on the left (rather than the default position
          // of on top) because the shuffle write column is the last column on the right side
          // and the tooltip is wider than the column, so it doesn't fit on top.
          <span data-toggle="tooltip" data-placement="left" title={title}>
            {header}
          </span>
        case Some((title, _)) =>
          <span data-toggle="tooltip" title={title}>
            {header}
          </span>
        case None =>
          Text(header)
      }

      if (header == sortColumn) {
        // Clicking the currently sorted column flips the sort direction.
        val headerLink = parameterPath +
          s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
          s"&$stageTag.desc=${!desc}" +
          s"&$pageSizeFormField=$pageSize" +
          s"#$tableHeaderId"
        val arrow = if (desc) "▾" else "▴" // DOWN or UP

        <th>
          <a href={headerLink}>
            {headerSpan}<span>
              {Unparsed(arrow)}
            </span>
          </a>
        </th>
      } else if (sortable) {
        val headerLink = parameterPath +
          s"&$stageTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
          s"&$pageSizeFormField=$pageSize" +
          s"#$tableHeaderId"

        <th>
          <a href={headerLink}>
            {headerSpan}
          </a>
        </th>
      } else {
        <th>
          {headerSpan}
        </th>
      }
    }
    <thead>{headerRow}</thead>
  }

  override def row(data: StageTableRowData): Seq[Node] = {
    <tr id={"stage-" + data.stageId + "-" + data.attemptId}>
      {rowContent(data)}
    </tr>
  }

  /** The `<td>` cells of one row, in the same column order as [[headers]]. */
  private def rowContent(data: StageTableRowData): Seq[Node] = {
    data.stageData match {
      case None => missingStageRow(data.stageId)
      case Some(stageData) =>
        val info = data.stageInfo

        val idCell: Seq[Node] =
          if (data.attemptId > 0) {
            <td>{data.stageId} (retry {data.attemptId})</td>
          } else {
            <td>{data.stageId}</td>
          }

        // URL-encode the pool name so that names containing characters such as ' ', '&', '#'
        // or '+' still produce a well-formed query string.
        val poolCell: Seq[Node] =
          if (isFairScheduler) {
            val poolUri = "%s/stages/pool?poolname=%s".format(
              UIUtils.prependBaseUri(basePath), URLEncoder.encode(data.schedulingPool, "UTF-8"))

            <td>
              <a href={poolUri}>
                {data.schedulingPool}
              </a>
            </td>
          } else {
            Seq.empty
          }

        val metricCells: Seq[Node] =
          <td>{makeDescription(info, data.descriptionOption)}</td>
          <td valign="middle">
            {data.formattedSubmissionTime}
          </td>
          <td>{data.formattedDuration}</td>
          <td class="progress-cell">
            {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
              completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
              skipped = 0, killed = stageData.numKilledTasks, total = info.numTasks)}
          </td>
          <td>{data.inputReadWithUnit}</td>
          <td>{data.outputWriteWithUnit}</td>
          <td>{data.shuffleReadWithUnit}</td>
          <td>{data.shuffleWriteWithUnit}</td>

        val failureCell: Seq[Node] = if (isFailedStage) failureReasonHtml(info) else Seq.empty

        idCell ++ poolCell ++ metricCells ++ failureCell
    }
  }

  /**
   * The "Failure Reason" cell: the first line of the failure reason, plus an expandable
   * "+details" section with the full text when the reason spans several lines.
   */
  private def failureReasonHtml(s: StageInfo): Seq[Node] = {
    val failureReason = s.failureReason.getOrElse("")
    val newlineIndex = failureReason.indexOf('\n')
    val isMultiline = newlineIndex >= 0
    // Display the first line by default
    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
      if (isMultiline) failureReason.substring(0, newlineIndex) else failureReason)
    val details: Seq[Node] = if (isMultiline) {
      // scalastyle:off
      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
      <div class="stacktrace-details collapsed">
        <pre>{failureReason}</pre>
      </div>
      // scalastyle:on
    } else {
      Seq.empty
    }
    // The summary has already been HTML-escaped above, so emit it verbatim: embedding a plain
    // String in an XML literal would escape it a second time (e.g. '"' would show as "&quot;").
    <td valign="middle">{Unparsed(failureReasonSummary)}{details}</td>
  }

  /**
   * The "Description" cell content: the job-provided description (if any), an optional kill
   * link, a link to the stage page, and an expandable "+details" section when the stage has
   * details.
   */
  private def makeDescription(s: StageInfo, descriptionOption: Option[String]): Seq[Node] = {
    val basePathUri = UIUtils.prependBaseUri(basePath)

    val killLink = if (killEnabled) {
      val confirm =
        s"if (window.confirm('Are you sure you want to kill stage ${s.stageId} ?')) " +
          "{ this.parentNode.submit(); return true; } else { return false; }"
      // SPARK-6846 this should be POST-only but YARN AM won't proxy POST
      /*
      val killLinkUri = s"$basePathUri/stages/stage/kill/"
      <form action={killLinkUri} method="POST" style="display:inline">
        <input type="hidden" name="id" value={s.stageId.toString}/>
        <a href="#" onclick={confirm} class="kill-link">(kill)</a>
      </form>
       */
      val killLinkUri = s"$basePathUri/stages/stage/kill/?id=${s.stageId}"
      <a href={killLinkUri} onclick={confirm} class="kill-link">(kill)</a>
    } else {
      Seq.empty
    }

    val nameLinkUri = s"$basePathUri/stages/stage?id=${s.stageId}&attempt=${s.attemptId}"
    val nameLink = <a href={nameLinkUri} class="name-link">{s.name}</a>

    val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
    val details: Seq[Node] = if (s.details.nonEmpty) {
      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
      <div class="stage-details collapsed">
        {if (cachedRddInfos.nonEmpty) {
          Text("RDD: ") ++
          cachedRddInfos.map { i =>
            <a href={s"$basePathUri/storage/rdd?id=${i.id}"}>{i.name}</a>
          }
        }}
        <pre>{s.details}</pre>
      </div>
    } else {
      Seq.empty
    }

    val stageDesc = descriptionOption.map(UIUtils.makeDescription(_, basePathUri))
    <div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
  }

  /**
   * Cells for a stage without [[StageUIData]]: the stage id, "No data available" in the
   * description column, and empty cells for every other column, so that the row has exactly as
   * many cells as [[headers]] (including "Pool Name" and "Failure Reason" when shown).
   */
  protected def missingStageRow(stageId: Int): Seq[Node] = {
    <td>{stageId}</td> ++
    {if (isFairScheduler) {<td>-</td>} else Seq.empty} ++
    <td>No data available for this stage</td> ++ // Description
    <td></td> ++ // Submitted
    <td></td> ++ // Duration
    <td></td> ++ // Tasks: Succeeded/Total
    <td></td> ++ // Input
    <td></td> ++ // Output
    <td></td> ++ // Shuffle Read
    <td></td> ++ // Shuffle Write
    {if (isFailedStage) {<td></td>} else Seq.empty} // Failure Reason
  }
}

/**
 * Data source for [[StagePagedTable]]: converts every stage into a [[StageTableRowData]] once,
 * sorts the rows by `sortColumn` (descending when `desc`), and serves them page by page.
 *
 * @throws IllegalArgumentException (at construction) if `sortColumn` is unsortable or unknown.
 */
private[ui] class StageDataSource(
    stages: Seq[StageInfo],
    listener: JobProgressListener,
    currentTime: Long,
    pageSize: Int,
    sortColumn: String,
    desc: Boolean) extends PagedDataSource[StageTableRowData](pageSize) {
  // Convert StageInfo to StageTableRowData which contains the final contents to show in the table
  // so that we can avoid creating duplicate contents during sorting the data
  private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))

  override def dataSize: Int = data.size

  override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = data.slice(from, to)

  /** Builds the row for `s`, or a placeholder row when the listener has no data for it. */
  private def stageRow(s: StageInfo): StageTableRowData = {
    listener.stageIdToData.get((s.stageId, s.attemptId)) match {
      case Some(stageData) => buildRow(s, stageData)
      case None => new MissingStageTableRowData(s, s.stageId, s.attemptId)
    }
  }

  /** Renders a byte count with its unit, or "" when the count is not positive. */
  private def bytesWithUnit(bytes: Long): String =
    if (bytes > 0) Utils.bytesToString(bytes) else ""

  /** Computes the sort keys and display strings of one row from the stage's UI data. */
  private def buildRow(s: StageInfo, stageData: StageUIData): StageTableRowData = {
    val formattedSubmissionTime = s.submissionTime match {
      case Some(t) => UIUtils.formatDate(new Date(t))
      case None => "Unknown"
    }
    val finishTime = s.completionTime.getOrElse(currentTime)

    // The submission time for a stage is misleading because it counts the time
    // the stage waits to be launched. (SPARK-10930)
    val taskLaunchTimes =
      stageData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
    val duration: Option[Long] =
      if (taskLaunchTimes.nonEmpty) {
        val startTime = taskLaunchTimes.min
        if (finishTime > startTime) {
          Some(finishTime - startTime)
        } else {
          Some(currentTime - startTime)
        }
      } else {
        None
      }
    val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")

    val inputRead = stageData.inputBytes
    val outputWrite = stageData.outputBytes
    val shuffleRead = stageData.shuffleReadTotalBytes
    val shuffleWrite = stageData.shuffleWriteBytes

    new StageTableRowData(
      s,
      Some(stageData),
      s.stageId,
      s.attemptId,
      stageData.schedulingPool,
      stageData.description,
      s.submissionTime.getOrElse(0L),
      formattedSubmissionTime,
      duration.getOrElse(-1L),
      formattedDuration,
      inputRead,
      bytesWithUnit(inputRead),
      outputWrite,
      bytesWithUnit(outputWrite),
      shuffleRead,
      bytesWithUnit(shuffleRead),
      shuffleWrite,
      bytesWithUnit(shuffleWrite)
    )
  }

  /**
   * Return Ordering according to sortColumn and desc
   *
   * @throws IllegalArgumentException if `sortColumn` is unsortable or unknown.
   */
  private def ordering(sortColumn: String, desc: Boolean): Ordering[StageTableRowData] = {
    val ordering: Ordering[StageTableRowData] = sortColumn match {
      case "Stage Id" => Ordering.by(_.stageId)
      case "Pool Name" => Ordering.by(_.schedulingPool)
      case "Description" => Ordering.by(x => (x.descriptionOption, x.stageInfo.name))
      case "Submitted" => Ordering.by(_.submissionTime)
      case "Duration" => Ordering.by(_.duration)
      case "Input" => Ordering.by(_.inputRead)
      case "Output" => Ordering.by(_.outputWrite)
      case "Shuffle Read" => Ordering.by(_.shuffleRead)
      case "Shuffle Write" => Ordering.by(_.shuffleWrite)
      case "Tasks: Succeeded/Total" =>
        throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
      case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
    }
    if (desc) {
      ordering.reverse
    } else {
      ordering
    }
  }
}