/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener

/**
 * Tests for the HTML produced by [[StagePage]].
 *
 * Each test renders the page for a simulated stage (see `renderStagePage`) and inspects the
 * lower-cased HTML text for the peak execution memory output.
 */
class StagePageSuite extends SparkFunSuite with LocalSparkContext {

  /**
   * Peak execution memory reported by every simulated task. The SPARK-10543 test expects this
   * value to be rendered as `10.0 B` in each summary cell.
   */
  private val peakExecutionMemory = 10

  test("peak execution memory should displayed") {
    val conf = new SparkConf(false)
    val html = renderStagePage(conf).toString().toLowerCase
    val targetString = "peak execution memory"
    assert(html.contains(targetString))
  }

  test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
    val conf = new SparkConf(false)
    val html = renderStagePage(conf).toString().toLowerCase
    // verify min/25/50/75/max show task value not cumulative values:
    // five consecutive cells must each hold the single-task value, not the sum over both tasks
    assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
  }

  /**
   * Render a stage page started with the given conf and return the HTML.
   *
   * This also runs a dummy stage to populate the page with useful content: stage 0 (attempt 0)
   * is submitted once, two tasks are started and finished successfully, each reporting
   * `peakExecutionMemory`, and the stage is then completed.
   *
   * @param conf configuration handed to the listeners and returned by the mocked tab
   * @return the nodes returned by `StagePage.render` for stage id "0", attempt "0"
   */
  private def renderStagePage(conf: SparkConf): Seq[Node] = {
    val jobListener = new JobProgressListener(conf)
    val graphListener = new RDDOperationGraphListener(conf)
    val executorsListener = new ExecutorsListener(new StorageStatusListener(conf), conf)

    // Mock the tab so the page reads its state from the listeners created above
    val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
    when(tab.conf).thenReturn(conf)
    when(tab.progressListener).thenReturn(jobListener)
    when(tab.operationGraphListener).thenReturn(graphListener)
    when(tab.executorsListener).thenReturn(executorsListener)
    when(tab.appName).thenReturn("testing")
    when(tab.headerTabs).thenReturn(Seq.empty)

    // The request selects the stage and attempt to render
    val request = mock(classOf[HttpServletRequest])
    when(request.getParameter("id")).thenReturn("0")
    when(request.getParameter("attempt")).thenReturn("0")
    val page = new StagePage(tab)

    // Simulate a stage in job progress listener. The stage is submitted exactly once, before
    // any of its tasks start, rather than being re-submitted for every task.
    val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
    jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))

    // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
    (1 to 2).foreach { taskId =>
      val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
      jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
      taskInfo.markFinished(TaskState.FINISHED)
      val taskMetrics = TaskMetrics.empty
      taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
      jobListener.onTaskEnd(
        SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
    }
    jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
    page.render(request)
  }

}