1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import seda.util.*; 32 import java.util.*; 33 34 /** 35 * An implementation of ResponseTimeController that models the stage 36 * as an M/M/1 queue. 37 * 38 * @author Matt Welsh 39 */ 40 public class ResponseTimeControllerMM1 extends ResponseTimeController { 41 42 private static final boolean DEBUG = true; 43 44 private static final boolean ADJUST_THRESHOLD = false; 45 private static final boolean ADJUST_RATE = true; 46 47 protected final static int INIT_THRESHOLD = 1; 48 protected final static int MIN_THRESHOLD = 1; 49 protected final static int MAX_THRESHOLD = 1024; 50 51 private static final double INIT_RATE = -1.0; 52 private static final int INIT_DEPTH = 100; 53 private static final double MIN_RATE = 0.5; 54 55 private static final boolean DEBUG_CAP_RATE = false; 56 private static final double DEBUG_RATE = 100000.0; 57 58 private static final boolean MOVING_AVERAGE = true; 59 private static final int MEASUREMENT_SIZE = 200; 60 61 62 63 // Arashi runs 64 private static final int ESTIMATION_SIZE = 5000; 65 private static final long ESTIMATION_TIME = 5000; 66 67 // Arashi runs 68 // private static final int ESTIMATION_SIZE = 500; 69 // private static final long ESTIMATION_TIME = 2000; 70 71 // Original benchmarking 72 // private static final int ESTIMATION_SIZE = 100; 73 // private static final long ESTIMATION_TIME = 1000; 74 75 private static final double SMOOTH_CONST = 0.1; 76 private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9); 77 78 private static final boolean BIDIRECTIONAL_FILTER = true; 79 private static final double SMOOTH_CONST_UP = 0.9; 80 private static final double SMOOTH_CONST_DOWN = 0.1; 81 82 private SinkProxy sinkProxy; 83 private long measurements[], sortedmeasurements[]; 84 private int curThreshold, cur_measurement; 85 private double curRate; 86 private double measured_mu, measured_lambda, est_ninetiethRT; 87 private double total_measured_mu, count_measured_mu, total_measured_lambda, 88 count_measured_lambda, total_est_ninetiethRT, count_est_ninetiethRT; 89 private double ninetiethRT, totalNinetiethRT; 90 private int countNinetiethRT; 91 private long lasttime, totalProcTime; 92 private long startProcTime, endProcTime; 93 private int numProcessed, numReceived, numEst; 94 private double avgNumThreads = 1.0; 95 private int totalNumThreads = 0, countNumThreads = 0; 96 private boolean enabled; 97 ResponseTimeControllerMM1(ManagerIF mgr, StageWrapperIF stage)98 public ResponseTimeControllerMM1(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException { 99 super(mgr, stage); 100 this.sinkProxy = (SinkProxy)stage.getStage().getSink(); 101 this.lasttime = System.currentTimeMillis(); 102 103 if (ADJUST_THRESHOLD) { 104 this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), INIT_THRESHOLD); 105 this.curThreshold = ((QueueThresholdPredicate)pred).getThreshold(); 106 } 107 if (ADJUST_RATE) { 108 this.pred = new RateLimitingPredicate(stage.getStage().getSink(), INIT_RATE, INIT_DEPTH); 109 this.curRate = ((RateLimitingPredicate)pred).getTargetRate(); 110 } 111 stage.getStage().getSink().setEnqueuePredicate(pred); 112 enabled = true; 113 114 this.measurements = new long[MEASUREMENT_SIZE]; 115 this.sortedmeasurements = new long[MEASUREMENT_SIZE]; 116 this.cur_measurement = 0; 117 this.startProcTime = Long.MAX_VALUE; this.endProcTime = 0L; 118 119 // Add profile 120 mgr.getProfiler().add("RTControllerMM1 90th-percentile RT <"+stage.getStage().getName()+">", 121 new ProfilableIF() { 122 public int profileSize() { 123 return (int)ninetiethRT; 124 } 125 }); 126 mgr.getProfiler().add("RTControllerMM1 lambda <"+stage.getStage().getName()+">", 127 new ProfilableIF() { 128 public int profileSize() { 129 return (int)measured_lambda; 130 } 131 }); 132 mgr.getProfiler().add("RTControllerMM1 mu <"+stage.getStage().getName()+">", 133 new ProfilableIF() { 134 public int profileSize() { 135 return (int)measured_mu; 136 } 137 }); 138 mgr.getProfiler().add("RTControllerMM1 est90thRT <"+stage.getStage().getName()+">", 139 new ProfilableIF() { 140 public int profileSize() { 141 return (int)est_ninetiethRT; 142 } 143 }); 144 mgr.getProfiler().add("RTControllerMM1 avgNumThreads <"+stage.getStage().getName()+">", 145 new ProfilableIF() { 146 public int profileSize() { 147 return (int)avgNumThreads; 148 } 149 }); 150 151 if (ADJUST_THRESHOLD) { 152 System.err.print("RTControllerMM1 <"+stage.getStage().getName()+">: ADJUST_THRESHOLD enabled, INIT_THRESHOLD="+INIT_THRESHOLD+", ESTIMATION_SIZE="+ESTIMATION_SIZE+", ESTIMATION_TIME="+ESTIMATION_TIME); 153 mgr.getProfiler().add("RTControllerMM1 queueThreshold <"+stage.getStage().getName()+">", 154 new ProfilableIF() { 155 public int profileSize() { 156 return curThreshold; 157 } 158 }); 159 } 160 161 if (ADJUST_RATE) { 162 System.err.print("RTControllerMM1 <"+stage.getStage().getName()+">: ADJUST_RATE enabled, INIT_DEPTH="+INIT_DEPTH+", ESTIMATION_SIZE="+ESTIMATION_SIZE+", ESTIMATION_TIME="+ESTIMATION_TIME); 163 if (BIDIRECTIONAL_FILTER) { 164 System.err.println(", SMOOTH_CONST_UP="+SMOOTH_CONST_UP+", SMOOTH_CONST_DOWN="+SMOOTH_CONST_DOWN); 165 } else { 166 System.err.println(", SMOOTH_CONST="+SMOOTH_CONST); 167 } 168 169 mgr.getProfiler().add("RTControllerMM1 queueRate <"+stage.getStage().getName()+">", 170 new ProfilableIF() { 171 public int profileSize() { 172 return (int)curRate; 173 } 174 }); 175 mgr.getProfiler().add("RTControllerMM1 tokenBucket <"+stage.getStage().getName()+">", 176 new ProfilableIF() { 177 public int profileSize() { 178 return ((RateLimitingPredicate)pred).getBucketSize(); 179 } 180 }); 181 } 182 183 System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: initialized, targetRT="+targetRT+" ms"); 184 } 185 enable()186 public synchronized void enable() { 187 if (enabled) return; 188 189 System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: Enabling"); 190 if (ADJUST_THRESHOLD) { 191 this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), curThreshold); 192 193 } else if (ADJUST_RATE) { 194 this.pred = new RateLimitingPredicate(stage.getStage().getSink(), curRate, INIT_DEPTH); 195 } 196 stage.getStage().getSink().setEnqueuePredicate(this.pred); 197 enabled = true; 198 } 199 disable()200 public synchronized void disable() { 201 if (!enabled) return; 202 System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: Disabling"); 203 this.pred = null; 204 stage.getStage().getSink().setEnqueuePredicate(null); 205 enabled = false; 206 } 207 208 // Measure 90thRT, mu, lambda, estimate RT from model adjustThreshold(QueueElementIF fetched[], long startTime, long endTime, boolean isFirst, int numThreads)209 public synchronized void adjustThreshold(QueueElementIF fetched[], 210 long startTime, long endTime, boolean isFirst, int numThreads) { 211 // if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: adjustThreshold called, fetched.len="+fetched.length+", time="+(endTime-startTime)+", isFirst="+isFirst+", numThreads="+numThreads); 212 213 boolean adjust_meas = false; 214 boolean adjust_est = false; 215 216 if (MOVING_AVERAGE) { 217 avgNumThreads = (SMOOTH_CONST * avgNumThreads) + ((1.0 - SMOOTH_CONST) * (double)(numThreads*1.0)); 218 } else { 219 totalNumThreads += numThreads; 220 countNumThreads ++; 221 avgNumThreads = (totalNumThreads * 1.0) / (countNumThreads * 1.0); 222 } 223 224 numProcessed += fetched.length; 225 totalProcTime += endTime - startTime; 226 227 //if (startTime < startProcTime) startProcTime = startTime; 228 //if (endTime > endProcTime) endProcTime = endTime; 229 230 /* 231 if ((endTime <= startProcTime) || (startTime >= endProcTime)) { 232 totalProcTime += endTime - startTime; 233 startProcTime = startTime; endProcTime = endTime; 234 } else { 235 if (startTime < startProcTime) { 236 totalProcTime += startProcTime - startTime; 237 startProcTime = startTime; 238 } 239 if (endTime > endProcTime) { 240 totalProcTime += endTime - endProcTime; 241 endProcTime = endTime; 242 } 243 } 244 */ 245 246 //System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> S="+(startTime-startProcTime)+" E="+(endTime-startProcTime)); 247 248 if (!isFirst) return; 249 250 // On every iteration reset the timespan 251 //totalProcTime += endProcTime - startProcTime; 252 //startProcTime = Long.MAX_VALUE; endProcTime = 0L; 253 254 numReceived += sinkProxy.enqueueSuccessCount; 255 sinkProxy.enqueueSuccessCount = 0; 256 257 // Measure actual 90th RT 258 long curtime = System.currentTimeMillis(); 259 for (int i = 0; i < fetched.length; i++) { 260 if (fetched[i] instanceof TimeStampedEvent) { 261 adjust_est = true; 262 TimeStampedEvent ev = (TimeStampedEvent)fetched[i]; 263 long time = ev.timestamp; 264 if (time != 0) { 265 measurements[cur_measurement] = curtime - time; 266 cur_measurement++; 267 if (cur_measurement == MEASUREMENT_SIZE) { 268 cur_measurement = 0; 269 adjust_meas = true; 270 break; // XXX MDW TESTING 271 } 272 } 273 } 274 } 275 276 // XXX MDW: Continuously update 277 adjust_meas = true; 278 if (adjust_meas) { 279 System.arraycopy(measurements, 0, sortedmeasurements, 0, MEASUREMENT_SIZE); 280 Arrays.sort(sortedmeasurements); 281 long cur = sortedmeasurements[NINETIETH]; 282 283 if (MOVING_AVERAGE) { 284 ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)(cur) * 1.0)); 285 } else { 286 totalNinetiethRT += cur; 287 countNinetiethRT ++; 288 ninetiethRT = (totalNinetiethRT * 1.0) / (countNinetiethRT * 1.0); 289 } 290 stage.getStats().record90thRT(ninetiethRT); 291 292 } 293 294 // XXX MDW: Always adjust estimated lambda/mu 295 //if (!adjust_est) return; 296 297 long elapsed = curtime - lasttime; 298 numEst++; 299 if ((numEst == ESTIMATION_SIZE) || (elapsed >= ESTIMATION_TIME)) { 300 numEst = 0; 301 } else { 302 return; 303 } 304 305 lasttime = curtime; 306 307 //System.err.println("RT: recv "+numReceived+" proc "+numProcessed); 308 309 // Estimate 90th RT using M/M/1 model 310 // Assume mu scales linearly with number of threads 311 double mu_scaling = Math.log(avgNumThreads)+1.0; 312 313 if (DEBUG) System.err.println("\nRT: numProcessed "+numProcessed+" mu_scaling "+mu_scaling+" totalProcTime "+totalProcTime); 314 315 316 // Don't recalculate if we don't have enough data - avoid large mu 317 // spikes due to fast measurements 318 if ((totalProcTime < 2) || (numProcessed < 2)) return; 319 320 // XXX TESTING 321 if (elapsed < 2) return; 322 323 double cur_mu = (numProcessed * mu_scaling * 1.0) / (totalProcTime * 1.0e-3); 324 double cur_lambda = (numReceived * 1.0) / (elapsed * 1.0e-3); 325 326 if (MOVING_AVERAGE) { 327 if (BIDIRECTIONAL_FILTER) { 328 if (cur_mu < measured_mu) { 329 measured_mu = (SMOOTH_CONST_DOWN * (double)measured_mu*1.0) + ((1.0 - SMOOTH_CONST_DOWN) * cur_mu); 330 } else { 331 measured_mu = (SMOOTH_CONST_UP * (double)measured_mu*1.0) + ((1.0 - SMOOTH_CONST_UP) * cur_mu); 332 } 333 334 } else { 335 measured_mu = (SMOOTH_CONST * (double)measured_mu*1.0) + ((1.0 - SMOOTH_CONST) * cur_mu); 336 } 337 338 measured_lambda = (SMOOTH_CONST * measured_lambda) + ((1.0 - SMOOTH_CONST) * cur_lambda); 339 340 } else { 341 total_measured_mu += numProcessed * mu_scaling; 342 count_measured_mu += totalProcTime * 1.0e-3; 343 measured_mu = total_measured_mu / count_measured_mu; 344 total_measured_lambda += numReceived; 345 count_measured_lambda += elapsed * 1.0e-3; 346 measured_lambda = total_measured_lambda / count_measured_lambda; 347 } 348 349 double rho = measured_lambda/measured_mu; 350 // XXX MDW: This is wrong - for waiting time 351 //double est = (((1.0/measured_mu)/(1.0-rho)) * Math.log(10.0)) * 1.0e3; 352 double est = (((1.0/measured_mu)/(1.0-rho)) * 2.3) * 1.0e3; 353 if (DEBUG) System.err.println("\nRT: cur_mu "+cur_mu+", cur_lambda "+cur_lambda+", est90th "+est); 354 if (est >= 0.0) { 355 if (MOVING_AVERAGE) { 356 est_ninetiethRT = (SMOOTH_CONST * (double)est_ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)(est) * 1.0)); 357 } else { 358 total_est_ninetiethRT += est; 359 count_est_ninetiethRT ++; 360 est_ninetiethRT = (total_est_ninetiethRT * 1.0) / (count_est_ninetiethRT * 1.0); 361 } 362 } 363 364 numProcessed = 0; numReceived = 0; totalProcTime = 0L; 365 366 if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" est "+MDWUtil.format(est_ninetiethRT)+" mu "+MDWUtil.format(measured_mu)+" lambda "+MDWUtil.format(measured_lambda)); 367 368 if (!enabled) return; 369 370 // Now do threshold scaling 371 if (ADJUST_THRESHOLD) { 372 if (est < 0.0) { 373 // If under overload 374 if (DEBUG) System.err.println("RT: Overload detected"); 375 // XXX MDW TESTING: Was 0.9 376 curThreshold = MIN_THRESHOLD; 377 } else { 378 if (est_ninetiethRT < (0.5 * targetRT)) { 379 curThreshold += 1; 380 if (curThreshold > MAX_THRESHOLD) curThreshold = MAX_THRESHOLD; 381 } else if (est_ninetiethRT >= (1.2 * targetRT)) { 382 curThreshold /= 2; 383 if (curThreshold < MIN_THRESHOLD) curThreshold = MIN_THRESHOLD; 384 } 385 } 386 ((QueueThresholdPredicate)pred).setThreshold(curThreshold); 387 if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> threshold now "+curThreshold); 388 } 389 390 // Do rate scaling 391 if (ADJUST_RATE) { 392 if (est < 0.0) { 393 // If under overload 394 if (DEBUG) System.err.println("RT: Overload detected"); 395 // XXX MDW TESTING: Was 0.9 396 this.curRate = measured_mu * 0.1; 397 } else { 398 399 // XXX TESTING: Maybe don't adjust if we are under the target, 400 // but, then we may be rejecting requests needlessly 401 // } else if (est_ninetiethRT >= targetRT) { 402 403 this.curRate = measured_mu - (2.302) / (targetRT / 1.0e3); 404 if (this.curRate < 0.0) { 405 // If the target is not feasible 406 if (DEBUG) System.err.println("RT: Target infeasible"); 407 this.curRate = measured_mu * 0.1; 408 } 409 } 410 411 if (DEBUG_CAP_RATE) { 412 this.curRate = DEBUG_RATE; 413 } 414 415 this.curRate = Math.max(MIN_RATE, this.curRate); 416 ((RateLimitingPredicate)pred).setTargetRate(this.curRate); 417 418 if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> rate now "+curRate); 419 } 420 } 421 adjustThreshold(QueueElementIF fetched[], long procTime)422 public void adjustThreshold(QueueElementIF fetched[], long procTime) { 423 throw new IllegalArgumentException("Not supported"); 424 } 425 426 } 427