1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.ui
19
20import javax.servlet.http.HttpServletRequest
21
22import scala.xml.Node
23
24import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
25
26import org.apache.spark._
27import org.apache.spark.executor.TaskMetrics
28import org.apache.spark.scheduler._
29import org.apache.spark.storage.StorageStatusListener
30import org.apache.spark.ui.exec.ExecutorsListener
31import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
32import org.apache.spark.ui.scope.RDDOperationGraphListener
33
34class StagePageSuite extends SparkFunSuite with LocalSparkContext {
35
36  private val peakExecutionMemory = 10
37
38  test("peak execution memory should displayed") {
39    val conf = new SparkConf(false)
40    val html = renderStagePage(conf).toString().toLowerCase
41    val targetString = "peak execution memory"
42    assert(html.contains(targetString))
43  }
44
45  test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
46    val conf = new SparkConf(false)
47    val html = renderStagePage(conf).toString().toLowerCase
48    // verify min/25/50/75/max show task value not cumulative values
49    assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
50  }
51
52  /**
53   * Render a stage page started with the given conf and return the HTML.
54   * This also runs a dummy stage to populate the page with useful content.
55   */
56  private def renderStagePage(conf: SparkConf): Seq[Node] = {
57    val jobListener = new JobProgressListener(conf)
58    val graphListener = new RDDOperationGraphListener(conf)
59    val executorsListener = new ExecutorsListener(new StorageStatusListener(conf), conf)
60    val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
61    val request = mock(classOf[HttpServletRequest])
62    when(tab.conf).thenReturn(conf)
63    when(tab.progressListener).thenReturn(jobListener)
64    when(tab.operationGraphListener).thenReturn(graphListener)
65    when(tab.executorsListener).thenReturn(executorsListener)
66    when(tab.appName).thenReturn("testing")
67    when(tab.headerTabs).thenReturn(Seq.empty)
68    when(request.getParameter("id")).thenReturn("0")
69    when(request.getParameter("attempt")).thenReturn("0")
70    val page = new StagePage(tab)
71
72    // Simulate a stage in job progress listener
73    val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
74    // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
75    (1 to 2).foreach {
76      taskId =>
77        val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
78        jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
79        jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
80        taskInfo.markFinished(TaskState.FINISHED)
81        val taskMetrics = TaskMetrics.empty
82        taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
83        jobListener.onTaskEnd(
84          SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
85    }
86    jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
87    page.render(request)
88  }
89
90}
91