1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.v2.hs;
20 
21 import java.util.ArrayList;
22 import java.util.List;
23 
24 import org.apache.hadoop.mapreduce.Counters;
25 import org.apache.hadoop.mapreduce.TypeConverter;
26 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
27 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
28 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
29 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
30 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
31 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
32 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
33 import org.apache.hadoop.yarn.api.records.ContainerId;
34 import org.apache.hadoop.yarn.api.records.NodeId;
35 import org.apache.hadoop.yarn.util.Records;
36 
37 public class CompletedTaskAttempt implements TaskAttempt {
38 
39   private final TaskAttemptInfo attemptInfo;
40   private final TaskAttemptId attemptId;
41   private final TaskAttemptState state;
42   private final List<String> diagnostics = new ArrayList<String>();
43   private TaskAttemptReport report;
44 
45   private String localDiagMessage;
46 
CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo)47   CompletedTaskAttempt(TaskId taskId, TaskAttemptInfo attemptInfo) {
48     this.attemptInfo = attemptInfo;
49     this.attemptId = TypeConverter.toYarn(attemptInfo.getAttemptId());
50     if (attemptInfo.getTaskStatus() != null) {
51       this.state = TaskAttemptState.valueOf(attemptInfo.getTaskStatus());
52     } else {
53       this.state = TaskAttemptState.KILLED;
54       localDiagMessage = "Attmpt state missing from History : marked as KILLED";
55       diagnostics.add(localDiagMessage);
56     }
57     if (attemptInfo.getError() != null) {
58       diagnostics.add(attemptInfo.getError());
59     }
60   }
61 
62   @Override
getNodeId()63   public NodeId getNodeId() throws UnsupportedOperationException{
64     throw new UnsupportedOperationException();
65   }
66 
67   @Override
getAssignedContainerID()68   public ContainerId getAssignedContainerID() {
69     return attemptInfo.getContainerId();
70   }
71 
72   @Override
getAssignedContainerMgrAddress()73   public String getAssignedContainerMgrAddress() {
74     return attemptInfo.getHostname() + ":" + attemptInfo.getPort();
75   }
76 
77   @Override
getNodeHttpAddress()78   public String getNodeHttpAddress() {
79     return attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort();
80   }
81 
82   @Override
getNodeRackName()83   public String getNodeRackName() {
84     return attemptInfo.getRackname();
85   }
86 
87   @Override
getCounters()88   public Counters getCounters() {
89     return attemptInfo.getCounters();
90   }
91 
92   @Override
getID()93   public TaskAttemptId getID() {
94     return attemptId;
95   }
96 
97   @Override
getProgress()98   public float getProgress() {
99     return 1.0f;
100   }
101 
102   @Override
getReport()103   public synchronized TaskAttemptReport getReport() {
104     if (report == null) {
105       constructTaskAttemptReport();
106     }
107     return report;
108   }
109 
110   @Override
getPhase()111   public Phase getPhase() {
112     return Phase.CLEANUP;
113   }
114 
115   @Override
getState()116   public TaskAttemptState getState() {
117     return state;
118   }
119 
120   @Override
isFinished()121   public boolean isFinished() {
122     return true;
123   }
124 
125   @Override
getDiagnostics()126   public List<String> getDiagnostics() {
127     return diagnostics;
128   }
129 
130   @Override
getLaunchTime()131   public long getLaunchTime() {
132     return attemptInfo.getStartTime();
133   }
134 
135   @Override
getFinishTime()136   public long getFinishTime() {
137     return attemptInfo.getFinishTime();
138   }
139 
140   @Override
getShuffleFinishTime()141   public long getShuffleFinishTime() {
142     return attemptInfo.getShuffleFinishTime();
143   }
144 
145   @Override
getSortFinishTime()146   public long getSortFinishTime() {
147     return attemptInfo.getSortFinishTime();
148   }
149 
150   @Override
getShufflePort()151   public int getShufflePort() {
152     return attemptInfo.getShufflePort();
153   }
154 
constructTaskAttemptReport()155   private void constructTaskAttemptReport() {
156     report = Records.newRecord(TaskAttemptReport.class);
157 
158     report.setTaskAttemptId(attemptId);
159     report.setTaskAttemptState(state);
160     report.setProgress(getProgress());
161     report.setStartTime(attemptInfo.getStartTime());
162     report.setFinishTime(attemptInfo.getFinishTime());
163     report.setShuffleFinishTime(attemptInfo.getShuffleFinishTime());
164     report.setSortFinishTime(attemptInfo.getSortFinishTime());
165     if (localDiagMessage != null) {
166       report
167           .setDiagnosticInfo(attemptInfo.getError() + ", " + localDiagMessage);
168     } else {
169       report.setDiagnosticInfo(attemptInfo.getError());
170     }
171     // report.setPhase(attemptInfo.get); //TODO
172     report.setStateString(attemptInfo.getState());
173     report.setCounters(TypeConverter.toYarn(getCounters()));
174     report.setContainerId(attemptInfo.getContainerId());
175     if (attemptInfo.getHostname() == null) {
176       report.setNodeManagerHost("UNKNOWN");
177     } else {
178       report.setNodeManagerHost(attemptInfo.getHostname());
179       report.setNodeManagerPort(attemptInfo.getPort());
180     }
181     report.setNodeManagerHttpPort(attemptInfo.getHttpPort());
182   }
183 }
184