1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.v2.hs; 20 21 import java.util.ArrayList; 22 import java.util.List; 23 24 import org.apache.hadoop.mapreduce.Counters; 25 import org.apache.hadoop.mapreduce.TypeConverter; 26 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; 27 import org.apache.hadoop.mapreduce.v2.api.records.Phase; 28 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; 29 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport; 30 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; 31 import org.apache.hadoop.mapreduce.v2.api.records.TaskId; 32 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; 33 import org.apache.hadoop.yarn.api.records.ContainerId; 34 import org.apache.hadoop.yarn.api.records.NodeId; 35 import org.apache.hadoop.yarn.util.Records; 36 37 public class CompletedTaskAttempt implements TaskAttempt { 38 39 private final TaskAttemptInfo attemptInfo; 40 private final TaskAttemptId attemptId; 41 private final TaskAttemptState state; 42 private final List<String> diagnostics = new ArrayList<String>(); 43 private TaskAttemptReport report; 44 45 private String localDiagMessage; 46 CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo)47 CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) { 48 this.attemptInfo = attemptInfo; 49 this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId()); 50 if (attemptInfo.getTaskStatus() != null) { 51 this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus()); 52 } else { 53 this.state = TaskAttemptState.KILLED; 54 localDiagMessage = "Attmpt state missing from History : marked as KILLED"; 55 diagnostics.add(localDiagMessage); 56 } 57 if (attemptInfo.getError() != null) { 58 diagnostics.add(attemptInfo.getError()); 59 } 60 } 61 62 @Override getNodeId()63 public NodeId getNodeId() throws UnsupportedOperationException{ 64 throw new UnsupportedOperationException(); 65 } 66 67 @Override getAssignedContainerID()68 public ContainerId getAssignedContainerID() { 69 return attemptInfo.getContainerId(); 70 } 71 72 @Override getAssignedContainerMgrAddress()73 public String getAssignedContainerMgrAddress() { 74 return attemptInfo.getHostname() + ":" + attemptInfo.getPort(); 75 } 76 77 @Override getNodeHttpAddress()78 public String getNodeHttpAddress() { 79 return attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(); 80 } 81 82 @Override getNodeRackName()83 public String getNodeRackName() { 84 return attemptInfo.getRackname(); 85 } 86 87 @Override getCounters()88 public Counters getCounters() { 89 return attemptInfo.getCounters(); 90 } 91 92 @Override getID()93 public TaskAttemptId getID() { 94 return attemptId; 95 } 96 97 @Override getProgress()98 public float getProgress() { 99 return 1.0f; 100 } 101 102 @Override getReport()103 public synchronized TaskAttemptReport getReport() { 104 if (report == null) { 105 constructTaskAttemptReport(); 106 } 107 return report; 108 } 109 110 @Override getPhase()111 public Phase getPhase() { 112 return Phase.CLEANUP; 113 } 114 115 @Override getState()116 public TaskAttemptState getState() { 117 return state; 118 } 119 120 @Override isFinished()121 public boolean isFinished() { 122 return true; 123 } 124 125 @Override getDiagnostics()126 public List<String> getDiagnostics() { 127 return diagnostics; 128 } 129 130 @Override getLaunchTime()131 public long getLaunchTime() { 132 return attemptInfo.getStartTime(); 133 } 134 135 @Override getFinishTime()136 public long getFinishTime() { 137 return attemptInfo.getFinishTime(); 138 } 139 140 @Override getShuffleFinishTime()141 public long getShuffleFinishTime() { 142 return attemptInfo.getShuffleFinishTime(); 143 } 144 145 @Override getSortFinishTime()146 public long getSortFinishTime() { 147 return attemptInfo.getSortFinishTime(); 148 } 149 150 @Override getShufflePort()151 public int getShufflePort() { 152 return attemptInfo.getShufflePort(); 153 } 154 constructTaskAttemptReport()155 private void constructTaskAttemptReport() { 156 report = Records.newRecord(TaskAttemptReport.class); 157 158 report.setTaskAttemptId(attemptId); 159 report.setTaskAttemptState(state); 160 report.setProgress(getProgress()); 161 report.setStartTime(attemptInfo.getStartTime()); 162 report.setFinishTime(attemptInfo.getFinishTime()); 163 report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime()); 164 report.setSortFinishTime(attemptInfo.getSortFinishTime()); 165 if (localDiagMessage != null) { 166 report 167 .setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage); 168 } else { 169 report.setDiagnosticInfo(attemptInfo.getError()); 170 } 171 // report.setPhase(attemptInfo.get); //TODO 172 report.setStateString(attemptInfo.getState()); 173 report.setCounters(TypeConverter.toYarn(getCounters())); 174 report.setContainerId(attemptInfo.getContainerId()); 175 if (attemptInfo.getHostname() == null) { 176 report.setNodeManagerHost("UNKNOWN"); 177 } else { 178 report.setNodeManagerHost(attemptInfo.getHostname()); 179 report.setNodeManagerPort(attemptInfo.getPort()); 180 } 181 report.setNodeManagerHttpPort(attemptInfo.getHttpPort()); 182 } 183 } 184