1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.ui.jobs
19
20import java.util.Date
21import javax.servlet.http.HttpServletRequest
22
23import scala.collection.mutable.{Buffer, ListBuffer}
24import scala.xml.{Node, NodeSeq, Unparsed, Utility}
25
26import org.apache.commons.lang3.StringEscapeUtils
27
28import org.apache.spark.JobExecutionStatus
29import org.apache.spark.scheduler._
30import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
31
32/** Page showing statistics and stage list for a given job */
33private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
34
35  private val STAGES_LEGEND =
36    <div class="legend-area"><svg width="150px" height="85px">
37      <rect class="completed-stage-legend"
38        x="5px" y="5px" width="20px" height="15px" rx="2px" ry="2px"></rect>
39      <text x="35px" y="17px">Completed</text>
40      <rect class="failed-stage-legend"
41        x="5px" y="30px" width="20px" height="15px" rx="2px" ry="2px"></rect>
42      <text x="35px" y="42px">Failed</text>
43      <rect class="active-stage-legend"
44        x="5px" y="55px" width="20px" height="15px" rx="2px" ry="2px"></rect>
45      <text x="35px" y="67px">Active</text>
46    </svg></div>.toString.filter(_ != '\n')
47
48  private val EXECUTORS_LEGEND =
49    <div class="legend-area"><svg width="150px" height="55px">
50      <rect class="executor-added-legend"
51        x="5px" y="5px" width="20px" height="15px" rx="2px" ry="2px"></rect>
52      <text x="35px" y="17px">Added</text>
53      <rect class="executor-removed-legend"
54        x="5px" y="30px" width="20px" height="15px" rx="2px" ry="2px"></rect>
55      <text x="35px" y="42px">Removed</text>
56    </svg></div>.toString.filter(_ != '\n')
57
58  private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = {
59    stageInfos.map { stage =>
60      val stageId = stage.stageId
61      val attemptId = stage.attemptId
62      val name = stage.name
63      val status = stage.getStatusString
64      val submissionTime = stage.submissionTime.get
65      val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis())
66
67      // The timeline library treats contents as HTML, so we have to escape them. We need to add
68      // extra layers of escaping in order to embed this in a Javascript string literal.
69      val escapedName = Utility.escape(name)
70      val jsEscapedName = StringEscapeUtils.escapeEcmaScript(escapedName)
71      s"""
72         |{
73         |  'className': 'stage job-timeline-object ${status}',
74         |  'group': 'stages',
75         |  'start': new Date(${submissionTime}),
76         |  'end': new Date(${completionTime}),
77         |  'content': '<div class="job-timeline-content" data-toggle="tooltip"' +
78         |   'data-placement="top" data-html="true"' +
79         |   'data-title="${jsEscapedName} (Stage ${stageId}.${attemptId})<br>' +
80         |   'Status: ${status.toUpperCase}<br>' +
81         |   'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' +
82         |   '${
83                 if (status != "running") {
84                   s"""<br>Completed: ${UIUtils.formatDate(new Date(completionTime))}"""
85                 } else {
86                   ""
87                 }
88              }">' +
89         |    '${jsEscapedName} (Stage ${stageId}.${attemptId})</div>',
90         |}
91       """.stripMargin
92    }
93  }
94
95  def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): Seq[String] = {
96    val events = ListBuffer[String]()
97    executorUIDatas.foreach {
98      case a: SparkListenerExecutorAdded =>
99        val addedEvent =
100          s"""
101             |{
102             |  'className': 'executor added',
103             |  'group': 'executors',
104             |  'start': new Date(${a.time}),
105             |  'content': '<div class="executor-event-content"' +
106             |    'data-toggle="tooltip" data-placement="bottom"' +
107             |    'data-title="Executor ${a.executorId}<br>' +
108             |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
109             |    'data-html="true">Executor ${a.executorId} added</div>'
110             |}
111           """.stripMargin
112        events += addedEvent
113
114      case e: SparkListenerExecutorRemoved =>
115        val removedEvent =
116          s"""
117             |{
118             |  'className': 'executor removed',
119             |  'group': 'executors',
120             |  'start': new Date(${e.time}),
121             |  'content': '<div class="executor-event-content"' +
122             |    'data-toggle="tooltip" data-placement="bottom"' +
123             |    'data-title="Executor ${e.executorId}<br>' +
124             |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
125             |    '${
126                      if (e.reason != null) {
127                        s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
128                      } else {
129                        ""
130                      }
131                   }"' +
132             |    'data-html="true">Executor ${e.executorId} removed</div>'
133             |}
134           """.stripMargin
135          events += removedEvent
136
137    }
138    events.toSeq
139  }
140
141  private def makeTimeline(
142      stages: Seq[StageInfo],
143      executors: Seq[SparkListenerEvent],
144      appStartTime: Long): Seq[Node] = {
145
146    val stageEventJsonAsStrSeq = makeStageEvent(stages)
147    val executorsJsonAsStrSeq = makeExecutorEvent(executors)
148
149    val groupJsonArrayAsStr =
150      s"""
151          |[
152          |  {
153          |    'id': 'executors',
154          |    'content': '<div>Executors</div>${EXECUTORS_LEGEND}',
155          |  },
156          |  {
157          |    'id': 'stages',
158          |    'content': '<div>Stages</div>${STAGES_LEGEND}',
159          |  }
160          |]
161        """.stripMargin
162
163    val eventArrayAsStr =
164      (stageEventJsonAsStrSeq ++ executorsJsonAsStrSeq).mkString("[", ",", "]")
165
166    <span class="expand-job-timeline">
167      <span class="expand-job-timeline-arrow arrow-closed"></span>
168      <a data-toggle="tooltip" title={ToolTips.STAGE_TIMELINE} data-placement="right">
169        Event Timeline
170      </a>
171    </span> ++
172    <div id="job-timeline" class="collapsed">
173      <div class="control-panel">
174        <div id="job-timeline-zoom-lock">
175          <input type="checkbox"></input>
176          <span>Enable zooming</span>
177        </div>
178      </div>
179    </div> ++
180    <script type="text/javascript">
181      {Unparsed(s"drawJobTimeline(${groupJsonArrayAsStr}, ${eventArrayAsStr}, " +
182      s"${appStartTime}, ${UIUtils.getTimeZoneOffset()});")}
183    </script>
184  }
185
186  def render(request: HttpServletRequest): Seq[Node] = {
187    val listener = parent.jobProgresslistener
188
189    listener.synchronized {
190      val parameterId = request.getParameter("id")
191      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
192
193      val jobId = parameterId.toInt
194      val jobDataOption = listener.jobIdToData.get(jobId)
195      if (jobDataOption.isEmpty) {
196        val content =
197          <div id="no-info">
198            <p>No information to display for job {jobId}</p>
199          </div>
200        return UIUtils.headerSparkPage(
201          s"Details for Job $jobId", content, parent)
202      }
203      val jobData = jobDataOption.get
204      val isComplete = jobData.status != JobExecutionStatus.RUNNING
205      val stages = jobData.stageIds.map { stageId =>
206        // This could be empty if the JobProgressListener hasn't received information about the
207        // stage or if the stage information has been garbage collected
208        listener.stageIdToInfo.getOrElse(stageId,
209          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
210      }
211
212      val activeStages = Buffer[StageInfo]()
213      val completedStages = Buffer[StageInfo]()
214      // If the job is completed, then any pending stages are displayed as "skipped":
215      val pendingOrSkippedStages = Buffer[StageInfo]()
216      val failedStages = Buffer[StageInfo]()
217      for (stage <- stages) {
218        if (stage.submissionTime.isEmpty) {
219          pendingOrSkippedStages += stage
220        } else if (stage.completionTime.isDefined) {
221          if (stage.failureReason.isDefined) {
222            failedStages += stage
223          } else {
224            completedStages += stage
225          }
226        } else {
227          activeStages += stage
228        }
229      }
230
231      val basePath = "jobs/job"
232
233      val pendingOrSkippedTableId =
234        if (isComplete) {
235          "pending"
236        } else {
237          "skipped"
238        }
239
240      val activeStagesTable =
241        new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath,
242          basePath, parent.jobProgresslistener, parent.isFairScheduler,
243          killEnabled = parent.killEnabled, isFailedStage = false)
244      val pendingOrSkippedStagesTable =
245        new StageTableBase(request, pendingOrSkippedStages, pendingOrSkippedTableId, "pendingStage",
246          parent.basePath, basePath, parent.jobProgresslistener, parent.isFairScheduler,
247          killEnabled = false, isFailedStage = false)
248      val completedStagesTable =
249        new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath,
250          basePath, parent.jobProgresslistener, parent.isFairScheduler,
251          killEnabled = false, isFailedStage = false)
252      val failedStagesTable =
253        new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath,
254          basePath, parent.jobProgresslistener, parent.isFairScheduler,
255          killEnabled = false, isFailedStage = true)
256
257      val shouldShowActiveStages = activeStages.nonEmpty
258      val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
259      val shouldShowCompletedStages = completedStages.nonEmpty
260      val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
261      val shouldShowFailedStages = failedStages.nonEmpty
262
263      val summary: NodeSeq =
264        <div>
265          <ul class="unstyled">
266            <li>
267              <Strong>Status:</Strong>
268              {jobData.status}
269            </li>
270            {
271              if (jobData.jobGroup.isDefined) {
272                <li>
273                  <strong>Job Group:</strong>
274                  {jobData.jobGroup.get}
275                </li>
276              }
277            }
278            {
279              if (shouldShowActiveStages) {
280                <li>
281                  <a href="#active"><strong>Active Stages:</strong></a>
282                  {activeStages.size}
283                </li>
284              }
285            }
286            {
287              if (shouldShowPendingStages) {
288                <li>
289                  <a href="#pending">
290                    <strong>Pending Stages:</strong>
291                  </a>{pendingOrSkippedStages.size}
292                </li>
293              }
294            }
295            {
296              if (shouldShowCompletedStages) {
297                <li>
298                  <a href="#completed"><strong>Completed Stages:</strong></a>
299                  {completedStages.size}
300                </li>
301              }
302            }
303            {
304              if (shouldShowSkippedStages) {
305              <li>
306                <a href="#skipped"><strong>Skipped Stages:</strong></a>
307                {pendingOrSkippedStages.size}
308              </li>
309            }
310            }
311            {
312              if (shouldShowFailedStages) {
313                <li>
314                  <a href="#failed"><strong>Failed Stages:</strong></a>
315                  {failedStages.size}
316                </li>
317              }
318            }
319          </ul>
320        </div>
321
322      var content = summary
323      val appStartTime = listener.startTime
324      val executorListener = parent.executorListener
325      val operationGraphListener = parent.operationGraphListener
326
327      content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
328          executorListener.executorEvents, appStartTime)
329
330      content ++= UIUtils.showDagVizForJob(
331        jobId, operationGraphListener.getOperationGraphForJob(jobId))
332
333      if (shouldShowActiveStages) {
334        content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
335          activeStagesTable.toNodeSeq
336      }
337      if (shouldShowPendingStages) {
338        content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++
339          pendingOrSkippedStagesTable.toNodeSeq
340      }
341      if (shouldShowCompletedStages) {
342        content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
343          completedStagesTable.toNodeSeq
344      }
345      if (shouldShowSkippedStages) {
346        content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++
347          pendingOrSkippedStagesTable.toNodeSeq
348      }
349      if (shouldShowFailedStages) {
350        content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
351          failedStagesTable.toNodeSeq
352      }
353      UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
354    }
355  }
356}
357