/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ui.exec

import scala.collection.mutable.{LinkedHashMap, ListBuffer}

import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
import org.apache.spark.ui.{SparkUI, SparkUITab}

/**
 * The "executors" tab of the Spark UI.
 *
 * Always attaches an [[ExecutorsPage]]. An [[ExecutorThreadDumpPage]] is attached only when
 * `parent.sc` is defined and `spark.ui.threadDumpsEnabled` (default `true`) is set.
 */
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
  val listener = parent.executorsListener
  val sc = parent.sc
  val threadDumpEnabled =
    sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

  attachPage(new ExecutorsPage(this, threadDumpEnabled))
  if (threadDumpEnabled) {
    attachPage(new ExecutorThreadDumpPage(this))
  }
}

/**
 * Mutable per-executor counters maintained by [[ExecutorsListener]].
 *
 * All fields except `executorId` start at a zero/empty default and are updated in place as
 * listener events arrive; `isAlive` starts `true` and is cleared when the executor is removed.
 */
private[ui] case class ExecutorTaskSummary(
    var executorId: String,
    var totalCores: Int = 0,
    var tasksMax: Int = 0,
    var tasksActive: Int = 0,
    var tasksFailed: Int = 0,
    var tasksComplete: Int = 0,
    var duration: Long = 0L,
    var jvmGCTime: Long = 0L,
    var inputBytes: Long = 0L,
    var inputRecords: Long = 0L,
    var outputBytes: Long = 0L,
    var outputRecords: Long = 0L,
    var shuffleRead: Long = 0L,
    var shuffleWrite: Long = 0L,
    var executorLogs: Map[String, String] = Map.empty,
    var isAlive: Boolean = true
)

/**
 * :: DeveloperApi ::
 * A SparkListener that prepares information to be displayed on the ExecutorsTab
 *
 * Every mutation of `executorToTaskSummary` and `executorEvents` performed by this class is
 * done while holding this listener's monitor.
 */
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
    extends SparkListener {
  // Keyed by executor id; a LinkedHashMap so entries keep their insertion order.
  var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
  // Executor added/removed events, oldest first, capped at `maxTimelineExecutors`.
  var executorEvents = new ListBuffer[SparkListenerEvent]()

  private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
  private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

  def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList

  /**
   * Appends `event` to the timeline and drops the oldest entry once the configured maximum
   * is exceeded. Callers must already hold this listener's monitor.
   */
  private def appendTimelineEvent(event: SparkListenerEvent): Unit = {
    executorEvents += event
    if (executorEvents.size > maxTimelineExecutors) {
      executorEvents.remove(0)
    }
  }

  /**
   * Records the new executor's log URLs and core count, derives `tasksMax` from
   * `spark.task.cpus`, appends the event to the timeline, and evicts at most one dead
   * executor summary when more than `spark.ui.retainedDeadExecutors` are retained.
   */
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
    val eid = executorAdded.executorId
    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
    taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
    taskSummary.totalCores = executorAdded.executorInfo.totalCores
    taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
    appendTimelineEvent(executorAdded)

    val deadExecutors = executorToTaskSummary.filter { case (_, summary) => !summary.isAlive }
    if (deadExecutors.size > retainedDeadExecutors) {
      // Evict the first dead entry in insertion order. headOption (rather than head) keeps
      // this safe when the limit is configured negative and no executor is dead yet.
      deadExecutors.headOption.foreach { case (deadId, _) =>
        executorToTaskSummary.remove(deadId)
      }
    }
  }

  /**
   * Appends the removal event to the timeline and marks the executor's summary, if one
   * exists, as no longer alive. The summary itself is kept.
   */
  override def onExecutorRemoved(
      executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
    appendTimelineEvent(executorRemoved)
    executorToTaskSummary.get(executorRemoved.executorId).foreach(_.isAlive = false)
  }

  /**
   * When the application-start event carries driver logs, attaches them to the summary of the
   * driver's entry in the active storage status list (matched by either driver identifier).
   */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    applicationStart.driverLogs.foreach { logs =>
      val storageStatus = activeStorageStatusList.find { s =>
        s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
        s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
      }
      storageStatus.foreach { s =>
        val eid = s.blockManagerId.executorId
        // Guard the shared map like every other callback does. Only the mutation is locked;
        // the storage status lookup above stays outside this listener's monitor.
        synchronized {
          val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
          taskSummary.executorLogs = logs.toMap
        }
      }
    }
  }

  /** Increments the active-task count of the executor the task was started on. */
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    val eid = taskStart.taskInfo.executorId
    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
    taskSummary.tasksActive += 1
  }

  /**
   * Folds a finished task into its executor's summary: failed/complete counters, active
   * count, duration and, when present, the task metrics. Events without task info are
   * ignored, and `Resubmitted` events leave all counters untouched.
   */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    if (info != null) {
      val eid = info.executorId
      val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
      // Note: For resubmitted tasks, we continue to use the metrics that belong to the
      // first attempt of this task. This may not be 100% accurate because the first attempt
      // could have failed half-way through. The correct fix would be to keep track of the
      // metrics added by each attempt, but this is much more complicated.
      if (taskEnd.reason != Resubmitted) {
        taskEnd.reason match {
          case _: ExceptionFailure =>
            taskSummary.tasksFailed += 1
          case _ =>
            taskSummary.tasksComplete += 1
        }
        // Never let the active count go negative if the matching start event was not seen.
        if (taskSummary.tasksActive >= 1) {
          taskSummary.tasksActive -= 1
        }
        taskSummary.duration += info.duration

        // Accumulate input/output, shuffle and GC metrics when the event carries them.
        val metrics = taskEnd.taskMetrics
        if (metrics != null) {
          taskSummary.inputBytes += metrics.inputMetrics.bytesRead
          taskSummary.inputRecords += metrics.inputMetrics.recordsRead
          taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
          taskSummary.outputRecords += metrics.outputMetrics.recordsWritten

          taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
          taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
          taskSummary.jvmGCTime += metrics.jvmGCTime
        }
      }
    }
  }

}