1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import java.util.*; 32 33 /** 34 * An implementation of ResponseTimeController that uses a direct 35 * adjustment of queue thresholds based on the error in the 90th 36 * percentile response time. 37 * 38 * @author Matt Welsh 39 */ 40 public class ResponseTimeControllerDirect extends ResponseTimeController { 41 42 private static final boolean DEBUG = true; 43 44 private static final boolean ADJUST_THRESHOLD = false; 45 private static final boolean ADJUST_RATE = true; 46 47 private static final int MEASUREMENT_SIZE = 100; 48 private static final long MEASUREMENT_TIME = 1000; 49 private static final double SMOOTH_CONST = 0.7; 50 private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9); 51 52 private static final double LOW_WATER = 0.9; 53 private static final double HIGH_WATER = 1.2; 54 private static final double ADDITIVE_INCREASE = 0.5; 55 private static final double MULTIPLICATIVE_INCREASE = 1.1; 56 private static final double MULTIPLICATIVE_DECREASE = 2; 57 58 protected final static int INIT_THRESHOLD = 1; 59 protected final static int MIN_THRESHOLD = 1; 60 protected final static int MAX_THRESHOLD = 1024; 61 62 private static final double INIT_RATE = 10.0; 63 private static final int INIT_DEPTH = 10; 64 private static final double MAX_RATE = 5000.0; 65 private static final double MIN_RATE = 0.05; 66 67 private long adjtime; 68 private long measurements[], sortedmeasurements[]; 69 private int curThreshold, cur_measurement; 70 private double curRate; 71 private double ninetiethRT; 72 private boolean enabled; 73 ResponseTimeControllerDirect(ManagerIF mgr, StageWrapperIF stage)74 public ResponseTimeControllerDirect(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException { 75 super(mgr, stage); 76 77 this.measurements = new long[MEASUREMENT_SIZE]; 78 this.sortedmeasurements = new long[MEASUREMENT_SIZE]; 79 this.cur_measurement = 0; 80 this.adjtime = System.currentTimeMillis(); 81 82 // Add profile 83 mgr.getProfiler().add("RTController 90th-percentile RT <"+stage.getStage().getName()+">", 84 new ProfilableIF() { 85 public int profileSize() { 86 return (int)ninetiethRT; 87 } 88 }); 89 90 if (ADJUST_THRESHOLD) { 91 mgr.getProfiler().add("RTController queueThreshold <"+stage.getStage().getName()+">", 92 new ProfilableIF() { 93 public int profileSize() { 94 return curThreshold; 95 } 96 }); 97 98 this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), MAX_THRESHOLD); 99 ((QueueThresholdPredicate)pred).setThreshold(INIT_THRESHOLD); 100 this.curThreshold = ((QueueThresholdPredicate)pred).getThreshold(); 101 stage.getStage().getSink().setEnqueuePredicate(this.pred); 102 103 System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: ADJUST_THRESH enabled, target="+targetRT+", MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", LOW_WATER="+LOW_WATER+", HIGH_WATER="+HIGH_WATER+", ADDITIVE_INCREASE="+ADDITIVE_INCREASE+", MULTIPLCATIVE_DECREASE="+MULTIPLICATIVE_DECREASE); 104 105 } else if (ADJUST_RATE) { 106 mgr.getProfiler().add("RTController curRate <"+stage.getStage().getName()+">", 107 new ProfilableIF() { 108 public int profileSize() { 109 return (int)curRate; 110 } 111 }); 112 113 this.pred = new RateLimitingPredicate(stage.getStage().getSink(), INIT_RATE, INIT_DEPTH); 114 this.curRate = ((RateLimitingPredicate)pred).getTargetRate(); 115 stage.getStage().getSink().setEnqueuePredicate(pred); 116 117 System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: ADJUST_RATE enabled, target="+targetRT+", MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", LOW_WATER="+LOW_WATER+", HIGH_WATER="+HIGH_WATER+", ADDITIVE_INCREASE="+ADDITIVE_INCREASE+", MULTIPLCATIVE_DECREASE="+MULTIPLICATIVE_DECREASE); 118 } 119 120 this.enabled = true; 121 } 122 enable()123 public synchronized void enable() { 124 if (enabled) return; 125 126 System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: Enabling"); 127 if (ADJUST_THRESHOLD) { 128 this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), curThreshold); 129 } else if (ADJUST_RATE) { 130 this.pred = new RateLimitingPredicate(stage.getStage().getSink(), curRate, INIT_DEPTH); 131 } 132 133 stage.getStage().getSink().setEnqueuePredicate(pred); 134 enabled = true; 135 } 136 disable()137 public synchronized void disable() { 138 if (!enabled) return; 139 System.err.println("RTControllerDirect <"+stage.getStage().getName()+">: Disabling"); 140 this.pred = null; 141 stage.getStage().getSink().setEnqueuePredicate(null); 142 enabled = false; 143 } 144 adjustThreshold(QueueElementIF fetched[], long procTime)145 public synchronized void adjustThreshold(QueueElementIF fetched[], long procTime) { 146 long curtime = System.currentTimeMillis(); 147 boolean adjust = false; 148 149 for (int i = 0; i < fetched.length; i++) { 150 if (fetched[i] instanceof TimeStampedEvent) { 151 TimeStampedEvent ev = (TimeStampedEvent)fetched[i]; 152 long time = ev.timestamp; 153 if (time != 0) { 154 measurements[cur_measurement] = curtime - time; 155 cur_measurement++; 156 if (cur_measurement == MEASUREMENT_SIZE) { 157 cur_measurement = 0; 158 adjust = true; 159 } 160 } 161 } 162 } 163 164 int numsort = MEASUREMENT_SIZE; 165 if ((curtime - adjtime) >= MEASUREMENT_TIME) { 166 adjust = true; 167 numsort = cur_measurement; 168 cur_measurement = 0; 169 } 170 171 if (!adjust) return; 172 System.arraycopy(measurements, 0, sortedmeasurements, 0, numsort); 173 Arrays.sort(sortedmeasurements, 0, numsort); 174 long cur = sortedmeasurements[ (int)(0.9 * (double)numsort) ]; 175 ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)cur * 1.0)); 176 stage.getStats().record90thRT(ninetiethRT); 177 178 adjtime = curtime; 179 180 if (!enabled) return; 181 182 if (ADJUST_THRESHOLD) { 183 184 if (ninetiethRT < (LOW_WATER * targetRT)) { 185 curThreshold += ADDITIVE_INCREASE; 186 //curThreshold *= MULTIPLICATIVE_INCREASE; 187 if (curThreshold > MAX_THRESHOLD) curThreshold = MAX_THRESHOLD; 188 } else if (ninetiethRT > (HIGH_WATER * targetRT)) { 189 curThreshold /= MULTIPLICATIVE_DECREASE; 190 if (curThreshold < MIN_THRESHOLD) curThreshold = MIN_THRESHOLD; 191 } 192 if (DEBUG) System.err.println("RTController <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" threshold "+curThreshold); 193 ((QueueThresholdPredicate)pred).setThreshold(curThreshold); 194 195 } else if (ADJUST_RATE) { 196 197 if (ninetiethRT < (LOW_WATER * targetRT)) { 198 curRate += ADDITIVE_INCREASE; 199 //curRate *= MULTIPLICATIVE_INCREASE; 200 if (curRate > MAX_RATE) curRate = MAX_RATE; 201 } else if (ninetiethRT > (HIGH_WATER * targetRT)) { 202 curRate /= MULTIPLICATIVE_DECREASE; 203 if (curRate < MIN_RATE) curRate = MIN_RATE; 204 } 205 if (DEBUG) System.err.println("RTController <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" rate now "+curRate); 206 ((RateLimitingPredicate)pred).setTargetRate(this.curRate); 207 208 } 209 210 211 } 212 213 } 214