1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import seda.util.*;
32 import java.util.*;
33 
34 /**
35  * An implementation of ResponseTimeController that models the stage
36  * as an M/M/1 queue.
37  *
38  * @author   Matt Welsh
39  */
40 public class ResponseTimeControllerMM1 extends ResponseTimeController {
41 
42   private static final boolean DEBUG = true;
43 
44   private static final boolean ADJUST_THRESHOLD = false;
45   private static final boolean ADJUST_RATE = true;
46 
47   protected final static int INIT_THRESHOLD = 1;
48   protected final static int MIN_THRESHOLD = 1;
49   protected final static int MAX_THRESHOLD = 1024;
50 
51   private static final double INIT_RATE = -1.0;
52   private static final int INIT_DEPTH = 100;
53   private static final double MIN_RATE = 0.5;
54 
55   private static final boolean DEBUG_CAP_RATE = false;
56   private static final double DEBUG_RATE = 100000.0;
57 
58   private static final boolean MOVING_AVERAGE = true;
59   private static final int MEASUREMENT_SIZE = 200;
60 
61 
62 
63   // Arashi runs
64   private static final int ESTIMATION_SIZE = 5000;
65   private static final long ESTIMATION_TIME = 5000;
66 
67   // Arashi runs
68 //  private static final int ESTIMATION_SIZE = 500;
69 // private static final long ESTIMATION_TIME = 2000;
70 
71   // Original benchmarking
72 //  private static final int ESTIMATION_SIZE = 100;
73 //  private static final long ESTIMATION_TIME = 1000;
74 
75   private static final double SMOOTH_CONST = 0.1;
76   private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9);
77 
78   private static final boolean BIDIRECTIONAL_FILTER = true;
79   private static final double SMOOTH_CONST_UP = 0.9;
80   private static final double SMOOTH_CONST_DOWN = 0.1;
81 
82   private SinkProxy sinkProxy;
83   private long measurements[], sortedmeasurements[];
84   private int curThreshold, cur_measurement;
85   private double curRate;
86   private double measured_mu, measured_lambda, est_ninetiethRT;
87   private double total_measured_mu, count_measured_mu, total_measured_lambda,
88     count_measured_lambda, total_est_ninetiethRT, count_est_ninetiethRT;
89   private double ninetiethRT, totalNinetiethRT;
90   private int countNinetiethRT;
91   private long lasttime, totalProcTime;
92   private long startProcTime, endProcTime;
93   private int numProcessed, numReceived, numEst;
94   private double avgNumThreads = 1.0;
95   private int totalNumThreads = 0, countNumThreads = 0;
96   private boolean enabled;
97 
ResponseTimeControllerMM1(ManagerIF mgr, StageWrapperIF stage)98   public ResponseTimeControllerMM1(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException {
99     super(mgr, stage);
100     this.sinkProxy = (SinkProxy)stage.getStage().getSink();
101     this.lasttime = System.currentTimeMillis();
102 
103     if (ADJUST_THRESHOLD) {
104       this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), INIT_THRESHOLD);
105       this.curThreshold = ((QueueThresholdPredicate)pred).getThreshold();
106     }
107     if (ADJUST_RATE) {
108       this.pred = new RateLimitingPredicate(stage.getStage().getSink(), INIT_RATE, INIT_DEPTH);
109       this.curRate = ((RateLimitingPredicate)pred).getTargetRate();
110     }
111     stage.getStage().getSink().setEnqueuePredicate(pred);
112     enabled = true;
113 
114     this.measurements = new long[MEASUREMENT_SIZE];
115     this.sortedmeasurements = new long[MEASUREMENT_SIZE];
116     this.cur_measurement = 0;
117     this.startProcTime = Long.MAX_VALUE; this.endProcTime = 0L;
118 
119     // Add profile
120     mgr.getProfiler().add("RTControllerMM1 90th-percentile RT <"+stage.getStage().getName()+">",
121 	new ProfilableIF() {
122 	public int profileSize() {
123 	return (int)ninetiethRT;
124 	}
125 	});
126     mgr.getProfiler().add("RTControllerMM1 lambda <"+stage.getStage().getName()+">",
127 	new ProfilableIF() {
128 	public int profileSize() {
129 	return (int)measured_lambda;
130 	}
131 	});
132     mgr.getProfiler().add("RTControllerMM1 mu <"+stage.getStage().getName()+">",
133 	new ProfilableIF() {
134 	public int profileSize() {
135 	return (int)measured_mu;
136 	}
137 	});
138     mgr.getProfiler().add("RTControllerMM1 est90thRT <"+stage.getStage().getName()+">",
139 	new ProfilableIF() {
140 	public int profileSize() {
141 	return (int)est_ninetiethRT;
142 	}
143 	});
144     mgr.getProfiler().add("RTControllerMM1 avgNumThreads <"+stage.getStage().getName()+">",
145 	new ProfilableIF() {
146 	public int profileSize() {
147 	return (int)avgNumThreads;
148 	}
149 	});
150 
151     if (ADJUST_THRESHOLD) {
152       System.err.print("RTControllerMM1 <"+stage.getStage().getName()+">: ADJUST_THRESHOLD enabled, INIT_THRESHOLD="+INIT_THRESHOLD+", ESTIMATION_SIZE="+ESTIMATION_SIZE+", ESTIMATION_TIME="+ESTIMATION_TIME);
153       mgr.getProfiler().add("RTControllerMM1 queueThreshold <"+stage.getStage().getName()+">",
154   	  new ProfilableIF() {
155   	  public int profileSize() {
156   	  return curThreshold;
157   	  }
158   	  });
159     }
160 
161     if (ADJUST_RATE) {
162       System.err.print("RTControllerMM1 <"+stage.getStage().getName()+">: ADJUST_RATE enabled, INIT_DEPTH="+INIT_DEPTH+", ESTIMATION_SIZE="+ESTIMATION_SIZE+", ESTIMATION_TIME="+ESTIMATION_TIME);
163       if (BIDIRECTIONAL_FILTER) {
164 	System.err.println(", SMOOTH_CONST_UP="+SMOOTH_CONST_UP+", SMOOTH_CONST_DOWN="+SMOOTH_CONST_DOWN);
165       } else {
166 	System.err.println(", SMOOTH_CONST="+SMOOTH_CONST);
167       }
168 
169       mgr.getProfiler().add("RTControllerMM1 queueRate <"+stage.getStage().getName()+">",
170   	  new ProfilableIF() {
171   	  public int profileSize() {
172   	  return (int)curRate;
173   	  }
174   	  });
175       mgr.getProfiler().add("RTControllerMM1 tokenBucket <"+stage.getStage().getName()+">",
176   	  new ProfilableIF() {
177   	  public int profileSize() {
178   	  return ((RateLimitingPredicate)pred).getBucketSize();
179   	  }
180   	  });
181     }
182 
183     System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: initialized, targetRT="+targetRT+" ms");
184   }
185 
enable()186   public synchronized void enable() {
187     if (enabled) return;
188 
189     System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: Enabling");
190     if (ADJUST_THRESHOLD) {
191       this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), curThreshold);
192 
193     } else if (ADJUST_RATE) {
194       this.pred = new RateLimitingPredicate(stage.getStage().getSink(), curRate, INIT_DEPTH);
195     }
196     stage.getStage().getSink().setEnqueuePredicate(this.pred);
197     enabled = true;
198   }
199 
disable()200   public synchronized void disable() {
201     if (!enabled) return;
202     System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: Disabling");
203     this.pred = null;
204     stage.getStage().getSink().setEnqueuePredicate(null);
205     enabled = false;
206   }
207 
208   // Measure 90thRT, mu, lambda, estimate RT from model
adjustThreshold(QueueElementIF fetched[], long startTime, long endTime, boolean isFirst, int numThreads)209   public synchronized void adjustThreshold(QueueElementIF fetched[],
210       long startTime, long endTime, boolean isFirst, int numThreads) {
211 //    if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: adjustThreshold called, fetched.len="+fetched.length+", time="+(endTime-startTime)+", isFirst="+isFirst+", numThreads="+numThreads);
212 
213     boolean adjust_meas = false;
214     boolean adjust_est = false;
215 
216     if (MOVING_AVERAGE) {
217       avgNumThreads = (SMOOTH_CONST * avgNumThreads) + ((1.0 - SMOOTH_CONST) * (double)(numThreads*1.0));
218     } else {
219       totalNumThreads += numThreads;
220       countNumThreads ++;
221       avgNumThreads = (totalNumThreads * 1.0) / (countNumThreads * 1.0);
222     }
223 
224     numProcessed += fetched.length;
225     totalProcTime += endTime - startTime;
226 
227     //if (startTime < startProcTime) startProcTime = startTime;
228     //if (endTime > endProcTime) endProcTime = endTime;
229 
230     /*
231     if ((endTime <= startProcTime) || (startTime >= endProcTime)) {
232       totalProcTime += endTime - startTime;
233       startProcTime = startTime; endProcTime = endTime;
234     } else {
235       if (startTime < startProcTime) {
236 	totalProcTime += startProcTime - startTime;
237 	startProcTime = startTime;
238       }
239       if (endTime > endProcTime) {
240 	totalProcTime += endTime - endProcTime;
241 	endProcTime = endTime;
242       }
243     }
244     */
245 
246     //System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> S="+(startTime-startProcTime)+" E="+(endTime-startProcTime));
247 
248     if (!isFirst) return;
249 
250     // On every iteration reset the timespan
251     //totalProcTime += endProcTime - startProcTime;
252     //startProcTime = Long.MAX_VALUE; endProcTime = 0L;
253 
254     numReceived += sinkProxy.enqueueSuccessCount;
255     sinkProxy.enqueueSuccessCount = 0;
256 
257     // Measure actual 90th RT
258     long curtime = System.currentTimeMillis();
259     for (int i = 0; i < fetched.length; i++) {
260       if (fetched[i] instanceof TimeStampedEvent) {
261 	adjust_est = true;
262 	TimeStampedEvent ev = (TimeStampedEvent)fetched[i];
263 	long time = ev.timestamp;
264 	if (time != 0) {
265 	  measurements[cur_measurement] = curtime - time;
266 	  cur_measurement++;
267 	  if (cur_measurement == MEASUREMENT_SIZE) {
268 	    cur_measurement = 0;
269 	    adjust_meas = true;
270 	    break; // XXX MDW TESTING
271 	  }
272 	}
273       }
274     }
275 
276     // XXX MDW: Continuously update
277     adjust_meas = true;
278     if (adjust_meas) {
279       System.arraycopy(measurements, 0, sortedmeasurements, 0, MEASUREMENT_SIZE);
280       Arrays.sort(sortedmeasurements);
281       long cur = sortedmeasurements[NINETIETH];
282 
283       if (MOVING_AVERAGE) {
284         ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)(cur) * 1.0));
285       } else {
286 	totalNinetiethRT += cur;
287 	countNinetiethRT ++;
288 	ninetiethRT = (totalNinetiethRT * 1.0) / (countNinetiethRT * 1.0);
289       }
290       stage.getStats().record90thRT(ninetiethRT);
291 
292     }
293 
294     // XXX MDW: Always adjust estimated lambda/mu
295     //if (!adjust_est) return;
296 
297     long elapsed = curtime - lasttime;
298     numEst++;
299     if ((numEst == ESTIMATION_SIZE) || (elapsed >= ESTIMATION_TIME)) {
300       numEst = 0;
301     } else {
302       return;
303     }
304 
305     lasttime = curtime;
306 
307     //System.err.println("RT: recv "+numReceived+" proc "+numProcessed);
308 
309     // Estimate 90th RT using M/M/1 model
310     // Assume mu scales linearly with number of threads
311     double mu_scaling = Math.log(avgNumThreads)+1.0;
312 
313     if (DEBUG) System.err.println("\nRT: numProcessed "+numProcessed+" mu_scaling "+mu_scaling+" totalProcTime "+totalProcTime);
314 
315 
316     // Don't recalculate if we don't have enough data - avoid large mu
317     // spikes due to fast measurements
318     if ((totalProcTime < 2) || (numProcessed < 2)) return;
319 
320     // XXX TESTING
321     if (elapsed < 2) return;
322 
323     double cur_mu = (numProcessed * mu_scaling * 1.0) / (totalProcTime * 1.0e-3);
324     double cur_lambda = (numReceived * 1.0) / (elapsed * 1.0e-3);
325 
326     if (MOVING_AVERAGE) {
327       if (BIDIRECTIONAL_FILTER) {
328 	if (cur_mu < measured_mu) {
329 	  measured_mu = (SMOOTH_CONST_DOWN * (double)measured_mu*1.0) + ((1.0 - SMOOTH_CONST_DOWN) * cur_mu);
330 	} else {
331 	  measured_mu = (SMOOTH_CONST_UP * (double)measured_mu*1.0) + ((1.0 - SMOOTH_CONST_UP) * cur_mu);
332 	}
333 
334       } else {
335 	measured_mu = (SMOOTH_CONST * (double)measured_mu*1.0) + ((1.0 - SMOOTH_CONST) * cur_mu);
336       }
337 
338       measured_lambda = (SMOOTH_CONST * measured_lambda) + ((1.0 - SMOOTH_CONST) * cur_lambda);
339 
340     } else {
341       total_measured_mu += numProcessed * mu_scaling;
342       count_measured_mu += totalProcTime * 1.0e-3;
343       measured_mu = total_measured_mu / count_measured_mu;
344       total_measured_lambda += numReceived;
345       count_measured_lambda += elapsed * 1.0e-3;
346       measured_lambda = total_measured_lambda / count_measured_lambda;
347     }
348 
349     double rho = measured_lambda/measured_mu;
350     // XXX MDW: This is wrong - for waiting time
351     //double est = (((1.0/measured_mu)/(1.0-rho)) * Math.log(10.0)) * 1.0e3;
352     double est = (((1.0/measured_mu)/(1.0-rho)) * 2.3) * 1.0e3;
353     if (DEBUG) System.err.println("\nRT: cur_mu "+cur_mu+", cur_lambda "+cur_lambda+", est90th "+est);
354     if (est >= 0.0) {
355       if (MOVING_AVERAGE) {
356 	est_ninetiethRT = (SMOOTH_CONST * (double)est_ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)(est) * 1.0));
357       } else {
358 	total_est_ninetiethRT += est;
359 	count_est_ninetiethRT ++;
360 	est_ninetiethRT = (total_est_ninetiethRT * 1.0) / (count_est_ninetiethRT * 1.0);
361       }
362     }
363 
364     numProcessed = 0; numReceived = 0; totalProcTime = 0L;
365 
366     if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" est "+MDWUtil.format(est_ninetiethRT)+" mu "+MDWUtil.format(measured_mu)+" lambda "+MDWUtil.format(measured_lambda));
367 
368     if (!enabled) return;
369 
370     // Now do threshold scaling
371     if (ADJUST_THRESHOLD) {
372       if (est < 0.0) {
373 	// If under overload
374 	if (DEBUG) System.err.println("RT: Overload detected");
375 	// XXX MDW TESTING: Was 0.9
376 	curThreshold = MIN_THRESHOLD;
377       } else {
378 	if (est_ninetiethRT < (0.5 * targetRT)) {
379 	  curThreshold += 1;
380 	  if (curThreshold > MAX_THRESHOLD) curThreshold = MAX_THRESHOLD;
381 	} else if (est_ninetiethRT >= (1.2 * targetRT)) {
382 	  curThreshold /= 2;
383 	  if (curThreshold < MIN_THRESHOLD) curThreshold = MIN_THRESHOLD;
384 	}
385       }
386       ((QueueThresholdPredicate)pred).setThreshold(curThreshold);
387       if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> threshold now "+curThreshold);
388     }
389 
390     // Do rate scaling
391     if (ADJUST_RATE) {
392       if (est < 0.0) {
393 	// If under overload
394 	if (DEBUG) System.err.println("RT: Overload detected");
395 	// XXX MDW TESTING: Was 0.9
396 	this.curRate = measured_mu * 0.1;
397       } else {
398 
399 	// XXX TESTING: Maybe don't adjust if we are under the target,
400 	// but, then we may be rejecting requests needlessly
401 //      } else if (est_ninetiethRT >= targetRT) {
402 
403 	this.curRate = measured_mu - (2.302) / (targetRT / 1.0e3);
404 	if (this.curRate < 0.0) {
405 	  // If the target is not feasible
406 	  if (DEBUG) System.err.println("RT: Target infeasible");
407 	  this.curRate = measured_mu * 0.1;
408 	}
409       }
410 
411       if (DEBUG_CAP_RATE) {
412 	this.curRate = DEBUG_RATE;
413       }
414 
415       this.curRate = Math.max(MIN_RATE, this.curRate);
416       ((RateLimitingPredicate)pred).setTargetRate(this.curRate);
417 
418       if (DEBUG) System.err.println("RTControllerMM1 <"+stage.getStage().getName()+"> rate now "+curRate);
419     }
420   }
421 
adjustThreshold(QueueElementIF fetched[], long procTime)422   public void adjustThreshold(QueueElementIF fetched[], long procTime) {
423     throw new IllegalArgumentException("Not supported");
424   }
425 
426 }
427