/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ui.exec

import scala.collection.mutable.{LinkedHashMap, ListBuffer}

import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
import org.apache.spark.ui.{SparkUI, SparkUITab}

private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
  val listener = parent.executorsListener
  val sc = parent.sc
  val threadDumpEnabled =
    sc.isDefined && parent.conf.getBoolean("spark.ui.threadDumpsEnabled", true)

  attachPage(new ExecutorsPage(this, threadDumpEnabled))
  if (threadDumpEnabled) {
    attachPage(new ExecutorThreadDumpPage(this))
  }
}
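
// Note: the thread dump page above is attached only when a live SparkContext
// exists and "spark.ui.threadDumpsEnabled" is true (the default). A minimal
// sketch of disabling it from application code:
//
//   val conf = new SparkConf()
//     .setAppName("example")
//     .set("spark.ui.threadDumpsEnabled", "false")
//   val sc = new SparkContext(conf)  // the UI then omits the thread dump page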

private[ui] case class ExecutorTaskSummary(
    var executorId: String,
    var totalCores: Int = 0,
    var tasksMax: Int = 0,
    var tasksActive: Int = 0,
    var tasksFailed: Int = 0,
    var tasksComplete: Int = 0,
    var duration: Long = 0L,
    var jvmGCTime: Long = 0L,
    var inputBytes: Long = 0L,
    var inputRecords: Long = 0L,
    var outputBytes: Long = 0L,
    var outputRecords: Long = 0L,
    var shuffleRead: Long = 0L,
    var shuffleWrite: Long = 0L,
    var executorLogs: Map[String, String] = Map.empty,
    var isAlive: Boolean = true
)
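
// Every field except executorId has a default, so a fresh summary needs only
// the executor id; the listener below relies on this in its getOrElseUpdate
// calls. For example (hypothetical id):
//
//   val summary = ExecutorTaskSummary("42")  // counters start at zero, isAlive = true
//   summary.tasksActive += 1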

/**
 * :: DeveloperApi ::
 * A SparkListener that prepares information to be displayed on the ExecutorsTab
 */
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
    extends SparkListener {
  var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
  var executorEvents = new ListBuffer[SparkListenerEvent]()

  private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
  private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)

  def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

  def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList
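
  // SparkUI creates and registers this listener itself. A minimal sketch of
  // standalone wiring (e.g. in a test), assuming StorageStatusListener is
  // constructible with a SparkConf as in this codebase and that `sc` is an
  // existing SparkContext:
  //
  //   val conf = new SparkConf()
  //     .set("spark.ui.timeline.executors.maximum", "500")
  //     .set("spark.ui.retainedDeadExecutors", "50")
  //   val listener = new ExecutorsListener(new StorageStatusListener(conf), conf)
  //   sc.addSparkListener(listener)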

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
    val eid = executorAdded.executorId
    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
    taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
    taskSummary.totalCores = executorAdded.executorInfo.totalCores
    taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
    executorEvents += executorAdded
    if (executorEvents.size > maxTimelineExecutors) {
      executorEvents.remove(0)
    }

    val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive)
    if (deadExecutors.size > retainedDeadExecutors) {
      val head = deadExecutors.head
      executorToTaskSummary.remove(head._1)
    }
  }
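
  // In onExecutorAdded above, tasksMax is the executor's task-slot count: total
  // cores divided by cores per task. For example, an executor with 8 cores and
  // spark.task.cpus = 2 gets tasksMax = 8 / 2 = 4. Because LinkedHashMap keeps
  // insertion order, the trimming step drops the oldest dead executor first.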

  override def onExecutorRemoved(
      executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
    executorEvents += executorRemoved
    if (executorEvents.size > maxTimelineExecutors) {
      executorEvents.remove(0)
    }
    executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
  }

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    applicationStart.driverLogs.foreach { logs =>
      val storageStatus = activeStorageStatusList.find { s =>
        s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
        s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
      }
      storageStatus.foreach { s =>
        val eid = s.blockManagerId.executorId
        val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
        taskSummary.executorLogs = logs.toMap
      }
    }
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    val eid = taskStart.taskInfo.executorId
    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
    taskSummary.tasksActive += 1
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    if (info != null) {
      val eid = info.executorId
      val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
      taskEnd.reason match {
        case Resubmitted =>
          // Note: For resubmitted tasks, we continue to use the metrics that belong to the
          // first attempt of this task. This may not be 100% accurate because the first attempt
          // could have failed half-way through. The correct fix would be to keep track of the
          // metrics added by each attempt, but this is much more complicated.
          return
        case _: ExceptionFailure =>
          taskSummary.tasksFailed += 1
        case _ =>
          taskSummary.tasksComplete += 1
      }
      if (taskSummary.tasksActive >= 1) {
        taskSummary.tasksActive -= 1
      }
      taskSummary.duration += info.duration

      // Update input, output, shuffle, and GC time metrics
      val metrics = taskEnd.taskMetrics
      if (metrics != null) {
        taskSummary.inputBytes += metrics.inputMetrics.bytesRead
        taskSummary.inputRecords += metrics.inputMetrics.recordsRead
        taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
        taskSummary.outputRecords += metrics.outputMetrics.recordsWritten

        taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
        taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
        taskSummary.jvmGCTime += metrics.jvmGCTime
      }
    }
  }
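
  // A minimal sketch of consuming the aggregated state from code holding a
  // reference to this listener (reads should synchronize on the listener, as
  // the handlers above do):
  //
  //   val totalFailed = listener.synchronized {
  //     listener.executorToTaskSummary.values.map(_.tasksFailed).sum
  //   }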

}