1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.v2.app.speculate;
20 
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.atomic.AtomicLong;
24 
25 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
26 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
27 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
28 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
29 import org.apache.hadoop.mapreduce.v2.app.job.Job;
30 import org.apache.hadoop.mapreduce.v2.app.job.Task;
31 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
32 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
33 
34 
35 
36 
37 public class LegacyTaskRuntimeEstimator extends StartEndTimesBase {
38 
39   private final Map<TaskAttempt, AtomicLong> attemptRuntimeEstimates
40       = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
41   private final ConcurrentHashMap<TaskAttempt, AtomicLong> attemptRuntimeEstimateVariances
42       = new ConcurrentHashMap<TaskAttempt, AtomicLong>();
43 
44   @Override
updateAttempt(TaskAttemptStatus status, long timestamp)45   public void updateAttempt(TaskAttemptStatus status, long timestamp) {
46     super.updateAttempt(status, timestamp);
47 
48 
49     TaskAttemptId attemptID = status.id;
50     TaskId taskID = attemptID.getTaskId();
51     JobId jobID = taskID.getJobId();
52     Job job = context.getJob(jobID);
53 
54     if (job == null) {
55       return;
56     }
57 
58     Task task = job.getTask(taskID);
59 
60     if (task == null) {
61       return;
62     }
63 
64     TaskAttempt taskAttempt = task.getAttempt(attemptID);
65 
66     if (taskAttempt == null) {
67       return;
68     }
69 
70     Long boxedStart = startTimes.get(attemptID);
71     long start = boxedStart == null ? Long.MIN_VALUE : boxedStart;
72 
73     // We need to do two things.
74     //  1: If this is a completion, we accumulate statistics in the superclass
75     //  2: If this is not a completion, we learn more about it.
76 
77     // This is not a completion, but we're cooking.
78     //
79     if (taskAttempt.getState() == TaskAttemptState.RUNNING) {
80       // See if this task is already in the registry
81       AtomicLong estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
82       AtomicLong estimateVarianceContainer
83           = attemptRuntimeEstimateVariances.get(taskAttempt);
84 
85       if (estimateContainer == null) {
86         if (attemptRuntimeEstimates.get(taskAttempt) == null) {
87           attemptRuntimeEstimates.put(taskAttempt, new AtomicLong());
88 
89           estimateContainer = attemptRuntimeEstimates.get(taskAttempt);
90         }
91       }
92 
93       if (estimateVarianceContainer == null) {
94         attemptRuntimeEstimateVariances.putIfAbsent(taskAttempt, new AtomicLong());
95         estimateVarianceContainer = attemptRuntimeEstimateVariances.get(taskAttempt);
96       }
97 
98 
99       long estimate = -1;
100       long varianceEstimate = -1;
101 
102       // This code assumes that we'll never consider starting a third
103       //  speculative task attempt if two are already running for this task
104       if (start > 0 && timestamp > start) {
105         estimate = (long) ((timestamp - start) / Math.max(0.0001, status.progress));
106         varianceEstimate = (long) (estimate * status.progress / 10);
107       }
108       if (estimateContainer != null) {
109         estimateContainer.set(estimate);
110       }
111       if (estimateVarianceContainer != null) {
112         estimateVarianceContainer.set(varianceEstimate);
113       }
114     }
115   }
116 
storedPerAttemptValue(Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID)117   private long storedPerAttemptValue
118        (Map<TaskAttempt, AtomicLong> data, TaskAttemptId attemptID) {
119     TaskId taskID = attemptID.getTaskId();
120     JobId jobID = taskID.getJobId();
121     Job job = context.getJob(jobID);
122 
123     Task task = job.getTask(taskID);
124 
125     if (task == null) {
126       return -1L;
127     }
128 
129     TaskAttempt taskAttempt = task.getAttempt(attemptID);
130 
131     if (taskAttempt == null) {
132       return -1L;
133     }
134 
135     AtomicLong estimate = data.get(taskAttempt);
136 
137     return estimate == null ? -1L : estimate.get();
138 
139   }
140 
141   @Override
estimatedRuntime(TaskAttemptId attemptID)142   public long estimatedRuntime(TaskAttemptId attemptID) {
143     return storedPerAttemptValue(attemptRuntimeEstimates, attemptID);
144   }
145 
146   @Override
runtimeEstimateVariance(TaskAttemptId attemptID)147   public long runtimeEstimateVariance(TaskAttemptId attemptID) {
148     return storedPerAttemptValue(attemptRuntimeEstimateVariances, attemptID);
149   }
150 }
151