/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.util.Properties
import javax.annotation.Nullable

import scala.collection.Map

import com.fasterxml.jackson.annotation.JsonTypeInfo

import org.apache.spark.{SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.ui.SparkUI

/**
 * Common supertype of every event handled by a [[SparkListenerInterface]].
 *
 * The `@JsonTypeInfo` annotation asks Jackson to record the concrete class of an event in an
 * "Event" property when the event is serialized.
 */
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /** Whether to output this event to the event log; `true` unless a subtype overrides it. */
  protected[spark] def logEvent: Boolean = true
}

/**
 * Event passed to `onStageSubmitted`, i.e. when a stage is submitted.
 *
 * @param stageInfo information on the submitted stage
 * @param properties properties accompanying the submission; defaults to `null`
 */
@DeveloperApi
case class SparkListenerStageSubmitted(
    stageInfo: StageInfo,
    properties: Properties = null)
  extends SparkListenerEvent

/**
 * Event passed to `onStageCompleted`, i.e. when a stage completes successfully or fails.
 *
 * @param stageInfo information on the completed stage
 */
@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

/** Event passed to `onTaskStart`, i.e. when a task starts. */
@DeveloperApi
case class SparkListenerTaskStart(
    stageId: Int,
    stageAttemptId: Int,
    taskInfo: TaskInfo)
  extends SparkListenerEvent

/**
 * Event passed to `onTaskGettingResult`, i.e. when a task begins remotely fetching its result.
 */
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent

/**
 * Event passed to `onTaskEnd`, i.e. when a task ends.
 *
 * @param taskMetrics metrics of the task; may be `null` if the task has failed
 */
@DeveloperApi
case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    // may be null if the task has failed
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent

/**
 * Event passed to `onJobStart`, i.e. when a job starts.
 *
 * @param stageInfos information on the stages associated with the job
 * @param properties properties accompanying the job; defaults to `null`
 */
@DeveloperApi
case class SparkListenerJobStart(
    jobId: Int,
    time: Long,
    stageInfos: Seq[StageInfo],
    properties: Properties = null)
  extends SparkListenerEvent {
  // Kept for backwards-compatibility: older versions of this event stored only the stage ids
  // rather than full StageInfos, so the ids are still exposed, derived from `stageInfos`.
  val stageIds: Seq[Int] = stageInfos.map(info => info.stageId)
}

/** Event passed to `onJobEnd`, i.e. when a job ends. */
@DeveloperApi
case class SparkListenerJobEnd(
    jobId: Int,
    time: Long,
    jobResult: JobResult)
  extends SparkListenerEvent

/** Event passed to `onEnvironmentUpdate`, i.e. when environment properties have been updated. */
@DeveloperApi
case class SparkListenerEnvironmentUpdate(
    environmentDetails: Map[String, Seq[(String, String)]])
  extends SparkListenerEvent

/** Event passed to `onBlockManagerAdded`, i.e. when a new block manager has joined. */
@DeveloperApi
case class SparkListenerBlockManagerAdded(
    time: Long,
    blockManagerId: BlockManagerId,
    maxMem: Long)
  extends SparkListenerEvent

/**
 * Event passed to `onBlockManagerRemoved`, i.e. when an existing block manager has been removed.
 */
@DeveloperApi
case class SparkListenerBlockManagerRemoved(
    time: Long,
    blockManagerId: BlockManagerId)
  extends SparkListenerEvent

/**
 * Event passed to `onUnpersistRDD`, i.e. when an RDD is manually unpersisted by the application.
 */
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

/** Event passed to `onExecutorAdded`, i.e. when the driver registers a new executor. */
@DeveloperApi
case class SparkListenerExecutorAdded(
    time: Long,
    executorId: String,
    executorInfo: ExecutorInfo)
  extends SparkListenerEvent

/** Event passed to `onExecutorRemoved`, i.e. when the driver removes an executor. */
@DeveloperApi
case class SparkListenerExecutorRemoved(
    time: Long,
    executorId: String,
    reason: String)
  extends SparkListenerEvent

/** Event passed to `onBlockUpdated`, i.e. when the driver receives a block update info. */
@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo)
  extends SparkListenerEvent

/**
 * Periodic updates from executors.
 *
 * @param execId executor id
 * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
 */
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
    execId: String,
    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
  extends SparkListenerEvent

/**
 * Event passed to `onApplicationStart`, i.e. when the application starts.
 *
 * @param driverLogs optional map of driver log entries; defaults to `None`
 */
@DeveloperApi
case class SparkListenerApplicationStart(
    appName: String,
    appId: Option[String],
    time: Long,
    sparkUser: String,
    appAttemptId: Option[String],
    driverLogs: Option[Map[String, String]] = None)
  extends SparkListenerEvent

/** Event passed to `onApplicationEnd`, i.e. when the application ends. */
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

/**
 * An internal class that describes the metadata of an event log.
 * This event is not meant to be posted to listeners downstream.
 */
private[spark] case class SparkListenerLogStart(sparkVersion: String)
  extends SparkListenerEvent

/**
 * Factory interface through which other modules (such as SQL) supply the history listeners
 * that are used to rebuild the history UI.
 */
private[spark] trait SparkHistoryListenerFactory {
  /**
   * Create listeners used to rebuild the history UI.
   */
  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}


/**
 * Interface for listening to events from the Spark scheduler. Most applications should probably
 * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
 *
 * Note that this is an internal interface which might change in different Spark releases.
 */
private[spark] trait SparkListenerInterface {

  /**
   * Invoked once a stage has finished, whether it succeeded or failed; the event carries
   * information on the completed stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit

  /**
   * Invoked when a stage is submitted.
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit

  /**
   * Invoked when a task starts.
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

  /**
   * Invoked when a task begins remotely fetching its result. Tasks that do not need to fetch
   * the result remotely never trigger this callback.
   */
  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit

  /**
   * Invoked when a task ends.
   */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit

  /**
   * Invoked when a job starts.
   */
  def onJobStart(jobStart: SparkListenerJobStart): Unit

  /**
   * Invoked when a job ends.
   */
  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit

  /**
   * Invoked when environment properties have been updated.
   */
  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit

  /**
   * Invoked when a new block manager has joined.
   */
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit

  /**
   * Invoked when an existing block manager has been removed.
   */
  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit

  /**
   * Invoked when an RDD is manually unpersisted by the application.
   */
  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit

  /**
   * Invoked when the application starts.
   */
  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit

  /**
   * Invoked when the application ends.
   */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit

  /**
   * Invoked when the driver receives task metrics from an executor in a heartbeat.
   */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

  /**
   * Invoked when the driver registers a new executor.
   */
  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

  /**
   * Invoked when the driver removes an executor.
   */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit

  /**
   * Invoked when the driver receives a block update info.
   */
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

  /**
   * Invoked when other events, such as SQL-specific events, are posted.
   */
  def onOtherEvent(event: SparkListenerEvent): Unit
}


/**
 * :: DeveloperApi ::
 * A default implementation for [[SparkListenerInterface]] in which every callback is a no-op,
 * so subclasses only need to override the callbacks they care about.
 *
 * Note that this is an internal interface which might change in different Spark releases.
 */
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = ()

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = ()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = ()

  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = ()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = ()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = ()

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = ()

  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = ()

  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = ()

  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = ()

  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = ()

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = ()

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = ()

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = ()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = ()

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = ()

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = ()

  override def onOtherEvent(event: SparkListenerEvent): Unit = ()
}