/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import scala.collection.mutable.ListBuffer

import org.apache.spark.TaskState
import org.apache.spark.TaskState.TaskState
import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Information about a running task attempt inside a TaskSet.
 *
 * `index` and `attemptNumber` together form the human-readable [[id]]. `launchTime` is the
 * baseline from which [[duration]] and `timeRunning` are computed, so it must use the same clock
 * and units as the times passed to `markFinished` (which defaults to
 * `System.currentTimeMillis()`).
 *
 * A value of 0 in `gettingResultTime` or `finishTime` means "not set yet"; the boolean accessors
 * below are derived from that convention.
 */
@DeveloperApi
class TaskInfo(
    val taskId: Long,
    /**
     * The index of this task within its task set. Not necessarily the same as the ID of the RDD
     * partition that the task is computing.
     */
    val index: Int,
    val attemptNumber: Int,
    val launchTime: Long,
    val executorId: String,
    val host: String,
    val taskLocality: TaskLocality.TaskLocality,
    val speculative: Boolean) {

  /**
   * The time when the task started remotely getting the result. Will not be set if the
   * task result was sent immediately when the task finished (as opposed to sending an
   * IndirectTaskResult and later fetching the result from the block manager).
   */
  var gettingResultTime: Long = 0

  /**
   * Intermediate updates to accumulables during this task. Note that it is valid for the same
   * accumulable to be updated multiple times in a single task or for two accumulables with the
   * same name but different IDs to exist in a task.
   */
  val accumulables: ListBuffer[AccumulableInfo] = ListBuffer[AccumulableInfo]()

  /**
   * The time when the task finished, whatever its final state (including the time to remotely
   * fetch results, if necessary). Remains 0 until `markFinished` is called.
   */
  var finishTime: Long = 0

  /** Set to true by `markFinished` when the reported state is `TaskState.FAILED`. */
  var failed: Boolean = false

  /** Set to true by `markFinished` when the reported state is `TaskState.KILLED`. */
  var killed: Boolean = false

  /**
   * Records the moment the task started fetching its result.
   *
   * @param time timestamp to record; defaults to the current wall-clock time in milliseconds
   */
  private[spark] def markGettingResult(time: Long = System.currentTimeMillis()): Unit = {
    gettingResultTime = time
  }

  /**
   * Records the finish time of the task and flags it as failed or killed when applicable.
   * Any other state leaves both flags untouched.
   *
   * @param state terminal state reported for the task
   * @param time  timestamp to record; defaults to the current wall-clock time in milliseconds
   */
  private[spark] def markFinished(
      state: TaskState,
      time: Long = System.currentTimeMillis()): Unit = {
    finishTime = time
    if (state == TaskState.FAILED) {
      failed = true
    } else if (state == TaskState.KILLED) {
      killed = true
    }
  }

  /** True once a non-zero `gettingResultTime` has been recorded. */
  def gettingResult: Boolean = gettingResultTime != 0

  /** True once a non-zero `finishTime` has been recorded. */
  def finished: Boolean = finishTime != 0

  /** True when the task has finished and was neither failed nor killed. */
  def successful: Boolean = finished && !failed && !killed

  /** True while no finish time has been recorded. */
  def running: Boolean = !finished

  /**
   * Human-readable status of the task: one of "GET RESULT", "RUNNING", "FAILED", "KILLED",
   * "SUCCESS" or "UNKNOWN".
   */
  def status: String = {
    if (running) {
      if (gettingResult) {
        "GET RESULT"
      } else {
        "RUNNING"
      }
    } else if (failed) {
      "FAILED"
    } else if (killed) {
      "KILLED"
    } else if (successful) {
      "SUCCESS"
    } else {
      // Defensive fallback: with a consistent view of the flags, a finished task that is neither
      // failed nor killed is always `successful`, so this branch is not expected to be taken.
      "UNKNOWN"
    }
  }

  /** Identifier of the form `index.attemptNumber`. */
  def id: String = s"$index.$attemptNumber"

  /**
   * Elapsed time between launch and finish.
   *
   * @throws UnsupportedOperationException if the task has not finished yet
   */
  def duration: Long = {
    if (!finished) {
      throw new UnsupportedOperationException("duration() called on unfinished task")
    } else {
      finishTime - launchTime
    }
  }

  /** Elapsed time between launch and the supplied `currentTime`. */
  private[spark] def timeRunning(currentTime: Long): Long = currentTime - launchTime
}