1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *    http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.spark.scheduler
19
20import java.util.Properties
21import javax.annotation.Nullable
22
23import scala.collection.Map
24
25import com.fasterxml.jackson.annotation.JsonTypeInfo
26
27import org.apache.spark.{SparkConf, TaskEndReason}
28import org.apache.spark.annotation.DeveloperApi
29import org.apache.spark.executor.TaskMetrics
30import org.apache.spark.scheduler.cluster.ExecutorInfo
31import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
32import org.apache.spark.ui.SparkUI
33
/**
 * Base trait extended by the listener events declared in this file.
 *
 * The Jackson annotation requests that type information be written as the class name in a
 * property named "Event".
 */
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {

  /** Whether to output this event to the event log; `true` unless a subtype overrides it. */
  protected[spark] def logEvent: Boolean = {
    true
  }
}
40
/**
 * Event carrying the details of a submitted stage.
 *
 * @param stageInfo information about the stage
 * @param properties properties accompanying the submission; `null` when not supplied
 */
@DeveloperApi
case class SparkListenerStageSubmitted(
    stageInfo: StageInfo,
    properties: Properties = null)
  extends SparkListenerEvent
44
/**
 * Event carrying the details of a completed stage.
 *
 * @param stageInfo information about the completed stage
 */
@DeveloperApi
case class SparkListenerStageCompleted(
    stageInfo: StageInfo)
  extends SparkListenerEvent
47
/**
 * Event carrying the details of a task that has started.
 *
 * @param stageId id of the stage
 * @param stageAttemptId id of the stage attempt
 * @param taskInfo information about the task
 */
@DeveloperApi
case class SparkListenerTaskStart(
    stageId: Int,
    stageAttemptId: Int,
    taskInfo: TaskInfo)
  extends SparkListenerEvent
51
/**
 * Event carrying the details of a task that is getting its result.
 *
 * @param taskInfo information about the task
 */
@DeveloperApi
case class SparkListenerTaskGettingResult(
    taskInfo: TaskInfo)
  extends SparkListenerEvent
54
/**
 * Event carrying the details of a task that has ended.
 *
 * @param stageId id of the stage
 * @param stageAttemptId id of the stage attempt
 * @param taskType type of the task, as a string
 * @param reason why the task ended
 * @param taskInfo information about the task
 * @param taskMetrics metrics of the task; may be null if the task has failed
 */
@DeveloperApi
case class SparkListenerTaskEnd(
    stageId: Int,
    stageAttemptId: Int,
    taskType: String,
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    @Nullable taskMetrics: TaskMetrics)
  extends SparkListenerEvent
65
/**
 * Event carrying the details of a job that has started.
 *
 * @param jobId id of the job
 * @param time time associated with the event (unit not shown here)
 * @param stageInfos information about the job's stages
 * @param properties properties accompanying the job; `null` when not supplied
 */
@DeveloperApi
case class SparkListenerJobStart(
    jobId: Int,
    time: Long,
    stageInfos: Seq[StageInfo],
    properties: Properties = null)
  extends SparkListenerEvent {

  /**
   * Ids of the stages in `stageInfos`, in the same order. Retained for backwards-compatibility
   * with older versions of this event, which only stored stage ids and not StageInfos.
   */
  val stageIds: Seq[Int] = for (info <- stageInfos) yield info.stageId
}
77
/**
 * Event carrying the details of a job that has ended.
 *
 * @param jobId id of the job
 * @param time time associated with the event (unit not shown here)
 * @param jobResult result of the job
 */
@DeveloperApi
case class SparkListenerJobEnd(jobId: Int, time: Long, jobResult: JobResult)
  extends SparkListenerEvent
84
/**
 * Event carrying updated environment details.
 *
 * @param environmentDetails map from a name to a sequence of string pairs
 *                           (presumably category -> key/value entries; confirm with callers)
 */
@DeveloperApi
case class SparkListenerEnvironmentUpdate(
    environmentDetails: Map[String, Seq[(String, String)]])
  extends SparkListenerEvent
88
/**
 * Event carrying the details of a block manager that has been added.
 *
 * @param time time associated with the event (unit not shown here)
 * @param blockManagerId id of the block manager
 * @param maxMem presumably the block manager's maximum memory; unit not shown here
 */
@DeveloperApi
case class SparkListenerBlockManagerAdded(
    time: Long,
    blockManagerId: BlockManagerId,
    maxMem: Long)
  extends SparkListenerEvent
92
/**
 * Event carrying the details of a block manager that has been removed.
 *
 * @param time time associated with the event (unit not shown here)
 * @param blockManagerId id of the block manager
 */
@DeveloperApi
case class SparkListenerBlockManagerRemoved(
    time: Long,
    blockManagerId: BlockManagerId)
  extends SparkListenerEvent
96
/**
 * Event identifying an RDD that has been unpersisted.
 *
 * @param rddId id of the RDD
 */
@DeveloperApi
case class SparkListenerUnpersistRDD(
    rddId: Int)
  extends SparkListenerEvent
99
/**
 * Event carrying the details of an executor that has been added.
 *
 * @param time time associated with the event (unit not shown here)
 * @param executorId id of the executor
 * @param executorInfo information about the executor
 */
