1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.mapreduce.v2.app.speculate;
20 
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.atomic.AtomicReference;
24 
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.mapreduce.MRJobConfig;
27 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
28 import org.apache.hadoop.mapreduce.v2.app.AppContext;
29 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
30 
31 /**
32  * This estimator exponentially smooths the rate of progress versus wallclock
33  * time.  Conceivably we could write an estimator that smooths time per
34  * unit progress, and get different results.
35  */
36 public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase {
37 
38   private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates
39       = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>();
40 
41   private SmoothedValue smoothedValue;
42 
43   private long lambda;
44 
45   public enum SmoothedValue {
46     RATE, TIME_PER_UNIT_PROGRESS
47   }
48 
ExponentiallySmoothedTaskRuntimeEstimator(long lambda, SmoothedValue smoothedValue)49   ExponentiallySmoothedTaskRuntimeEstimator
50       (long lambda, SmoothedValue smoothedValue) {
51     super();
52     this.smoothedValue = smoothedValue;
53     this.lambda = lambda;
54   }
55 
ExponentiallySmoothedTaskRuntimeEstimator()56   public ExponentiallySmoothedTaskRuntimeEstimator() {
57     super();
58   }
59 
60   // immutable
61   private class EstimateVector {
62     final double value;
63     final float basedOnProgress;
64     final long atTime;
65 
EstimateVector(double value, float basedOnProgress, long atTime)66     EstimateVector(double value, float basedOnProgress, long atTime) {
67       this.value = value;
68       this.basedOnProgress = basedOnProgress;
69       this.atTime = atTime;
70     }
71 
incorporate(float newProgress, long newAtTime)72     EstimateVector incorporate(float newProgress, long newAtTime) {
73       if (newAtTime <= atTime || newProgress < basedOnProgress) {
74         return this;
75       }
76 
77       double oldWeighting
78           = value < 0.0
79               ? 0.0 : Math.exp(((double) (newAtTime - atTime)) / lambda);
80 
81       double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime);
82 
83       if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) {
84         newRead = 1.0 / newRead;
85       }
86 
87       return new EstimateVector
88           (value * oldWeighting + newRead * (1.0 - oldWeighting),
89            newProgress, newAtTime);
90     }
91   }
92 
93   private void incorporateReading
94       (TaskAttemptId attemptID, float newProgress, long newTime) {
95     //TODO: Refactor this method, it seems more complicated than necessary.
96     AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
97 
98     if (vectorRef == null) {
99       estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null));
100       incorporateReading(attemptID, newProgress, newTime);
101       return;
102     }
103 
104     EstimateVector oldVector = vectorRef.get();
105 
106     if (oldVector == null) {
107       if (vectorRef.compareAndSet(null,
108              new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) {
109         return;
110       }
111 
112       incorporateReading(attemptID, newProgress, newTime);
113       return;
114     }
115 
116     while (!vectorRef.compareAndSet
117             (oldVector, oldVector.incorporate(newProgress, newTime))) {
118       oldVector = vectorRef.get();
119     }
120   }
121 
122   private EstimateVector getEstimateVector(TaskAttemptId attemptID) {
123     AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID);
124 
125     if (vectorRef == null) {
126       return null;
127     }
128 
129     return vectorRef.get();
130   }
131 
132   @Override
133   public void contextualize(Configuration conf, AppContext context) {
134     super.contextualize(conf, context);
135 
136     lambda
137         = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
138             MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
139     smoothedValue
140         = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
141             ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
142   }
143 
144   @Override
145   public long estimatedRuntime(TaskAttemptId id) {
146     Long startTime = startTimes.get(id);
147 
148     if (startTime == null) {
149       return -1L;
150     }
151 
152     EstimateVector vector = getEstimateVector(id);
153 
154     if (vector == null) {
155       return -1L;
156     }
157 
158     long sunkTime = vector.atTime - startTime;
159 
160     double value = vector.value;
161     float progress = vector.basedOnProgress;
162 
163     if (value == 0) {
164       return -1L;
165     }
166 
167     double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value;
168 
169     if (rate == 0.0) {
170       return -1L;
171     }
172 
173     double remainingTime = (1.0 - progress) / rate;
174 
175     return sunkTime + (long)remainingTime;
176   }
177 
178   @Override
179   public long runtimeEstimateVariance(TaskAttemptId id) {
180     return -1L;
181   }
182 
183   @Override
184   public void updateAttempt(TaskAttemptStatus status, long timestamp) {
185     super.updateAttempt(status, timestamp);
186     TaskAttemptId attemptID = status.id;
187 
188     float progress = status.progress;
189 
190     incorporateReading(attemptID, progress, timestamp);
191   }
192 }
193