/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

import java.util.{Properties, UUID}

import scala.collection.JavaConverters._
import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._

/**
 * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
 * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
 * written by any other version, including newer versions.
 *
 * JsonProtocolSuite contains backwards-compatibility tests which check that the current version of
 * JsonProtocol is able to read output written by earlier versions.  We do not currently have tests
 * for reading newer JSON output with older Spark versions.
 *
 * To ensure that we provide these guarantees, follow these rules when modifying these methods:
 *
 *  - Never delete any JSON fields.
 *  - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields
 *    in `*FromJson` methods.
 */
private[spark] object JsonProtocol {
  // TODO: Remove this file and put JSON serialization into each individual class.

  private implicit val format = DefaultFormats

  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  /** ------------------------------------------------- *
   * JSON serialization methods for SparkListenerEvents |
   * -------------------------------------------------- */

  def sparkEventToJson(event: SparkListenerEvent): JValue = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        stageSubmittedToJson(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        stageCompletedToJson(stageCompleted)
      case taskStart: SparkListenerTaskStart =>
        taskStartToJson(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        taskGettingResultToJson(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        taskEndToJson(taskEnd)
      case jobStart: SparkListenerJobStart =>
        jobStartToJson(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        jobEndToJson(jobEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        environmentUpdateToJson(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        blockManagerAddedToJson(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        blockManagerRemovedToJson(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        unpersistRDDToJson(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        applicationStartToJson(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        applicationEndToJson(applicationEnd)
      case executorAdded: SparkListenerExecutorAdded =>
        executorAddedToJson(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        executorRemovedToJson(executorRemoved)
      case logStart: SparkListenerLogStart =>
        logStartToJson(logStart)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        executorMetricsUpdateToJson(metricsUpdate)
      case blockUpdated: SparkListenerBlockUpdated =>
        throw new MatchError(blockUpdated)  // TODO(ekl) implement this
      case _ => parse(mapper.writeValueAsString(event))
    }
  }
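
  // Illustrative round trip (a sketch; `SparkListenerApplicationEnd` stands in for any
  // event type handled above):
  //
  //   val original = SparkListenerApplicationEnd(1480000000000L)
  //   val json = JsonProtocol.sparkEventToJson(original)
  //   // json renders as {"Event":"SparkListenerApplicationEnd","Timestamp":1480000000000}
  //   assert(JsonProtocol.sparkEventFromJson(json) == original)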

  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
    val properties = propertiesToJson(stageSubmitted.properties)
    ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~
    ("Stage Info" -> stageInfo) ~
    ("Properties" -> properties)
  }

  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
    ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~
    ("Stage Info" -> stageInfo)
  }

  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
    val taskInfo = taskStart.taskInfo
    ("Event" -> Utils.getFormattedClassName(taskStart)) ~
    ("Stage ID" -> taskStart.stageId) ~
    ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
    ("Task Info" -> taskInfoToJson(taskInfo))
  }

  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
    val taskInfo = taskGettingResult.taskInfo
    ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~
    ("Task Info" -> taskInfoToJson(taskInfo))
  }

  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
    val taskEndReason = taskEndReasonToJson(taskEnd.reason)
    val taskInfo = taskEnd.taskInfo
    val taskMetrics = taskEnd.taskMetrics
    val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
    ("Event" -> Utils.getFormattedClassName(taskEnd)) ~
    ("Stage ID" -> taskEnd.stageId) ~
    ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
    ("Task Type" -> taskEnd.taskType) ~
    ("Task End Reason" -> taskEndReason) ~
    ("Task Info" -> taskInfoToJson(taskInfo)) ~
    ("Task Metrics" -> taskMetricsJson)
  }

  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
    val properties = propertiesToJson(jobStart.properties)
    ("Event" -> Utils.getFormattedClassName(jobStart)) ~
    ("Job ID" -> jobStart.jobId) ~
    ("Submission Time" -> jobStart.time) ~
    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
    ("Stage IDs" -> jobStart.stageIds) ~
    ("Properties" -> properties)
  }

  def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
    val jobResult = jobResultToJson(jobEnd.jobResult)
    ("Event" -> Utils.getFormattedClassName(jobEnd)) ~
    ("Job ID" -> jobEnd.jobId) ~
    ("Completion Time" -> jobEnd.time) ~
    ("Job Result" -> jobResult)
  }

  def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = {
    val environmentDetails = environmentUpdate.environmentDetails
    val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap)
    val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
    val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
    val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
    ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~
    ("JVM Information" -> jvmInformation) ~
    ("Spark Properties" -> sparkProperties) ~
    ("System Properties" -> systemProperties) ~
    ("Classpath Entries" -> classpathEntries)
  }

  def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
    val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
    ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
    ("Block Manager ID" -> blockManagerId) ~
    ("Maximum Memory" -> blockManagerAdded.maxMem) ~
    ("Timestamp" -> blockManagerAdded.time)
  }

  def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
    val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
    ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
    ("Block Manager ID" -> blockManagerId) ~
    ("Timestamp" -> blockManagerRemoved.time)
  }

  def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
    ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~
    ("RDD ID" -> unpersistRDD.rddId)
  }

  def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
    ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
    ("App Name" -> applicationStart.appName) ~
    ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
    ("Timestamp" -> applicationStart.time) ~
    ("User" -> applicationStart.sparkUser) ~
    ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
    ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
  }

  def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
    ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
    ("Timestamp" -> applicationEnd.time)
  }

  def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
    ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
    ("Timestamp" -> executorAdded.time) ~
    ("Executor ID" -> executorAdded.executorId) ~
    ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
  }

  def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
    ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
    ("Timestamp" -> executorRemoved.time) ~
    ("Executor ID" -> executorRemoved.executorId) ~
    ("Removed Reason" -> executorRemoved.reason)
  }

  def logStartToJson(logStart: SparkListenerLogStart): JValue = {
    ("Event" -> Utils.getFormattedClassName(logStart)) ~
    ("Spark Version" -> SPARK_VERSION)
  }

  def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
    val execId = metricsUpdate.execId
    val accumUpdates = metricsUpdate.accumUpdates
    ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
    ("Executor ID" -> execId) ~
    ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
      ("Task ID" -> taskId) ~
      ("Stage ID" -> stageId) ~
      ("Stage Attempt ID" -> stageAttemptId) ~
      ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
    })
  }
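
  // Resulting shape (a sketch of the JSON built above):
  //
  //   {"Event": "SparkListenerExecutorMetricsUpdate",
  //    "Executor ID": "1",
  //    "Metrics Updated": [
  //      {"Task ID": 0, "Stage ID": 0, "Stage Attempt ID": 0,
  //       "Accumulator Updates": [ ...accumulableInfoToJson output... ]}]}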

  /** ------------------------------------------------------------------- *
   * JSON serialization methods for classes SparkListenerEvents depend on |
   * -------------------------------------------------------------------- */

  def stageInfoToJson(stageInfo: StageInfo): JValue = {
    val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
    val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
    val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
    val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
    ("Stage ID" -> stageInfo.stageId) ~
    ("Stage Attempt ID" -> stageInfo.attemptId) ~
    ("Stage Name" -> stageInfo.name) ~
    ("Number of Tasks" -> stageInfo.numTasks) ~
    ("RDD Info" -> rddInfo) ~
    ("Parent IDs" -> parentIds) ~
    ("Details" -> stageInfo.details) ~
    ("Submission Time" -> submissionTime) ~
    ("Completion Time" -> completionTime) ~
    ("Failure Reason" -> failureReason) ~
    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
  }

  def taskInfoToJson(taskInfo: TaskInfo): JValue = {
    ("Task ID" -> taskInfo.taskId) ~
    ("Index" -> taskInfo.index) ~
    ("Attempt" -> taskInfo.attemptNumber) ~
    ("Launch Time" -> taskInfo.launchTime) ~
    ("Executor ID" -> taskInfo.executorId) ~
    ("Host" -> taskInfo.host) ~
    ("Locality" -> taskInfo.taskLocality.toString) ~
    ("Speculative" -> taskInfo.speculative) ~
    ("Getting Result Time" -> taskInfo.gettingResultTime) ~
    ("Finish Time" -> taskInfo.finishTime) ~
    ("Failed" -> taskInfo.failed) ~
    ("Killed" -> taskInfo.killed) ~
    ("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
  }

  private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")

  def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
    JArray(accumulables
        .filterNot(_.name.exists(accumulableBlacklist.contains))
        .toList.map(accumulableInfoToJson))
  }

  def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
    val name = accumulableInfo.name
    ("ID" -> accumulableInfo.id) ~
    ("Name" -> name) ~
    ("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~
    ("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~
    ("Internal" -> accumulableInfo.internal) ~
    ("Count Failed Values" -> accumulableInfo.countFailedValues) ~
    ("Metadata" -> accumulableInfo.metadata)
  }

  /**
   * Serialize the value of an accumulator to JSON.
   *
   * For accumulators representing internal task metrics, this looks up the relevant
   * [[AccumulatorParam]] to serialize the value accordingly. For all other accumulators,
   * this will simply serialize the value as a string.
   *
   * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
   */
  private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
    if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
      value match {
        case v: Int => JInt(v)
        case v: Long => JInt(v)
        // We only have 3 kinds of internal accumulator types, so if it's not an int or a long,
        // it must be the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
        case v =>
          JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
            case (id, status) =>
              ("Block ID" -> id.toString) ~
              ("Status" -> blockStatusToJson(status))
          })
      }
    } else {
      // For all external accumulators, just use strings
      JString(value.toString)
    }
  }
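
  // A sketch of the two branches above ("internal.metrics." being the value of
  // `InternalAccumulator.METRICS_PREFIX`):
  //
  //   accumValueToJson(Some("internal.metrics.executorRunTime"), 42L)  // => JInt(42)
  //   accumValueToJson(Some("my.counter"), 42L)                        // => JString("42")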

  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
    val shuffleReadMetrics: JValue =
      ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
        ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
        ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
        ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
        ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
        ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
    val shuffleWriteMetrics: JValue =
      ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
        ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
        ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
    val inputMetrics: JValue =
      ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
        ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
    val outputMetrics: JValue =
      ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
        ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
    val updatedBlocks =
      JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
        ("Block ID" -> id.toString) ~
          ("Status" -> blockStatusToJson(status))
      })
    ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
    ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
    ("Executor Run Time" -> taskMetrics.executorRunTime) ~
    ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
    ("Result Size" -> taskMetrics.resultSize) ~
    ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
    ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
    ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
    ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
    ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
    ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
    ("Input Metrics" -> inputMetrics) ~
    ("Output Metrics" -> outputMetrics) ~
    ("Updated Blocks" -> updatedBlocks)
  }
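
  // The nested groups above render as, for example (a sketch; "rdd_0_0" is an illustrative
  // block ID):
  //
  //   {"Executor Run Time": 123, ...,
  //    "Shuffle Read Metrics": {"Remote Blocks Fetched": 0, ...},
  //    "Updated Blocks": [{"Block ID": "rdd_0_0", "Status": {...}}]}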

  def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
    val reason = Utils.getFormattedClassName(taskEndReason)
    val json: JObject = taskEndReason match {
      case fetchFailed: FetchFailed =>
        val blockManagerAddress = Option(fetchFailed.bmAddress)
          .map(blockManagerIdToJson).getOrElse(JNothing)
        ("Block Manager Address" -> blockManagerAddress) ~
        ("Shuffle ID" -> fetchFailed.shuffleId) ~
        ("Map ID" -> fetchFailed.mapId) ~
        ("Reduce ID" -> fetchFailed.reduceId) ~
        ("Message" -> fetchFailed.message)
      case exceptionFailure: ExceptionFailure =>
        val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
        val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
        ("Class Name" -> exceptionFailure.className) ~
        ("Description" -> exceptionFailure.description) ~
        ("Stack Trace" -> stackTrace) ~
        ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
        ("Accumulator Updates" -> accumUpdates)
      case taskCommitDenied: TaskCommitDenied =>
        ("Job ID" -> taskCommitDenied.jobID) ~
        ("Partition ID" -> taskCommitDenied.partitionID) ~
        ("Attempt Number" -> taskCommitDenied.attemptNumber)
      case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
        ("Executor ID" -> executorId) ~
        ("Exit Caused By App" -> exitCausedByApp) ~
        ("Loss Reason" -> reason.map(_.toString))
      case _ => Utils.emptyJson
    }
    ("Reason" -> reason) ~ json
  }

  def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
    ("Executor ID" -> blockManagerId.executorId) ~
    ("Host" -> blockManagerId.host) ~
    ("Port" -> blockManagerId.port)
  }

  def jobResultToJson(jobResult: JobResult): JValue = {
    val result = Utils.getFormattedClassName(jobResult)
    val json = jobResult match {
      case JobSucceeded => Utils.emptyJson
      case jobFailed: JobFailed =>
        JObject("Exception" -> exceptionToJson(jobFailed.exception))
    }
    ("Result" -> result) ~ json
  }

  def rddInfoToJson(rddInfo: RDDInfo): JValue = {
    val storageLevel = storageLevelToJson(rddInfo.storageLevel)
    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
    ("RDD ID" -> rddInfo.id) ~
    ("Name" -> rddInfo.name) ~
    ("Scope" -> rddInfo.scope.map(_.toJson)) ~
    ("Callsite" -> rddInfo.callSite) ~
    ("Parent IDs" -> parentIds) ~
    ("Storage Level" -> storageLevel) ~
    ("Number of Partitions" -> rddInfo.numPartitions) ~
    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
    ("Memory Size" -> rddInfo.memSize) ~
    ("Disk Size" -> rddInfo.diskSize)
  }

  def storageLevelToJson(storageLevel: StorageLevel): JValue = {
    ("Use Disk" -> storageLevel.useDisk) ~
    ("Use Memory" -> storageLevel.useMemory) ~
    ("Deserialized" -> storageLevel.deserialized) ~
    ("Replication" -> storageLevel.replication)
  }

  def blockStatusToJson(blockStatus: BlockStatus): JValue = {
    val storageLevel = storageLevelToJson(blockStatus.storageLevel)
    ("Storage Level" -> storageLevel) ~
    ("Memory Size" -> blockStatus.memSize) ~
    ("Disk Size" -> blockStatus.diskSize)
  }

  def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
    ("Host" -> executorInfo.executorHost) ~
    ("Total Cores" -> executorInfo.totalCores) ~
    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
  }

  /** ------------------------------ *
   * Util JSON serialization methods |
   * ------------------------------- */

  def mapToJson(m: Map[String, String]): JValue = {
    val jsonFields = m.map { case (k, v) => JField(k, JString(v)) }
    JObject(jsonFields.toList)
  }

  def propertiesToJson(properties: Properties): JValue = {
    Option(properties).map { p =>
      mapToJson(p.asScala)
    }.getOrElse(JNothing)
  }
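
  // E.g. a Properties containing ("spark.app.name" -> "demo") serializes to
  // {"spark.app.name": "demo"}, while a null Properties becomes JNothing (sketch).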

  def UUIDToJson(id: UUID): JValue = {
    ("Least Significant Bits" -> id.getLeastSignificantBits) ~
    ("Most Significant Bits" -> id.getMostSignificantBits)
  }

  def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = {
    JArray(stackTrace.map { case line =>
      ("Declaring Class" -> line.getClassName) ~
      ("Method Name" -> line.getMethodName) ~
      ("File Name" -> line.getFileName) ~
      ("Line Number" -> line.getLineNumber)
    }.toList)
  }

  def exceptionToJson(exception: Exception): JValue = {
    ("Message" -> exception.getMessage) ~
    ("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
  }

  /** --------------------------------------------------- *
   * JSON deserialization methods for SparkListenerEvents |
   * ---------------------------------------------------- */

  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
    val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted)
    val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted)
    val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart)
    val taskGettingResult = Utils.getFormattedClassName(SparkListenerTaskGettingResult)
    val taskEnd = Utils.getFormattedClassName(SparkListenerTaskEnd)
    val jobStart = Utils.getFormattedClassName(SparkListenerJobStart)
    val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd)
    val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate)
    val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded)
    val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved)
    val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
    val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
    val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
    val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
    val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
    val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
    val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)

    (json \ "Event").extract[String] match {
      case `stageSubmitted` => stageSubmittedFromJson(json)
      case `stageCompleted` => stageCompletedFromJson(json)
      case `taskStart` => taskStartFromJson(json)
      case `taskGettingResult` => taskGettingResultFromJson(json)
      case `taskEnd` => taskEndFromJson(json)
      case `jobStart` => jobStartFromJson(json)
      case `jobEnd` => jobEndFromJson(json)
      case `environmentUpdate` => environmentUpdateFromJson(json)
      case `blockManagerAdded` => blockManagerAddedFromJson(json)
      case `blockManagerRemoved` => blockManagerRemovedFromJson(json)
      case `unpersistRDD` => unpersistRDDFromJson(json)
      case `applicationStart` => applicationStartFromJson(json)
      case `applicationEnd` => applicationEndFromJson(json)
      case `executorAdded` => executorAddedFromJson(json)
      case `executorRemoved` => executorRemovedFromJson(json)
      case `logStart` => logStartFromJson(json)
      case `metricsUpdate` => executorMetricsUpdateFromJson(json)
      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
        .asInstanceOf[SparkListenerEvent]
    }
  }
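
  // Events with no explicit case above (e.g. listener events added to Spark after this
  // protocol was written) round-trip through Jackson: sparkEventToJson serializes them with
  // the object mapper, and the catch-all case here instantiates the class named in "Event".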

  def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
    val stageInfo = stageInfoFromJson(json \ "Stage Info")
    val properties = propertiesFromJson(json \ "Properties")
    SparkListenerStageSubmitted(stageInfo, properties)
  }

  def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
    val stageInfo = stageInfoFromJson(json \ "Stage Info")
    SparkListenerStageCompleted(stageInfo)
  }

  def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
    val stageId = (json \ "Stage ID").extract[Int]
    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
    val taskInfo = taskInfoFromJson(json \ "Task Info")
    SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
  }

  def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
    val taskInfo = taskInfoFromJson(json \ "Task Info")
    SparkListenerTaskGettingResult(taskInfo)
  }

  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
    val stageId = (json \ "Stage ID").extract[Int]
    val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
    val taskType = (json \ "Task Type").extract[String]
    val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
    val taskInfo = taskInfoFromJson(json \ "Task Info")
    val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
    SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo, taskMetrics)
  }

  def jobStartFromJson(json: JValue): SparkListenerJobStart = {
    val jobId = (json \ "Job ID").extract[Int]
    val submissionTime =
      Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
    val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
    val properties = propertiesFromJson(json \ "Properties")
    // The "Stage Infos" field was added in Spark 1.2.0
    val stageInfos = Utils.jsonOption(json \ "Stage Infos")
      .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
        stageIds.map { id =>
          new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
        }
      }
    SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
  }

  def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
    val jobId = (json \ "Job ID").extract[Int]
    val completionTime =
      Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
    val jobResult = jobResultFromJson(json \ "Job Result")
    SparkListenerJobEnd(jobId, completionTime, jobResult)
  }

  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
    val environmentDetails = Map[String, Seq[(String, String)]](
      "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
      "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
      "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
      "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
    SparkListenerEnvironmentUpdate(environmentDetails)
  }

  def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
    val maxMem = (json \ "Maximum Memory").extract[Long]
    val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
    SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
  }

  def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
    val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
    SparkListenerBlockManagerRemoved(time, blockManagerId)
  }

  def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
    SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
  }

  def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
    val appName = (json \ "App Name").extract[String]
    val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
    val time = (json \ "Timestamp").extract[Long]
    val sparkUser = (json \ "User").extract[String]
    val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
    val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
    SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
  }

  def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
    SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
  }

  def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
    val time = (json \ "Timestamp").extract[Long]
    val executorId = (json \ "Executor ID").extract[String]
    val executorInfo = executorInfoFromJson(json \ "Executor Info")
    SparkListenerExecutorAdded(time, executorId, executorInfo)
  }

  def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
    val time = (json \ "Timestamp").extract[Long]
    val executorId = (json \ "Executor ID").extract[String]
    val reason = (json \ "Removed Reason").extract[String]
    SparkListenerExecutorRemoved(time, executorId, reason)
  }

  def logStartFromJson(json: JValue): SparkListenerLogStart = {
    val sparkVersion = (json \ "Spark Version").extract[String]
    SparkListenerLogStart(sparkVersion)
  }
  def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
    val execId = (json \ "Executor ID").extract[String]
    val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
      val taskId = (json \ "Task ID").extract[Long]
      val stageId = (json \ "Stage ID").extract[Int]
      val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
      val updates =
        (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
      (taskId, stageId, stageAttemptId, updates)
    }
    SparkListenerExecutorMetricsUpdate(execId, accumUpdates)
  }

  /** --------------------------------------------------------------------- *
   * JSON deserialization methods for classes SparkListenerEvents depend on |
   * ---------------------------------------------------------------------- */

  def stageInfoFromJson(json: JValue): StageInfo = {
    val stageId = (json \ "Stage ID").extract[Int]
    val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
    val stageName = (json \ "Stage Name").extract[String]
    val numTasks = (json \ "Number of Tasks").extract[Int]
    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
    val parentIds = Utils.jsonOption(json \ "Parent IDs")
      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
      .getOrElse(Seq.empty)
    val details = (json \ "Details").extractOpt[String].getOrElse("")
    val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
    val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
    val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
    val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
      case Some(values) => values.map(accumulableInfoFromJson)
      case None => Seq[AccumulableInfo]()
    }

    val stageInfo = new StageInfo(
      stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
    stageInfo.submissionTime = submissionTime
    stageInfo.completionTime = completionTime
    stageInfo.failureReason = failureReason
    for (accInfo <- accumulatedValues) {
      stageInfo.accumulables(accInfo.id) = accInfo
    }
    stageInfo
  }

  def taskInfoFromJson(json: JValue): TaskInfo = {
    val taskId = (json \ "Task ID").extract[Long]
    val index = (json \ "Index").extract[Int]
    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
    val launchTime = (json \ "Launch Time").extract[Long]
    val executorId = (json \ "Executor ID").extract[String]
    val host = (json \ "Host").extract[String]
    val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
    val gettingResultTime = (json \ "Getting Result Time").extract[Long]
    val finishTime = (json \ "Finish Time").extract[Long]
    val failed = (json \ "Failed").extract[Boolean]
    val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false)
    val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
      case Some(values) => values.map(accumulableInfoFromJson)
      case None => Seq[AccumulableInfo]()
    }

    val taskInfo =
      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
    taskInfo.gettingResultTime = gettingResultTime
    taskInfo.finishTime = finishTime
    taskInfo.failed = failed
    taskInfo.killed = killed
    accumulables.foreach { taskInfo.accumulables += _ }
    taskInfo
  }

  def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
    val id = (json \ "ID").extract[Long]
    val name = (json \ "Name").extractOpt[String]
    val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
    val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
    val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
    val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
    val metadata = (json \ "Metadata").extractOpt[String]
    new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
  }

  /**
   * Deserialize the value of an accumulator from JSON.
   *
   * For accumulators representing internal task metrics, this looks up the relevant
   * [[AccumulatorParam]] to deserialize the value accordingly. For all other
   * accumulators, this will simply deserialize the value as a string.
   *
   * The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
   */
  private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
    if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
      value match {
        case JInt(v) => v.toLong
        case JArray(v) =>
          v.map { blockJson =>
            val id = BlockId((blockJson \ "Block ID").extract[String])
            val status = blockStatusFromJson(blockJson \ "Status")
            (id, status)
          }.asJava
        case _ => throw new IllegalArgumentException(s"unexpected json value $value for " +
          "accumulator " + name.get)
      }
    } else {
      value.extract[String]
    }
  }
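
  // Round-trip sketch, mirroring accumValueToJson above:
  //
  //   accumValueFromJson(Some("internal.metrics.resultSize"), JInt(10))  // => 10L
  //   accumValueFromJson(Some("my.counter"), JString("10"))              // => "10"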

  def taskMetricsFromJson(json: JValue): TaskMetrics = {
    val metrics = TaskMetrics.empty
    if (json == JNothing) {
      return metrics
    }
    metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
    metrics.setExecutorDeserializeCpuTime((json \ "Executor Deserialize CPU Time") match {
      case JNothing => 0
      case x => x.extract[Long]
    })
    metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
    metrics.setExecutorCpuTime((json \ "Executor CPU Time") match {
      case JNothing => 0
      case x => x.extract[Long]
    })
    metrics.setResultSize((json \ "Result Size").extract[Long])
    metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
    metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
    metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
    metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])

    // Shuffle read metrics
    Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
      val readMetrics = metrics.createTempShuffleReadMetrics()
      readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
      readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
      readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
      readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L))
      readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
      readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L))
      metrics.mergeShuffleReadMetrics()
    }

    // Shuffle write metrics
    // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
    Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
      val writeMetrics = metrics.shuffleWriteMetrics
      writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
      writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
        .extractOpt[Long].getOrElse(0L))
      writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
    }

    // Output metrics
    Utils.jsonOption(json \ "Output Metrics").foreach { outJson =>
      val outputMetrics = metrics.outputMetrics
      outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
      outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L))
    }

    // Input metrics
    Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
      val inputMetrics = metrics.inputMetrics
      inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
      inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
    }

    // Updated blocks
    Utils.jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
      metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson =>
        val id = BlockId((blockJson \ "Block ID").extract[String])
        val status = blockStatusFromJson(blockJson \ "Status")
        (id, status)
      })
    }

    metrics
  }
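
  // Each metrics group above is read through Utils.jsonOption, so logs written before a
  // given group existed simply leave it at its defaults; a missing "Task Metrics" field
  // altogether yields TaskMetrics.empty via the early return at the top.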

  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
    val success = Utils.getFormattedClassName(Success)
    val resubmitted = Utils.getFormattedClassName(Resubmitted)
    val fetchFailed = Utils.getFormattedClassName(FetchFailed)
    val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
    val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
    val taskKilled = Utils.getFormattedClassName(TaskKilled)
    val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
    val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
    val unknownReason = Utils.getFormattedClassName(UnknownReason)

    (json \ "Reason").extract[String] match {
      case `success` => Success
      case `resubmitted` => Resubmitted
      case `fetchFailed` =>
        val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
        val shuffleId = (json \ "Shuffle ID").extract[Int]
        val mapId = (json \ "Map ID").extract[Int]
        val reduceId = (json \ "Reduce ID").extract[Int]
        val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
        new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
          message.getOrElse("Unknown reason"))
      case `exceptionFailure` =>
        val className = (json \ "Class Name").extract[String]
        val description = (json \ "Description").extract[String]
        val stackTrace = stackTraceFromJson(json \ "Stack Trace")
        val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
        // Fall back on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
        val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
          .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
            acc.toInfo(Some(acc.value), None)
          }))
        ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
      case `taskResultLost` => TaskResultLost
      case `taskKilled` => TaskKilled
      case `taskCommitDenied` =>
        // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
        // de/serialization logic was not added until 1.5.1. To provide backward compatibility
        // for reading those logs, we need to provide default values for all the fields.
        val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
        val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
        val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
        TaskCommitDenied(jobId, partitionId, attemptNo)
      case `executorLostFailure` =>
        val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
        val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
        val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String])
        ExecutorLostFailure(
          executorId.getOrElse("Unknown"),
          exitCausedByApp.getOrElse(true),
          reason)
      case `unknownReason` => UnknownReason
    }
  }
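
  // Backward-compat sketch for the TaskCommitDenied case above: a log written by Spark
  // 1.3.x-1.5.0 would contain only
  //   {"Reason": "TaskCommitDenied"}
  // (via the catch-all in taskEndReasonToJson), which deserializes here as
  // TaskCommitDenied(-1, -1, -1).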

  def blockManagerIdFromJson(json: JValue): BlockManagerId = {
    // When a metadata fetch fails, the block manager ID can be null (SPARK-4471)
    if (json == JNothing) {
      return null
    }
    val executorId = (json \ "Executor ID").extract[String]
    val host = (json \ "Host").extract[String]
    val port = (json \ "Port").extract[Int]
    BlockManagerId(executorId, host, port)
  }

  def jobResultFromJson(json: JValue): JobResult = {
    val jobSucceeded = Utils.getFormattedClassName(JobSucceeded)
    val jobFailed = Utils.getFormattedClassName(JobFailed)

    (json \ "Result").extract[String] match {
      case `jobSucceeded` => JobSucceeded
      case `jobFailed` =>
        val exception = exceptionFromJson(json \ "Exception")
        new JobFailed(exception)
    }
  }

  def rddInfoFromJson(json: JValue): RDDInfo = {
    val rddId = (json \ "RDD ID").extract[Int]
    val name = (json \ "Name").extract[String]
    val scope = Utils.jsonOption(json \ "Scope")
      .map(_.extract[String])
      .map(RDDOperationScope.fromJson)
    val callsite = Utils.jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
    val parentIds = Utils.jsonOption(json \ "Parent IDs")
      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
      .getOrElse(Seq.empty)
    val storageLevel = storageLevelFromJson(json \ "Storage Level")
    val numPartitions = (json \ "Number of Partitions").extract[Int]
    val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
    val memSize = (json \ "Memory Size").extract[Long]
    val diskSize = (json \ "Disk Size").extract[Long]

    val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
    rddInfo.numCachedPartitions = numCachedPartitions
    rddInfo.memSize = memSize
    rddInfo.diskSize = diskSize
    rddInfo
  }

  def storageLevelFromJson(json: JValue): StorageLevel = {
    val useDisk = (json \ "Use Disk").extract[Boolean]
    val useMemory = (json \ "Use Memory").extract[Boolean]
    val deserialized = (json \ "Deserialized").extract[Boolean]
    val replication = (json \ "Replication").extract[Int]
    StorageLevel(useDisk, useMemory, deserialized, replication)
  }

  def blockStatusFromJson(json: JValue): BlockStatus = {
    val storageLevel = storageLevelFromJson(json \ "Storage Level")
    val memorySize = (json \ "Memory Size").extract[Long]
    val diskSize = (json \ "Disk Size").extract[Long]
    BlockStatus(storageLevel, memorySize, diskSize)
  }

  def executorInfoFromJson(json: JValue): ExecutorInfo = {
    val executorHost = (json \ "Host").extract[String]
    val totalCores = (json \ "Total Cores").extract[Int]
    val logUrls = mapFromJson(json \ "Log Urls").toMap
    new ExecutorInfo(executorHost, totalCores, logUrls)
  }

  /** -------------------------------- *
   * Util JSON deserialization methods |
   * --------------------------------- */

  def mapFromJson(json: JValue): Map[String, String] = {
    val jsonFields = json.asInstanceOf[JObject].obj
    jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
  }

  def propertiesFromJson(json: JValue): Properties = {
    Utils.jsonOption(json).map { value =>
      val properties = new Properties
      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
      properties
    }.getOrElse(null)
  }

  def UUIDFromJson(json: JValue): UUID = {
    val leastSignificantBits = (json \ "Least Significant Bits").extract[Long]
    val mostSignificantBits = (json \ "Most Significant Bits").extract[Long]
    new UUID(leastSignificantBits, mostSignificantBits)
  }

  def stackTraceFromJson(json: JValue): Array[StackTraceElement] = {
    json.extract[List[JValue]].map { line =>
      val declaringClass = (line \ "Declaring Class").extract[String]
      val methodName = (line \ "Method Name").extract[String]
      val fileName = (line \ "File Name").extract[String]
      val lineNumber = (line \ "Line Number").extract[Int]
      new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
    }.toArray
  }

  def exceptionFromJson(json: JValue): Exception = {
    val e = new Exception((json \ "Message").extract[String])
    e.setStackTrace(stackTraceFromJson(json \ "Stack Trace"))
    e
  }

}