@DeveloperApi
case class SparkListenerExecutorAdded(
    time: Long,
    executorId: String,
    executorInfo: ExecutorInfo)
  extends SparkListenerEvent
103
/**
 * Event carrying the details of an executor that has been removed.
 *
 * @param time time associated with the event (unit not shown here)
 * @param executorId id of the executor
 * @param reason reason for the removal, as a string
 */
@DeveloperApi
case class SparkListenerExecutorRemoved(
    time: Long,
    executorId: String,
    reason: String)
  extends SparkListenerEvent
107
/**
 * Event carrying a block update.
 *
 * @param blockUpdatedInfo information about the updated block
 */
@DeveloperApi
case class SparkListenerBlockUpdated(
    blockUpdatedInfo: BlockUpdatedInfo)
  extends SparkListenerEvent
110
/**
 * Periodic update sent by an executor.
 *
 * @param execId id of the executor
 * @param accumUpdates one tuple per task, laid out as
 *                     (taskId, stageId, stageAttemptId, accumUpdates)
 */
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
    execId: String,
    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]
) extends SparkListenerEvent
121
/**
 * Event carrying the details of an application that has started.
 *
 * @param appName name of the application
 * @param appId id of the application, if any
 * @param time time associated with the event (unit not shown here)
 * @param sparkUser user of the application
 * @param appAttemptId id of the application attempt, if any
 * @param driverLogs optional string-to-string map describing the driver logs; `None` by default
 */
@DeveloperApi
case class SparkListenerApplicationStart(
    appName: String,
    appId: Option[String],
    time: Long,
    sparkUser: String,
    appAttemptId: Option[String],
    driverLogs: Option[Map[String, String]] = None)
  extends SparkListenerEvent
130
/**
 * Event marking the end of an application.
 *
 * @param time time associated with the event (unit not shown here)
 */
@DeveloperApi
case class SparkListenerApplicationEnd(
    time: Long)
  extends SparkListenerEvent
133
/**
 * Internal event describing the metadata of an event log.
 *
 * It is not meant to be posted to listeners downstream.
 *
 * @param sparkVersion Spark version string
 */
private[spark] case class SparkListenerLogStart(
    sparkVersion: String)
  extends SparkListenerEvent
139
/**
 * Factory interface through which other modules (for example SQL) provide the history
 * listeners that are used to rebuild the history UI.
 */
private[spark] trait SparkHistoryListenerFactory {

  /**
   * Creates the listeners used to rebuild the history UI.
   *
   * @param conf the Spark configuration
   * @param sparkUI the UI being rebuilt
   * @return the listeners to use
   */
  def createListeners(
      conf: SparkConf,
      sparkUI: SparkUI): Seq[SparkListener]
}
150
151
/**
 * Callback interface for events coming from the Spark scheduler. Most applications should
 * extend SparkListener or SparkFirehoseListener instead of implementing this trait directly.
 *
 * Note that this is an internal interface which might change in different Spark releases.
 */
private[spark] trait SparkListenerInterface {

  // ---- Application ----

  /** Called when the application starts. */
  def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit

  /** Called when the application ends. */
  def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit

  /** Called when environment properties have been updated. */
  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit

  // ---- Jobs ----

  /** Called when a job starts. */
  def onJobStart(jobStart: SparkListenerJobStart): Unit

  /** Called when a job ends. */
  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit

  // ---- Stages ----

  /** Called when a stage is submitted. */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit

  /**
   * Called when a stage completes successfully or fails, with information on the completed
   * stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit

  // ---- Tasks ----

  /** Called when a task starts. */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit

  /**
   * Called when a task begins remotely fetching its result. It is not called for tasks that
   * do not need to fetch the result remotely.
   */
  def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit

  /** Called when a task ends. */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit

  // ---- Executors ----

  /** Called when the driver registers a new executor. */
  def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit

  /** Called when the driver removes an executor. */
  def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit

  /** Called when the driver receives task metrics from an executor in a heartbeat. */
  def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit

  // ---- Block managers, blocks and RDDs ----

  /** Called when a new block manager has joined. */
  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit

  /** Called when an existing block manager has been removed. */
  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit

  /** Called when the driver receives a block update info. */
  def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit

  /** Called when an RDD is manually unpersisted by the application. */
  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit

  // ---- Everything else ----

  /** Called when other events, like SQL-specific events, are posted. */
  def onOtherEvent(event: SparkListenerEvent): Unit
}
251
252
/**
 * :: DeveloperApi ::
 * Base class implementing every [[SparkListenerInterface]] callback as a no-op, so that
 * subclasses only need to override the callbacks they care about.
 *
 * Note that, as a developer API, this might change in different Spark releases.
 */
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {

  // Every callback below deliberately does nothing and returns the unit value.

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = ()

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = ()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = ()

  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = ()

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = ()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = ()

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = ()

  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = ()

  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = ()

  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit =
    ()

  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = ()

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = ()

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = ()

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = ()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = ()

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = ()

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = ()

  override def onOtherEvent(event: SparkListenerEvent): Unit = ()
}
300