1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import seda.util.*; 32 import java.util.*; 33 34 /** 35 * An implementation of ResponseTimeController that uses a PID control. 36 * 37 * @author Matt Welsh 38 */ 39 public class ResponseTimeControllerPID extends ResponseTimeController { 40 41 private static final boolean DEBUG = true; 42 43 private static final boolean ADJUST_THRESHOLD = false; 44 private static final boolean ADJUST_RATE = true; 45 46 private static final boolean BE_CREATIVE = false; 47 48 private static final int MEASUREMENT_SIZE = 100; 49 private static final long MEASUREMENT_TIME = 5000; 50 private static final double SMOOTH_CONST = 0.8; 51 private static final double PROP_GAIN = 1.0; 52 private static final double DERIV_GAIN = -0.5; 53 private static final double INTR_GAIN = (0.2 / MEASUREMENT_SIZE); 54 private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9); 55 56 protected final static int INIT_THRESHOLD = 1; 57 protected final static int MIN_THRESHOLD = 1; 58 protected final static int MAX_THRESHOLD = 1024; 59 60 private static final double INIT_RATE = 10.0; 61 private static final int INIT_DEPTH = 10; 62 private static final double MAX_RATE = 5000.0; 63 private static final double MIN_RATE = 0.05; 64 65 private SinkProxy sinkProxy; 66 private long measurements[], sortedmeasurements[]; 67 private double errors[], lasterr, lastinterr, totalinterr; 68 private int curThreshold, cur_measurement, cur_error; 69 private long numReceived; 70 private double curRate; 71 private double ninetiethRT, lambda; 72 private long adjtime; 73 private boolean enabled; 74 ResponseTimeControllerPID(ManagerIF mgr, StageWrapperIF stage)75 public ResponseTimeControllerPID(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException { 76 super(mgr, stage); 77 this.adjtime = System.currentTimeMillis(); 78 this.sinkProxy = (SinkProxy)stage.getStage().getSink(); 79 this.measurements = new long[MEASUREMENT_SIZE]; 80 this.sortedmeasurements = new long[MEASUREMENT_SIZE]; 81 this.errors = new double[MEASUREMENT_SIZE]; 82 this.cur_measurement = 0; 83 this.cur_error = 0; 84 85 // Add profile 86 mgr.getProfiler().add("RTControllerPID 90th-percentile RT <"+stage.getStage().getName()+">", 87 new ProfilableIF() { 88 public int profileSize() { 89 return (int)ninetiethRT; 90 } 91 }); 92 93 if (ADJUST_THRESHOLD) { 94 95 mgr.getProfiler().add("RTControllerPID queue threshold <"+stage.getStage().getName()+">", 96 new ProfilableIF() { 97 public int profileSize() { 98 return curThreshold; 99 } 100 }); 101 102 this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), MAX_THRESHOLD); 103 ((QueueThresholdPredicate)pred).setThreshold(INIT_THRESHOLD); 104 this.curThreshold = ((QueueThresholdPredicate)pred).getThreshold(); 105 stage.getStage().getSink().setEnqueuePredicate(this.pred); 106 107 System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ADJUST_THRESH enabled, MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", PROP_GAIN="+PROP_GAIN+", DERIV_GAIN="+DERIV_GAIN+", INTR_GAIN="+INTR_GAIN); 108 109 } else if (ADJUST_RATE) { 110 111 this.pred = new RateLimitingPredicate(stage.getStage().getSink(), INIT_RATE, INIT_DEPTH); 112 this.curRate = ((RateLimitingPredicate)pred).getTargetRate(); 113 stage.getStage().getSink().setEnqueuePredicate(pred); 114 115 System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ADJUST_RATE enabled, MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", PROP_GAIN="+PROP_GAIN+", DERIV_GAIN="+DERIV_GAIN+", INTR_GAIN="+INTR_GAIN); 116 } 117 118 this.enabled = true; 119 } 120 enable()121 public synchronized void enable() { 122 if (enabled) return; 123 124 System.err.println("RTControllerPID <"+stage.getStage().getName()+">: Enabling"); 125 if (ADJUST_THRESHOLD) { 126 this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), curThreshold); 127 128 } else if (ADJUST_RATE) { 129 this.pred = new RateLimitingPredicate(stage.getStage().getSink(), curRate, INIT_DEPTH); 130 } 131 132 stage.getStage().getSink().setEnqueuePredicate(this.pred); 133 enabled = true; 134 } 135 disable()136 public synchronized void disable() { 137 if (!enabled) return; 138 System.err.println("RTControllerPID <"+stage.getStage().getName()+">: Disabling"); 139 this.pred = null; 140 stage.getStage().getSink().setEnqueuePredicate(null); 141 enabled = false; 142 } 143 adjustThreshold(QueueElementIF fetched[], long procTime)144 public synchronized void adjustThreshold(QueueElementIF fetched[], long procTime) { 145 long curtime = System.currentTimeMillis(); 146 boolean adjust = false; 147 148 for (int i = 0; i < fetched.length; i++) { 149 if (fetched[i] instanceof TimeStampedEvent) { 150 TimeStampedEvent ev = (TimeStampedEvent)fetched[i]; 151 long time = ev.timestamp; 152 if (time != 0) { 153 measurements[cur_measurement] = curtime - time; 154 cur_measurement++; 155 if (cur_measurement == MEASUREMENT_SIZE) { 156 cur_measurement = 0; 157 adjust = true; 158 } 159 } 160 } 161 } 162 163 int numsort = MEASUREMENT_SIZE; 164 long elapsed = curtime - adjtime; 165 if (elapsed >= MEASUREMENT_TIME) { 166 adjust = true; 167 numsort = cur_measurement; 168 cur_measurement = 0; 169 } 170 171 if (!adjust) return; 172 System.arraycopy(measurements, 0, sortedmeasurements, 0, numsort); 173 Arrays.sort(sortedmeasurements, 0, numsort); 174 long cur = sortedmeasurements[ (int)(0.9 * (double)numsort) ]; 175 ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)cur * 1.0)); 176 adjtime = curtime; 177 stage.getStats().record90thRT(ninetiethRT); 178 179 int numReceived = sinkProxy.enqueueSuccessCount; 180 sinkProxy.enqueueSuccessCount = 0; 181 double cur_lambda = (numReceived * 1.0) / (elapsed * 1.0e-3); 182 lambda = (SMOOTH_CONST * lambda) + ((1.0 - SMOOTH_CONST) * cur_lambda); 183 184 // Apply PID control 185 double err = (targetRT - ninetiethRT) / targetRT; 186 double derr = (err - lasterr) / (double)(elapsed * 1.0e-3); 187 double interr = (((lasterr + err)/2) * (double)((elapsed) * 1.0e-3)); 188 lasterr = err; adjtime = curtime; 189 190 totalinterr -= errors[cur_error]; 191 totalinterr += interr; 192 errors[cur_error] = interr; 193 cur_error++; if (cur_error == MEASUREMENT_SIZE) cur_error = 0; 194 195 // interr -= errors[cur_error]; 196 // errors[cur_error] = interr; 197 // cur_error++; if (cur_error == MEASUREMENT_SIZE) cur_error = 0; 198 // lasterr = err; lastinterr = interr; adjtime = curtime; 199 200 double out; 201 if (BE_CREATIVE) { 202 out = (PROP_GAIN * err * err); 203 if (err < 0) { out *= -1; } 204 } else { 205 out = ((PROP_GAIN * err) + (DERIV_GAIN * derr) + (INTR_GAIN*totalinterr)); 206 } 207 208 if (DEBUG) System.err.println("RTControllerPID <"+stage.getStage().getName()+">: lambda "+MDWUtil.format(lambda)+" 90th "+MDWUtil.format(ninetiethRT)+" err "+MDWUtil.format(err)+" derr "+MDWUtil.format(derr)+" interr "+MDWUtil.format(totalinterr)+" out "+MDWUtil.format(out)); 209 210 if (!enabled) return; 211 212 if (ADJUST_THRESHOLD) { 213 curThreshold += out; 214 //curThreshold = (int)((MIN_THRESHOLD) + ((MAX_THRESHOLD - MIN_THRESHOLD) * out)); 215 if (curThreshold < MIN_THRESHOLD) curThreshold = MIN_THRESHOLD; 216 if (curThreshold > MAX_THRESHOLD) curThreshold = MAX_THRESHOLD; 217 218 if (DEBUG) System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" threshold "+curThreshold); 219 ((QueueThresholdPredicate)pred).setThreshold(curThreshold); 220 221 } else if (ADJUST_RATE) { 222 223 if (BE_CREATIVE) { 224 if (out < 0) { 225 //curRate /= (out * -1); 226 curRate /= 2; 227 } else { 228 curRate += out; 229 } 230 } else { 231 curRate += out; 232 } 233 234 curRate = Math.max(MIN_RATE, curRate); 235 curRate = Math.min(MAX_RATE, curRate); 236 ((RateLimitingPredicate)pred).setTargetRate(this.curRate); 237 238 if (DEBUG) System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" rate now "+curRate); 239 240 } 241 242 } 243 244 } 245