1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import java.util.*;
32 
33 /**
34  * An implementation of ResponseTimeController that uses a direct
35  * adjustment of queue thresholds based on the error in the 90th
36  * percentile response time.
37  *
38  * @author   Matt Welsh
39  */
40 public class ResponseTimeControllerDirect extends ResponseTimeController {
41 
42   private static final boolean DEBUG = true;
43 
44   private static final boolean ADJUST_THRESHOLD = false;
45   private static final boolean ADJUST_RATE = true;
46 
47   private static final int MEASUREMENT_SIZE = 100;
48   private static final long MEASUREMENT_TIME = 1000;
49   private static final double SMOOTH_CONST = 0.7;
50   private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9);
51 
52   private static final double LOW_WATER = 0.9;
53   private static final double HIGH_WATER = 1.2;
54   private static final double ADDITIVE_INCREASE = 0.5;
55   private static final double MULTIPLICATIVE_INCREASE = 1.1;
56   private static final double MULTIPLICATIVE_DECREASE = 2;
57 
58   protected final static int INIT_THRESHOLD = 1;
59   protected final static int MIN_THRESHOLD = 1;
60   protected final static int MAX_THRESHOLD = 1024;
61 
62   private static final double INIT_RATE = 10.0;
63   private static final int INIT_DEPTH = 10;
64   private static final double MAX_RATE = 5000.0;
65   private static final double MIN_RATE = 0.05;
66 
67   private long adjtime;
68   private long measurements[], sortedmeasurements[];
69   private int curThreshold, cur_measurement;
70   private double curRate;
71   private double ninetiethRT;
72   private boolean enabled;
73 
ResponseTimeControllerDirect(ManagerIF mgr, StageWrapperIF stage)74   public ResponseTimeControllerDirect(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException {
75     super(mgr, stage);
76 
77     this.measurements = new long[MEASUREMENT_SIZE];
78     this.sortedmeasurements = new long[MEASUREMENT_SIZE];
79     this.cur_measurement = 0;
80     this.adjtime = System.currentTimeMillis();
81 
82     // Add profile
83     mgr.getProfiler().add("RTController 90th-percentile RT <"+stage.getStage().getName()+">",
84 	new ProfilableIF() {
85 	public int profileSize() {
86 	return (int)ninetiethRT;
87 	}
88 	});
89 
90     if (ADJUST_THRESHOLD) {
91       mgr.getProfiler().add("RTController queueThreshold <"+stage.getStage().getName()+">",
92   	  new ProfilableIF() {
93   	  public int profileSize() {
94   	  return curThreshold;
95   	  }
96   	  });
97 
98       this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), MAX_THRESHOLD);
99       ((QueueThresholdPredicate)pred).setThreshold(INIT_THRESHOLD);
100       this.curThreshold = ((QueueThresholdPredicate)pred).getThreshold();
101       stage.getStage().getSink().setEnqueuePredicate(this.pred);
102 
103       System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: ADJUST_THRESH enabled, target="+targetRT+", MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", LOW_WATER="+LOW_WATER+", HIGH_WATER="+HIGH_WATER+", ADDITIVE_INCREASE="+ADDITIVE_INCREASE+", MULTIPLCATIVE_DECREASE="+MULTIPLICATIVE_DECREASE);
104 
105     } else if (ADJUST_RATE) {
106       mgr.getProfiler().add("RTController curRate <"+stage.getStage().getName()+">",
107   	  new ProfilableIF() {
108   	  public int profileSize() {
109   	  return (int)curRate;
110   	  }
111   	  });
112 
113       this.pred = new RateLimitingPredicate(stage.getStage().getSink(), INIT_RATE, INIT_DEPTH);
114       this.curRate = ((RateLimitingPredicate)pred).getTargetRate();
115       stage.getStage().getSink().setEnqueuePredicate(pred);
116 
117       System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: ADJUST_RATE enabled, target="+targetRT+", MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", LOW_WATER="+LOW_WATER+", HIGH_WATER="+HIGH_WATER+", ADDITIVE_INCREASE="+ADDITIVE_INCREASE+", MULTIPLCATIVE_DECREASE="+MULTIPLICATIVE_DECREASE);
118     }
119 
120     this.enabled = true;
121   }
122 
enable()123   public synchronized void enable() {
124     if (enabled) return;
125 
126     System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: Enabling");
127     if (ADJUST_THRESHOLD) {
128       this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), curThreshold);
129     } else if (ADJUST_RATE) {
130       this.pred = new RateLimitingPredicate(stage.getStage().getSink(), curRate, INIT_DEPTH);
131     }
132 
133     stage.getStage().getSink().setEnqueuePredicate(pred);
134     enabled = true;
135   }
136 
disable()137   public synchronized void disable() {
138     if (!enabled) return;
139     System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: Disabling");
140     this.pred = null;
141     stage.getStage().getSink().setEnqueuePredicate(null);
142     enabled = false;
143   }
144 
adjustThreshold(QueueElementIF fetched[], long procTime)145   public synchronized void adjustThreshold(QueueElementIF fetched[], long procTime) {
146     long curtime = System.currentTimeMillis();
147     boolean adjust = false;
148 
149     for (int i = 0; i < fetched.length; i++) {
150       if (fetched[i] instanceof TimeStampedEvent) {
151 	TimeStampedEvent ev = (TimeStampedEvent)fetched[i];
152 	long time = ev.timestamp;
153 	if (time != 0) {
154 	  measurements[cur_measurement] = curtime - time;
155 	  cur_measurement++;
156 	  if (cur_measurement == MEASUREMENT_SIZE) {
157 	    cur_measurement = 0;
158 	    adjust = true;
159 	  }
160 	}
161       }
162     }
163 
164     int numsort = MEASUREMENT_SIZE;
165     if ((curtime - adjtime) >= MEASUREMENT_TIME) {
166       adjust = true;
167       numsort = cur_measurement;
168       cur_measurement = 0;
169     }
170 
171     if (!adjust) return;
172     System.arraycopy(measurements, 0, sortedmeasurements, 0, numsort);
173     Arrays.sort(sortedmeasurements, 0, numsort);
174     long cur = sortedmeasurements[ (int)(0.9 * (double)numsort) ];
175     ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)cur * 1.0));
176     stage.getStats().record90thRT(ninetiethRT);
177 
178     adjtime = curtime;
179 
180     if (!enabled) return;
181 
182     if (ADJUST_THRESHOLD) {
183 
184       if (ninetiethRT < (LOW_WATER * targetRT)) {
185 	curThreshold += ADDITIVE_INCREASE;
186 	//curThreshold *= MULTIPLICATIVE_INCREASE;
187 	if (curThreshold > MAX_THRESHOLD) curThreshold = MAX_THRESHOLD;
188       } else if (ninetiethRT > (HIGH_WATER * targetRT)) {
189 	curThreshold /= MULTIPLICATIVE_DECREASE;
190 	if (curThreshold < MIN_THRESHOLD) curThreshold = MIN_THRESHOLD;
191       }
192       if (DEBUG) System.err.println("RTController <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" threshold "+curThreshold);
193       ((QueueThresholdPredicate)pred).setThreshold(curThreshold);
194 
195     } else if (ADJUST_RATE) {
196 
197       if (ninetiethRT < (LOW_WATER * targetRT)) {
198 	curRate += ADDITIVE_INCREASE;
199 	//curRate *= MULTIPLICATIVE_INCREASE;
200 	if (curRate > MAX_RATE) curRate = MAX_RATE;
201       } else if (ninetiethRT > (HIGH_WATER * targetRT)) {
202 	curRate /= MULTIPLICATIVE_DECREASE;
203 	if (curRate < MIN_RATE) curRate = MIN_RATE;
204       }
205       if (DEBUG) System.err.println("RTController <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" rate now "+curRate);
206       ((RateLimitingPredicate)pred).setTargetRate(this.curRate);
207 
208     }
209 
210 
211   }
212 
213 }
214