1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.mapreduce.v2.app.speculate; 20 21 import java.util.concurrent.ConcurrentHashMap; 22 import java.util.concurrent.ConcurrentMap; 23 import java.util.concurrent.atomic.AtomicReference; 24 25 import org.apache.hadoop.conf.Configuration; 26 import org.apache.hadoop.mapreduce.MRJobConfig; 27 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; 28 import org.apache.hadoop.mapreduce.v2.app.AppContext; 29 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; 30 31 /** 32 * This estimator exponentially smooths the rate of progress versus wallclock 33 * time. Conceivably we could write an estimator that smooths time per 34 * unit progress, and get different results. 35 */ 36 public class ExponentiallySmoothedTaskRuntimeEstimator extends StartEndTimesBase { 37 38 private final ConcurrentMap<TaskAttemptId, AtomicReference<EstimateVector>> estimates 39 = new ConcurrentHashMap<TaskAttemptId, AtomicReference<EstimateVector>>(); 40 41 private SmoothedValue smoothedValue; 42 43 private long lambda; 44 45 public enum SmoothedValue { 46 RATE, TIME_PER_UNIT_PROGRESS 47 } 48 ExponentiallySmoothedTaskRuntimeEstimator(long lambda, SmoothedValue smoothedValue)49 ExponentiallySmoothedTaskRuntimeEstimator 50 (long lambda, SmoothedValue smoothedValue) { 51 super(); 52 this.smoothedValue = smoothedValue; 53 this.lambda = lambda; 54 } 55 ExponentiallySmoothedTaskRuntimeEstimator()56 public ExponentiallySmoothedTaskRuntimeEstimator() { 57 super(); 58 } 59 60 // immutable 61 private class EstimateVector { 62 final double value; 63 final float basedOnProgress; 64 final long atTime; 65 EstimateVector(double value, float basedOnProgress, long atTime)66 EstimateVector(double value, float basedOnProgress, long atTime) { 67 this.value = value; 68 this.basedOnProgress = basedOnProgress; 69 this.atTime = atTime; 70 } 71 incorporate(float newProgress, long newAtTime)72 EstimateVector incorporate(float newProgress, long newAtTime) { 73 if (newAtTime <= atTime || newProgress < basedOnProgress) { 74 return this; 75 } 76 77 double oldWeighting 78 = value < 0.0 79 ? 0.0 : Math.exp(((double) (newAtTime - atTime)) / lambda); 80 81 double newRead = (newProgress - basedOnProgress) / (newAtTime - atTime); 82 83 if (smoothedValue == SmoothedValue.TIME_PER_UNIT_PROGRESS) { 84 newRead = 1.0 / newRead; 85 } 86 87 return new EstimateVector 88 (value * oldWeighting + newRead * (1.0 - oldWeighting), 89 newProgress, newAtTime); 90 } 91 } 92 93 private void incorporateReading 94 (TaskAttemptId attemptID, float newProgress, long newTime) { 95 //TODO: Refactor this method, it seems more complicated than necessary. 96 AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID); 97 98 if (vectorRef == null) { 99 estimates.putIfAbsent(attemptID, new AtomicReference<EstimateVector>(null)); 100 incorporateReading(attemptID, newProgress, newTime); 101 return; 102 } 103 104 EstimateVector oldVector = vectorRef.get(); 105 106 if (oldVector == null) { 107 if (vectorRef.compareAndSet(null, 108 new EstimateVector(-1.0, 0.0F, Long.MIN_VALUE))) { 109 return; 110 } 111 112 incorporateReading(attemptID, newProgress, newTime); 113 return; 114 } 115 116 while (!vectorRef.compareAndSet 117 (oldVector, oldVector.incorporate(newProgress, newTime))) { 118 oldVector = vectorRef.get(); 119 } 120 } 121 122 private EstimateVector getEstimateVector(TaskAttemptId attemptID) { 123 AtomicReference<EstimateVector> vectorRef = estimates.get(attemptID); 124 125 if (vectorRef == null) { 126 return null; 127 } 128 129 return vectorRef.get(); 130 } 131 132 @Override 133 public void contextualize(Configuration conf, AppContext context) { 134 super.contextualize(conf, context); 135 136 lambda 137 = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS, 138 MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS); 139 smoothedValue 140 = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true) 141 ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS; 142 } 143 144 @Override 145 public long estimatedRuntime(TaskAttemptId id) { 146 Long startTime = startTimes.get(id); 147 148 if (startTime == null) { 149 return -1L; 150 } 151 152 EstimateVector vector = getEstimateVector(id); 153 154 if (vector == null) { 155 return -1L; 156 } 157 158 long sunkTime = vector.atTime - startTime; 159 160 double value = vector.value; 161 float progress = vector.basedOnProgress; 162 163 if (value == 0) { 164 return -1L; 165 } 166 167 double rate = smoothedValue == SmoothedValue.RATE ? value : 1.0 / value; 168 169 if (rate == 0.0) { 170 return -1L; 171 } 172 173 double remainingTime = (1.0 - progress) / rate; 174 175 return sunkTime + (long)remainingTime; 176 } 177 178 @Override 179 public long runtimeEstimateVariance(TaskAttemptId id) { 180 return -1L; 181 } 182 183 @Override 184 public void updateAttempt(TaskAttemptStatus status, long timestamp) { 185 super.updateAttempt(status, timestamp); 186 TaskAttemptId attemptID = status.id; 187 188 float progress = status.progress; 189 190 incorporateReading(attemptID, progress, timestamp); 191 } 192 } 193