1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package org.apache.spark.util 19 20import java.util.{Properties, UUID} 21 22import scala.collection.JavaConverters._ 23import scala.collection.Map 24 25import com.fasterxml.jackson.databind.ObjectMapper 26import com.fasterxml.jackson.module.scala.DefaultScalaModule 27import org.json4s.DefaultFormats 28import org.json4s.JsonAST._ 29import org.json4s.JsonDSL._ 30import org.json4s.jackson.JsonMethods._ 31 32import org.apache.spark._ 33import org.apache.spark.executor._ 34import org.apache.spark.rdd.RDDOperationScope 35import org.apache.spark.scheduler._ 36import org.apache.spark.scheduler.cluster.ExecutorInfo 37import org.apache.spark.storage._ 38 39/** 40 * Serializes SparkListener events to/from JSON. This protocol provides strong backwards- 41 * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output 42 * written by any other version, including newer versions. 43 * 44 * JsonProtocolSuite contains backwards-compatibility tests which check that the current version of 45 * JsonProtocol is able to read output written by earlier versions. We do not currently have tests 46 * for reading newer JSON output with older Spark versions. 47 * 48 * To ensure that we provide these guarantees, follow these rules when modifying these methods: 49 * 50 * - Never delete any JSON fields. 51 * - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields 52 * in `*FromJson` methods. 53 */ 54private[spark] object JsonProtocol { 55 // TODO: Remove this file and put JSON serialization into each individual class. 56 57 private implicit val format = DefaultFormats 58 59 private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) 60 61 /** ------------------------------------------------- * 62 * JSON serialization methods for SparkListenerEvents | 63 * -------------------------------------------------- */ 64 65 def sparkEventToJson(event: SparkListenerEvent): JValue = { 66 event match { 67 case stageSubmitted: SparkListenerStageSubmitted => 68 stageSubmittedToJson(stageSubmitted) 69 case stageCompleted: SparkListenerStageCompleted => 70 stageCompletedToJson(stageCompleted) 71 case taskStart: SparkListenerTaskStart => 72 taskStartToJson(taskStart) 73 case taskGettingResult: SparkListenerTaskGettingResult => 74 taskGettingResultToJson(taskGettingResult) 75 case taskEnd: SparkListenerTaskEnd => 76 taskEndToJson(taskEnd) 77 case jobStart: SparkListenerJobStart => 78 jobStartToJson(jobStart) 79 case jobEnd: SparkListenerJobEnd => 80 jobEndToJson(jobEnd) 81 case environmentUpdate: SparkListenerEnvironmentUpdate => 82 environmentUpdateToJson(environmentUpdate) 83 case blockManagerAdded: SparkListenerBlockManagerAdded => 84 blockManagerAddedToJson(blockManagerAdded) 85 case blockManagerRemoved: SparkListenerBlockManagerRemoved => 86 blockManagerRemovedToJson(blockManagerRemoved) 87 case unpersistRDD: SparkListenerUnpersistRDD => 88 unpersistRDDToJson(unpersistRDD) 89 case applicationStart: SparkListenerApplicationStart => 90 applicationStartToJson(applicationStart) 91 case applicationEnd: SparkListenerApplicationEnd => 92 applicationEndToJson(applicationEnd) 93 case executorAdded: SparkListenerExecutorAdded => 94 executorAddedToJson(executorAdded) 95 case executorRemoved: SparkListenerExecutorRemoved => 96 executorRemovedToJson(executorRemoved) 97 case logStart: SparkListenerLogStart => 98 logStartToJson(logStart) 99 case metricsUpdate: SparkListenerExecutorMetricsUpdate => 100 executorMetricsUpdateToJson(metricsUpdate) 101 case blockUpdated: SparkListenerBlockUpdated => 102 throw new MatchError(blockUpdated) // TODO(ekl) implement this 103 case _ => parse(mapper.writeValueAsString(event)) 104 } 105 } 106 107 def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = { 108 val stageInfo = stageInfoToJson(stageSubmitted.stageInfo) 109 val properties = propertiesToJson(stageSubmitted.properties) 110 ("Event" -> Utils.getFormattedClassName(stageSubmitted)) ~ 111 ("Stage Info" -> stageInfo) ~ 112 ("Properties" -> properties) 113 } 114 115 def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = { 116 val stageInfo = stageInfoToJson(stageCompleted.stageInfo) 117 ("Event" -> Utils.getFormattedClassName(stageCompleted)) ~ 118 ("Stage Info" -> stageInfo) 119 } 120 121 def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = { 122 val taskInfo = taskStart.taskInfo 123 ("Event" -> Utils.getFormattedClassName(taskStart)) ~ 124 ("Stage ID" -> taskStart.stageId) ~ 125 ("Stage Attempt ID" -> taskStart.stageAttemptId) ~ 126 ("Task Info" -> taskInfoToJson(taskInfo)) 127 } 128 129 def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = { 130 val taskInfo = taskGettingResult.taskInfo 131 ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~ 132 ("Task Info" -> taskInfoToJson(taskInfo)) 133 } 134 135 def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = { 136 val taskEndReason = taskEndReasonToJson(taskEnd.reason) 137 val taskInfo = taskEnd.taskInfo 138 val taskMetrics = taskEnd.taskMetrics 139 val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing 140 ("Event" -> Utils.getFormattedClassName(taskEnd)) ~ 141 ("Stage ID" -> taskEnd.stageId) ~ 142 ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~ 143 ("Task Type" -> taskEnd.taskType) ~ 144 ("Task End Reason" -> taskEndReason) ~ 145 ("Task Info" -> taskInfoToJson(taskInfo)) ~ 146 ("Task Metrics" -> taskMetricsJson) 147 } 148 149 def jobStartToJson(jobStart: SparkListenerJobStart): JValue = { 150 val properties = propertiesToJson(jobStart.properties) 151 ("Event" -> Utils.getFormattedClassName(jobStart)) ~ 152 ("Job ID" -> jobStart.jobId) ~ 153 ("Submission Time" -> jobStart.time) ~ 154 ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~ // Added in Spark 1.2.0 155 ("Stage IDs" -> jobStart.stageIds) ~ 156 ("Properties" -> properties) 157 } 158 159 def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = { 160 val jobResult = jobResultToJson(jobEnd.jobResult) 161 ("Event" -> Utils.getFormattedClassName(jobEnd)) ~ 162 ("Job ID" -> jobEnd.jobId) ~ 163 ("Completion Time" -> jobEnd.time) ~ 164 ("Job Result" -> jobResult) 165 } 166 167 def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = { 168 val environmentDetails = environmentUpdate.environmentDetails 169 val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap) 170 val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap) 171 val systemProperties = mapToJson(environmentDetails("System Properties").toMap) 172 val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap) 173 ("Event" -> Utils.getFormattedClassName(environmentUpdate)) ~ 174 ("JVM Information" -> jvmInformation) ~ 175 ("Spark Properties" -> sparkProperties) ~ 176 ("System Properties" -> systemProperties) ~ 177 ("Classpath Entries" -> classpathEntries) 178 } 179 180 def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = { 181 val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId) 182 ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~ 183 ("Block Manager ID" -> blockManagerId) ~ 184 ("Maximum Memory" -> blockManagerAdded.maxMem) ~ 185 ("Timestamp" -> blockManagerAdded.time) 186 } 187 188 def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = { 189 val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId) 190 ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~ 191 ("Block Manager ID" -> blockManagerId) ~ 192 ("Timestamp" -> blockManagerRemoved.time) 193 } 194 195 def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = { 196 ("Event" -> Utils.getFormattedClassName(unpersistRDD)) ~ 197 ("RDD ID" -> unpersistRDD.rddId) 198 } 199 200 def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = { 201 ("Event" -> Utils.getFormattedClassName(applicationStart)) ~ 202 ("App Name" -> applicationStart.appName) ~ 203 ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~ 204 ("Timestamp" -> applicationStart.time) ~ 205 ("User" -> applicationStart.sparkUser) ~ 206 ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~ 207 ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) 208 } 209 210 def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = { 211 ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~ 212 ("Timestamp" -> applicationEnd.time) 213 } 214 215 def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = { 216 ("Event" -> Utils.getFormattedClassName(executorAdded)) ~ 217 ("Timestamp" -> executorAdded.time) ~ 218 ("Executor ID" -> executorAdded.executorId) ~ 219 ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo)) 220 } 221 222 def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = { 223 ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~ 224 ("Timestamp" -> executorRemoved.time) ~ 225 ("Executor ID" -> executorRemoved.executorId) ~ 226 ("Removed Reason" -> executorRemoved.reason) 227 } 228 229 def logStartToJson(logStart: SparkListenerLogStart): JValue = { 230 ("Event" -> Utils.getFormattedClassName(logStart)) ~ 231 ("Spark Version" -> SPARK_VERSION) 232 } 233 234 def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = { 235 val execId = metricsUpdate.execId 236 val accumUpdates = metricsUpdate.accumUpdates 237 ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~ 238 ("Executor ID" -> execId) ~ 239 ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => 240 ("Task ID" -> taskId) ~ 241 ("Stage ID" -> stageId) ~ 242 ("Stage Attempt ID" -> stageAttemptId) ~ 243 ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList)) 244 }) 245 } 246 247 /** ------------------------------------------------------------------- * 248 * JSON serialization methods for classes SparkListenerEvents depend on | 249 * -------------------------------------------------------------------- */ 250 251 def stageInfoToJson(stageInfo: StageInfo): JValue = { 252 val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList) 253 val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList) 254 val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing) 255 val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing) 256 val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing) 257 ("Stage ID" -> stageInfo.stageId) ~ 258 ("Stage Attempt ID" -> stageInfo.attemptId) ~ 259 ("Stage Name" -> stageInfo.name) ~ 260 ("Number of Tasks" -> stageInfo.numTasks) ~ 261 ("RDD Info" -> rddInfo) ~ 262 ("Parent IDs" -> parentIds) ~ 263 ("Details" -> stageInfo.details) ~ 264 ("Submission Time" -> submissionTime) ~ 265 ("Completion Time" -> completionTime) ~ 266 ("Failure Reason" -> failureReason) ~ 267 ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values)) 268 } 269 270 def taskInfoToJson(taskInfo: TaskInfo): JValue = { 271 ("Task ID" -> taskInfo.taskId) ~ 272 ("Index" -> taskInfo.index) ~ 273 ("Attempt" -> taskInfo.attemptNumber) ~ 274 ("Launch Time" -> taskInfo.launchTime) ~ 275 ("Executor ID" -> taskInfo.executorId) ~ 276 ("Host" -> taskInfo.host) ~ 277 ("Locality" -> taskInfo.taskLocality.toString) ~ 278 ("Speculative" -> taskInfo.speculative) ~ 279 ("Getting Result Time" -> taskInfo.gettingResultTime) ~ 280 ("Finish Time" -> taskInfo.finishTime) ~ 281 ("Failed" -> taskInfo.failed) ~ 282 ("Killed" -> taskInfo.killed) ~ 283 ("Accumulables" -> accumulablesToJson(taskInfo.accumulables)) 284 } 285 286 private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses") 287 288 def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = { 289 JArray(accumulables 290 .filterNot(_.name.exists(accumulableBlacklist.contains)) 291 .toList.map(accumulableInfoToJson)) 292 } 293 294 def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = { 295 val name = accumulableInfo.name 296 ("ID" -> accumulableInfo.id) ~ 297 ("Name" -> name) ~ 298 ("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~ 299 ("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~ 300 ("Internal" -> accumulableInfo.internal) ~ 301 ("Count Failed Values" -> accumulableInfo.countFailedValues) ~ 302 ("Metadata" -> accumulableInfo.metadata) 303 } 304 305 /** 306 * Serialize the value of an accumulator to JSON. 307 * 308 * For accumulators representing internal task metrics, this looks up the relevant 309 * [[AccumulatorParam]] to serialize the value accordingly. For all other accumulators, 310 * this will simply serialize the value as a string. 311 * 312 * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing. 313 */ 314 private[util] def accumValueToJson(name: Option[String], value: Any): JValue = { 315 if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) { 316 value match { 317 case v: Int => JInt(v) 318 case v: Long => JInt(v) 319 // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be 320 // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]` 321 case v => 322 JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map { 323 case (id, status) => 324 ("Block ID" -> id.toString) ~ 325 ("Status" -> blockStatusToJson(status)) 326 }) 327 } 328 } else { 329 // For all external accumulators, just use strings 330 JString(value.toString) 331 } 332 } 333 334 def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { 335 val shuffleReadMetrics: JValue = 336 ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~ 337 ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~ 338 ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~ 339 ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~ 340 ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~ 341 ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead) 342 val shuffleWriteMetrics: JValue = 343 ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~ 344 ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~ 345 ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten) 346 val inputMetrics: JValue = 347 ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~ 348 ("Records Read" -> taskMetrics.inputMetrics.recordsRead) 349 val outputMetrics: JValue = 350 ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~ 351 ("Records Written" -> taskMetrics.outputMetrics.recordsWritten) 352 val updatedBlocks = 353 JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) => 354 ("Block ID" -> id.toString) ~ 355 ("Status" -> blockStatusToJson(status)) 356 }) 357 ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~ 358 ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~ 359 ("Executor Run Time" -> taskMetrics.executorRunTime) ~ 360 ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~ 361 ("Result Size" -> taskMetrics.resultSize) ~ 362 ("JVM GC Time" -> taskMetrics.jvmGCTime) ~ 363 ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~ 364 ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~ 365 ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ 366 ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ 367 ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ 368 ("Input Metrics" -> inputMetrics) ~ 369 ("Output Metrics" -> outputMetrics) ~ 370 ("Updated Blocks" -> updatedBlocks) 371 } 372 373 def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { 374 val reason = Utils.getFormattedClassName(taskEndReason) 375 val json: JObject = taskEndReason match { 376 case fetchFailed: FetchFailed => 377 val blockManagerAddress = Option(fetchFailed.bmAddress). 378 map(blockManagerIdToJson).getOrElse(JNothing) 379 ("Block Manager Address" -> blockManagerAddress) ~ 380 ("Shuffle ID" -> fetchFailed.shuffleId) ~ 381 ("Map ID" -> fetchFailed.mapId) ~ 382 ("Reduce ID" -> fetchFailed.reduceId) ~ 383 ("Message" -> fetchFailed.message) 384 case exceptionFailure: ExceptionFailure => 385 val stackTrace = stackTraceToJson(exceptionFailure.stackTrace) 386 val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates) 387 ("Class Name" -> exceptionFailure.className) ~ 388 ("Description" -> exceptionFailure.description) ~ 389 ("Stack Trace" -> stackTrace) ~ 390 ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~ 391 ("Accumulator Updates" -> accumUpdates) 392 case taskCommitDenied: TaskCommitDenied => 393 ("Job ID" -> taskCommitDenied.jobID) ~ 394 ("Partition ID" -> taskCommitDenied.partitionID) ~ 395 ("Attempt Number" -> taskCommitDenied.attemptNumber) 396 case ExecutorLostFailure(executorId, exitCausedByApp, reason) => 397 ("Executor ID" -> executorId) ~ 398 ("Exit Caused By App" -> exitCausedByApp) ~ 399 ("Loss Reason" -> reason.map(_.toString)) 400 case _ => Utils.emptyJson 401 } 402 ("Reason" -> reason) ~ json 403 } 404 405 def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = { 406 ("Executor ID" -> blockManagerId.executorId) ~ 407 ("Host" -> blockManagerId.host) ~ 408 ("Port" -> blockManagerId.port) 409 } 410 411 def jobResultToJson(jobResult: JobResult): JValue = { 412 val result = Utils.getFormattedClassName(jobResult) 413 val json = jobResult match { 414 case JobSucceeded => Utils.emptyJson 415 case jobFailed: JobFailed => 416 JObject("Exception" -> exceptionToJson(jobFailed.exception)) 417 } 418 ("Result" -> result) ~ json 419 } 420 421 def rddInfoToJson(rddInfo: RDDInfo): JValue = { 422 val storageLevel = storageLevelToJson(rddInfo.storageLevel) 423 val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList) 424 ("RDD ID" -> rddInfo.id) ~ 425 ("Name" -> rddInfo.name) ~ 426 ("Scope" -> rddInfo.scope.map(_.toJson)) ~ 427 ("Callsite" -> rddInfo.callSite) ~ 428 ("Parent IDs" -> parentIds) ~ 429 ("Storage Level" -> storageLevel) ~ 430 ("Number of Partitions" -> rddInfo.numPartitions) ~ 431 ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~ 432 ("Memory Size" -> rddInfo.memSize) ~ 433 ("Disk Size" -> rddInfo.diskSize) 434 } 435 436 def storageLevelToJson(storageLevel: StorageLevel): JValue = { 437 ("Use Disk" -> storageLevel.useDisk) ~ 438 ("Use Memory" -> storageLevel.useMemory) ~ 439 ("Deserialized" -> storageLevel.deserialized) ~ 440 ("Replication" -> storageLevel.replication) 441 } 442 443 def blockStatusToJson(blockStatus: BlockStatus): JValue = { 444 val storageLevel = storageLevelToJson(blockStatus.storageLevel) 445 ("Storage Level" -> storageLevel) ~ 446 ("Memory Size" -> blockStatus.memSize) ~ 447 ("Disk Size" -> blockStatus.diskSize) 448 } 449 450 def executorInfoToJson(executorInfo: ExecutorInfo): JValue = { 451 ("Host" -> executorInfo.executorHost) ~ 452 ("Total Cores" -> executorInfo.totalCores) ~ 453 ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) 454 } 455 456 /** ------------------------------ * 457 * Util JSON serialization methods | 458 * ------------------------------- */ 459 460 def mapToJson(m: Map[String, String]): JValue = { 461 val jsonFields = m.map { case (k, v) => JField(k, JString(v)) } 462 JObject(jsonFields.toList) 463 } 464 465 def propertiesToJson(properties: Properties): JValue = { 466 Option(properties).map { p => 467 mapToJson(p.asScala) 468 }.getOrElse(JNothing) 469 } 470 471 def UUIDToJson(id: UUID): JValue = { 472 ("Least Significant Bits" -> id.getLeastSignificantBits) ~ 473 ("Most Significant Bits" -> id.getMostSignificantBits) 474 } 475 476 def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = { 477 JArray(stackTrace.map { case line => 478 ("Declaring Class" -> line.getClassName) ~ 479 ("Method Name" -> line.getMethodName) ~ 480 ("File Name" -> line.getFileName) ~ 481 ("Line Number" -> line.getLineNumber) 482 }.toList) 483 } 484 485 def exceptionToJson(exception: Exception): JValue = { 486 ("Message" -> exception.getMessage) ~ 487 ("Stack Trace" -> stackTraceToJson(exception.getStackTrace)) 488 } 489 490 491 /** --------------------------------------------------- * 492 * JSON deserialization methods for SparkListenerEvents | 493 * ---------------------------------------------------- */ 494 495 def sparkEventFromJson(json: JValue): SparkListenerEvent = { 496 val stageSubmitted = Utils.getFormattedClassName(SparkListenerStageSubmitted) 497 val stageCompleted = Utils.getFormattedClassName(SparkListenerStageCompleted) 498 val taskStart = Utils.getFormattedClassName(SparkListenerTaskStart) 499 val taskGettingResult = Utils.getFormattedClassName(SparkListenerTaskGettingResult) 500 val taskEnd = Utils.getFormattedClassName(SparkListenerTaskEnd) 501 val jobStart = Utils.getFormattedClassName(SparkListenerJobStart) 502 val jobEnd = Utils.getFormattedClassName(SparkListenerJobEnd) 503 val environmentUpdate = Utils.getFormattedClassName(SparkListenerEnvironmentUpdate) 504 val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded) 505 val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved) 506 val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD) 507 val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart) 508 val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd) 509 val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded) 510 val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved) 511 val logStart = Utils.getFormattedClassName(SparkListenerLogStart) 512 val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate) 513 514 (json \ "Event").extract[String] match { 515 case `stageSubmitted` => stageSubmittedFromJson(json) 516 case `stageCompleted` => stageCompletedFromJson(json) 517 case `taskStart` => taskStartFromJson(json) 518 case `taskGettingResult` => taskGettingResultFromJson(json) 519 case `taskEnd` => taskEndFromJson(json) 520 case `jobStart` => jobStartFromJson(json) 521 case `jobEnd` => jobEndFromJson(json) 522 case `environmentUpdate` => environmentUpdateFromJson(json) 523 case `blockManagerAdded` => blockManagerAddedFromJson(json) 524 case `blockManagerRemoved` => blockManagerRemovedFromJson(json) 525 case `unpersistRDD` => unpersistRDDFromJson(json) 526 case `applicationStart` => applicationStartFromJson(json) 527 case `applicationEnd` => applicationEndFromJson(json) 528 case `executorAdded` => executorAddedFromJson(json) 529 case `executorRemoved` => executorRemovedFromJson(json) 530 case `logStart` => logStartFromJson(json) 531 case `metricsUpdate` => executorMetricsUpdateFromJson(json) 532 case other => mapper.readValue(compact(render(json)), Utils.classForName(other)) 533 .asInstanceOf[SparkListenerEvent] 534 } 535 } 536 537 def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = { 538 val stageInfo = stageInfoFromJson(json \ "Stage Info") 539 val properties = propertiesFromJson(json \ "Properties") 540 SparkListenerStageSubmitted(stageInfo, properties) 541 } 542 543 def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = { 544 val stageInfo = stageInfoFromJson(json \ "Stage Info") 545 SparkListenerStageCompleted(stageInfo) 546 } 547 548 def taskStartFromJson(json: JValue): SparkListenerTaskStart = { 549 val stageId = (json \ "Stage ID").extract[Int] 550 val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) 551 val taskInfo = taskInfoFromJson(json \ "Task Info") 552 SparkListenerTaskStart(stageId, stageAttemptId, taskInfo) 553 } 554 555 def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = { 556 val taskInfo = taskInfoFromJson(json \ "Task Info") 557 SparkListenerTaskGettingResult(taskInfo) 558 } 559 560 def taskEndFromJson(json: JValue): SparkListenerTaskEnd = { 561 val stageId = (json \ "Stage ID").extract[Int] 562 val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) 563 val taskType = (json \ "Task Type").extract[String] 564 val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason") 565 val taskInfo = taskInfoFromJson(json \ "Task Info") 566 val taskMetrics = taskMetricsFromJson(json \ "Task Metrics") 567 SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo, taskMetrics) 568 } 569 570 def jobStartFromJson(json: JValue): SparkListenerJobStart = { 571 val jobId = (json \ "Job ID").extract[Int] 572 val submissionTime = 573 Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L) 574 val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int]) 575 val properties = propertiesFromJson(json \ "Properties") 576 // The "Stage Infos" field was added in Spark 1.2.0 577 val stageInfos = Utils.jsonOption(json \ "Stage Infos") 578 .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { 579 stageIds.map { id => 580 new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown") 581 } 582 } 583 SparkListenerJobStart(jobId, submissionTime, stageInfos, properties) 584 } 585 586 def jobEndFromJson(json: JValue): SparkListenerJobEnd = { 587 val jobId = (json \ "Job ID").extract[Int] 588 val completionTime = 589 Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L) 590 val jobResult = jobResultFromJson(json \ "Job Result") 591 SparkListenerJobEnd(jobId, completionTime, jobResult) 592 } 593 594 def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = { 595 val environmentDetails = Map[String, Seq[(String, String)]]( 596 "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq, 597 "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq, 598 "System Properties" -> mapFromJson(json \ "System Properties").toSeq, 599 "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq) 600 SparkListenerEnvironmentUpdate(environmentDetails) 601 } 602 603 def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = { 604 val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") 605 val maxMem = (json \ "Maximum Memory").extract[Long] 606 val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) 607 SparkListenerBlockManagerAdded(time, blockManagerId, maxMem) 608 } 609 610 def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = { 611 val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID") 612 val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L) 613 SparkListenerBlockManagerRemoved(time, blockManagerId) 614 } 615 616 def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = { 617 SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int]) 618 } 619 620 def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = { 621 val appName = (json \ "App Name").extract[String] 622 val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String]) 623 val time = (json \ "Timestamp").extract[Long] 624 val sparkUser = (json \ "User").extract[String] 625 val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String]) 626 val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson) 627 SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs) 628 } 629 630 def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = { 631 SparkListenerApplicationEnd((json \ "Timestamp").extract[Long]) 632 } 633 634 def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = { 635 val time = (json \ "Timestamp").extract[Long] 636 val executorId = (json \ "Executor ID").extract[String] 637 val executorInfo = executorInfoFromJson(json \ "Executor Info") 638 SparkListenerExecutorAdded(time, executorId, executorInfo) 639 } 640 641 def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = { 642 val time = (json \ "Timestamp").extract[Long] 643 val executorId = (json \ "Executor ID").extract[String] 644 val reason = (json \ "Removed Reason").extract[String] 645 SparkListenerExecutorRemoved(time, executorId, reason) 646 } 647 648 def logStartFromJson(json: JValue): SparkListenerLogStart = { 649 val sparkVersion = (json \ "Spark Version").extract[String] 650 SparkListenerLogStart(sparkVersion) 651 } 652 653 def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = { 654 val execInfo = (json \ "Executor ID").extract[String] 655 val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json => 656 val taskId = (json \ "Task ID").extract[Long] 657 val stageId = (json \ "Stage ID").extract[Int] 658 val stageAttemptId = (json \ "Stage Attempt ID").extract[Int] 659 val updates = 660 (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson) 661 (taskId, stageId, stageAttemptId, updates) 662 } 663 SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates) 664 } 665 666 /** --------------------------------------------------------------------- * 667 * JSON deserialization methods for classes SparkListenerEvents depend on | 668 * ---------------------------------------------------------------------- */ 669 670 def stageInfoFromJson(json: JValue): StageInfo = { 671 val stageId = (json \ "Stage ID").extract[Int] 672 val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0) 673 val stageName = (json \ "Stage Name").extract[String] 674 val numTasks = (json \ "Number of Tasks").extract[Int] 675 val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson) 676 val parentIds = Utils.jsonOption(json \ "Parent IDs") 677 .map { l => l.extract[List[JValue]].map(_.extract[Int]) } 678 .getOrElse(Seq.empty) 679 val details = (json \ "Details").extractOpt[String].getOrElse("") 680 val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long]) 681 val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long]) 682 val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String]) 683 val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match { 684 case Some(values) => values.map(accumulableInfoFromJson) 685 case None => Seq[AccumulableInfo]() 686 } 687 688 val stageInfo = new StageInfo( 689 stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details) 690 stageInfo.submissionTime = submissionTime 691 stageInfo.completionTime = completionTime 692 stageInfo.failureReason = failureReason 693 for (accInfo <- accumulatedValues) { 694 stageInfo.accumulables(accInfo.id) = accInfo 695 } 696 stageInfo 697 } 698 699 def taskInfoFromJson(json: JValue): TaskInfo = { 700 val taskId = (json \ "Task ID").extract[Long] 701 val index = (json \ "Index").extract[Int] 702 val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1) 703 val launchTime = (json \ "Launch Time").extract[Long] 704 val executorId = (json \ "Executor ID").extract[String] 705 val host = (json \ "Host").extract[String] 706 val taskLocality = TaskLocality.withName((json \ "Locality").extract[String]) 707 val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false) 708 val gettingResultTime = (json \ "Getting Result Time").extract[Long] 709 val finishTime = (json \ "Finish Time").extract[Long] 710 val failed = (json \ "Failed").extract[Boolean] 711 val killed = (json \ "Killed").extractOpt[Boolean].getOrElse(false) 712 val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match { 713 case Some(values) => values.map(accumulableInfoFromJson) 714 case None => Seq[AccumulableInfo]() 715 } 716 717 val taskInfo = 718 new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative) 719 taskInfo.gettingResultTime = gettingResultTime 720 taskInfo.finishTime = finishTime 721 taskInfo.failed = failed 722 taskInfo.killed = killed 723 accumulables.foreach { taskInfo.accumulables += _ } 724 taskInfo 725 } 726 727 def accumulableInfoFromJson(json: JValue): AccumulableInfo = { 728 val id = (json \ "ID").extract[Long] 729 val name = (json \ "Name").extractOpt[String] 730 val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) } 731 val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) } 732 val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) 733 val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false) 734 val metadata = (json \ "Metadata").extractOpt[String] 735 new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata) 736 } 737 738 /** 739 * Deserialize the value of an accumulator from JSON. 740 * 741 * For accumulators representing internal task metrics, this looks up the relevant 742 * [[AccumulatorParam]] to deserialize the value accordingly. For all other 743 * accumulators, this will simply deserialize the value as a string. 744 * 745 * The behavior here must match that of [[accumValueToJson]]. Exposed for testing. 746 */ 747 private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = { 748 if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) { 749 value match { 750 case JInt(v) => v.toLong 751 case JArray(v) => 752 v.map { blockJson => 753 val id = BlockId((blockJson \ "Block ID").extract[String]) 754 val status = blockStatusFromJson(blockJson \ "Status") 755 (id, status) 756 }.asJava 757 case _ => throw new IllegalArgumentException(s"unexpected json value $value for " + 758 "accumulator " + name.get) 759 } 760 } else { 761 value.extract[String] 762 } 763 } 764 765 def taskMetricsFromJson(json: JValue): TaskMetrics = { 766 val metrics = TaskMetrics.empty 767 if (json == JNothing) { 768 return metrics 769 } 770 metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) 771 metrics.setExecutorDeserializeCpuTime((json \ "Executor Deserialize CPU Time") match { 772 case JNothing => 0 773 case x => x.extract[Long] 774 }) 775 metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long]) 776 metrics.setExecutorCpuTime((json \ "Executor CPU Time") match { 777 case JNothing => 0 778 case x => x.extract[Long] 779 }) 780 metrics.setResultSize((json \ "Result Size").extract[Long]) 781 metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long]) 782 metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long]) 783 metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) 784 metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) 785 786 // Shuffle read metrics 787 Utils.jsonOption(json \ "Shuffle Read Metrics").foreach { readJson => 788 val readMetrics = metrics.createTempShuffleReadMetrics() 789 readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int]) 790 readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int]) 791 readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long]) 792 readMetrics.incLocalBytesRead((readJson \ "Local Bytes Read").extractOpt[Long].getOrElse(0L)) 793 readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long]) 794 readMetrics.incRecordsRead((readJson \ "Total Records Read").extractOpt[Long].getOrElse(0L)) 795 metrics.mergeShuffleReadMetrics() 796 } 797 798 // Shuffle write metrics 799 // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes. 800 Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson => 801 val writeMetrics = metrics.shuffleWriteMetrics 802 writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long]) 803 writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written") 804 .extractOpt[Long].getOrElse(0L)) 805 writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long]) 806 } 807 808 // Output metrics 809 Utils.jsonOption(json \ "Output Metrics").foreach { outJson => 810 val outputMetrics = metrics.outputMetrics 811 outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long]) 812 outputMetrics.setRecordsWritten((outJson \ "Records Written").extractOpt[Long].getOrElse(0L)) 813 } 814 815 // Input metrics 816 Utils.jsonOption(json \ "Input Metrics").foreach { inJson => 817 val inputMetrics = metrics.inputMetrics 818 inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long]) 819 inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L)) 820 } 821 822 // Updated blocks 823 Utils.jsonOption(json \ "Updated Blocks").foreach { blocksJson => 824 metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson => 825 val id = BlockId((blockJson \ "Block ID").extract[String]) 826 val status = blockStatusFromJson(blockJson \ "Status") 827 (id, status) 828 }) 829 } 830 831 metrics 832 } 833 834 def taskEndReasonFromJson(json: JValue): TaskEndReason = { 835 val success = Utils.getFormattedClassName(Success) 836 val resubmitted = Utils.getFormattedClassName(Resubmitted) 837 val fetchFailed = Utils.getFormattedClassName(FetchFailed) 838 val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure) 839 val taskResultLost = Utils.getFormattedClassName(TaskResultLost) 840 val taskKilled = Utils.getFormattedClassName(TaskKilled) 841 val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied) 842 val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure) 843 val unknownReason = Utils.getFormattedClassName(UnknownReason) 844 845 (json \ "Reason").extract[String] match { 846 case `success` => Success 847 case `resubmitted` => Resubmitted 848 case `fetchFailed` => 849 val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address") 850 val shuffleId = (json \ "Shuffle ID").extract[Int] 851 val mapId = (json \ "Map ID").extract[Int] 852 val reduceId = (json \ "Reduce ID").extract[Int] 853 val message = Utils.jsonOption(json \ "Message").map(_.extract[String]) 854 new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId, 855 message.getOrElse("Unknown reason")) 856 case `exceptionFailure` => 857 val className = (json \ "Class Name").extract[String] 858 val description = (json \ "Description").extract[String] 859 val stackTrace = stackTraceFromJson(json \ "Stack Trace") 860 val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull 861 // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x 862 val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates") 863 .map(_.extract[List[JValue]].map(accumulableInfoFromJson)) 864 .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => { 865 acc.toInfo(Some(acc.value), None) 866 })) 867 ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates) 868 case `taskResultLost` => TaskResultLost 869 case `taskKilled` => TaskKilled 870 case `taskCommitDenied` => 871 // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON 872 // de/serialization logic was not added until 1.5.1. To provide backward compatibility 873 // for reading those logs, we need to provide default values for all the fields. 874 val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1) 875 val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1) 876 val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1) 877 TaskCommitDenied(jobId, partitionId, attemptNo) 878 case `executorLostFailure` => 879 val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean]) 880 val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String]) 881 val reason = Utils.jsonOption(json \ "Loss Reason").map(_.extract[String]) 882 ExecutorLostFailure( 883 executorId.getOrElse("Unknown"), 884 exitCausedByApp.getOrElse(true), 885 reason) 886 case `unknownReason` => UnknownReason 887 } 888 } 889 890 def blockManagerIdFromJson(json: JValue): BlockManagerId = { 891 // On metadata fetch fail, block manager ID can be null (SPARK-4471) 892 if (json == JNothing) { 893 return null 894 } 895 val executorId = (json \ "Executor ID").extract[String] 896 val host = (json \ "Host").extract[String] 897 val port = (json \ "Port").extract[Int] 898 BlockManagerId(executorId, host, port) 899 } 900 901 def jobResultFromJson(json: JValue): JobResult = { 902 val jobSucceeded = Utils.getFormattedClassName(JobSucceeded) 903 val jobFailed = Utils.getFormattedClassName(JobFailed) 904 905 (json \ "Result").extract[String] match { 906 case `jobSucceeded` => JobSucceeded 907 case `jobFailed` => 908 val exception = exceptionFromJson(json \ "Exception") 909 new JobFailed(exception) 910 } 911 } 912 913 def rddInfoFromJson(json: JValue): RDDInfo = { 914 val rddId = (json \ "RDD ID").extract[Int] 915 val name = (json \ "Name").extract[String] 916 val scope = Utils.jsonOption(json \ "Scope") 917 .map(_.extract[String]) 918 .map(RDDOperationScope.fromJson) 919 val callsite = Utils.jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("") 920 val parentIds = Utils.jsonOption(json \ "Parent IDs") 921 .map { l => l.extract[List[JValue]].map(_.extract[Int]) } 922 .getOrElse(Seq.empty) 923 val storageLevel = storageLevelFromJson(json \ "Storage Level") 924 val numPartitions = (json \ "Number of Partitions").extract[Int] 925 val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] 926 val memSize = (json \ "Memory Size").extract[Long] 927 val diskSize = (json \ "Disk Size").extract[Long] 928 929 val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope) 930 rddInfo.numCachedPartitions = numCachedPartitions 931 rddInfo.memSize = memSize 932 rddInfo.diskSize = diskSize 933 rddInfo 934 } 935 936 def storageLevelFromJson(json: JValue): StorageLevel = { 937 val useDisk = (json \ "Use Disk").extract[Boolean] 938 val useMemory = (json \ "Use Memory").extract[Boolean] 939 val deserialized = (json \ "Deserialized").extract[Boolean] 940 val replication = (json \ "Replication").extract[Int] 941 StorageLevel(useDisk, useMemory, deserialized, replication) 942 } 943 944 def blockStatusFromJson(json: JValue): BlockStatus = { 945 val storageLevel = storageLevelFromJson(json \ "Storage Level") 946 val memorySize = (json \ "Memory Size").extract[Long] 947 val diskSize = (json \ "Disk Size").extract[Long] 948 BlockStatus(storageLevel, memorySize, diskSize) 949 } 950 951 def executorInfoFromJson(json: JValue): ExecutorInfo = { 952 val executorHost = (json \ "Host").extract[String] 953 val totalCores = (json \ "Total Cores").extract[Int] 954 val logUrls = mapFromJson(json \ "Log Urls").toMap 955 new ExecutorInfo(executorHost, totalCores, logUrls) 956 } 957 958 /** -------------------------------- * 959 * Util JSON deserialization methods | 960 * --------------------------------- */ 961 962 def mapFromJson(json: JValue): Map[String, String] = { 963 val jsonFields = json.asInstanceOf[JObject].obj 964 jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap 965 } 966 967 def propertiesFromJson(json: JValue): Properties = { 968 Utils.jsonOption(json).map { value => 969 val properties = new Properties 970 mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) } 971 properties 972 }.getOrElse(null) 973 } 974 975 def UUIDFromJson(json: JValue): UUID = { 976 val leastSignificantBits = (json \ "Least Significant Bits").extract[Long] 977 val mostSignificantBits = (json \ "Most Significant Bits").extract[Long] 978 new UUID(leastSignificantBits, mostSignificantBits) 979 } 980 981 def stackTraceFromJson(json: JValue): Array[StackTraceElement] = { 982 json.extract[List[JValue]].map { line => 983 val declaringClass = (line \ "Declaring Class").extract[String] 984 val methodName = (line \ "Method Name").extract[String] 985 val fileName = (line \ "File Name").extract[String] 986 val lineNumber = (line \ "Line Number").extract[Int] 987 new StackTraceElement(declaringClass, methodName, fileName, lineNumber) 988 }.toArray 989 } 990 991 def exceptionFromJson(json: JValue): Exception = { 992 val e = new Exception((json \ "Message").extract[String]) 993 e.setStackTrace(stackTraceFromJson(json \ "Stack Trace")) 994 e 995 } 996 997} 998