1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import seda.util.*;
32 import java.util.*;
33 
34 /**
35  * An implementation of ResponseTimeController that uses a PID control.
36  *
37  * @author   Matt Welsh
38  */
39 public class ResponseTimeControllerPID extends ResponseTimeController {
40 
41   private static final boolean DEBUG = true;
42 
43   private static final boolean ADJUST_THRESHOLD = false;
44   private static final boolean ADJUST_RATE = true;
45 
46   private static final boolean BE_CREATIVE = false;
47 
48   private static final int MEASUREMENT_SIZE = 100;
49   private static final long MEASUREMENT_TIME = 5000;
50   private static final double SMOOTH_CONST = 0.8;
51   private static final double PROP_GAIN = 1.0;
52   private static final double DERIV_GAIN = -0.5;
53   private static final double INTR_GAIN = (0.2 / MEASUREMENT_SIZE);
54   private static final int NINETIETH = (int)((double)MEASUREMENT_SIZE * 0.9);
55 
56   protected final static int INIT_THRESHOLD = 1;
57   protected final static int MIN_THRESHOLD = 1;
58   protected final static int MAX_THRESHOLD = 1024;
59 
60   private static final double INIT_RATE = 10.0;
61   private static final int INIT_DEPTH = 10;
62   private static final double MAX_RATE = 5000.0;
63   private static final double MIN_RATE = 0.05;
64 
65   private SinkProxy sinkProxy;
66   private long measurements[], sortedmeasurements[];
67   private double errors[], lasterr, lastinterr, totalinterr;
68   private int curThreshold, cur_measurement, cur_error;
69   private long numReceived;
70   private double curRate;
71   private double ninetiethRT, lambda;
72   private long adjtime;
73   private boolean enabled;
74 
ResponseTimeControllerPID(ManagerIF mgr, StageWrapperIF stage)75   public ResponseTimeControllerPID(ManagerIF mgr, StageWrapperIF stage) throws IllegalArgumentException {
76     super(mgr, stage);
77     this.adjtime = System.currentTimeMillis();
78     this.sinkProxy = (SinkProxy)stage.getStage().getSink();
79     this.measurements = new long[MEASUREMENT_SIZE];
80     this.sortedmeasurements = new long[MEASUREMENT_SIZE];
81     this.errors = new double[MEASUREMENT_SIZE];
82     this.cur_measurement = 0;
83     this.cur_error = 0;
84 
85     // Add profile
86     mgr.getProfiler().add("RTControllerPID 90th-percentile RT <"+stage.getStage().getName()+">",
87 	new ProfilableIF() {
88 	public int profileSize() {
89 	return (int)ninetiethRT;
90 	}
91 	});
92 
93     if (ADJUST_THRESHOLD) {
94 
95       mgr.getProfiler().add("RTControllerPID queue threshold <"+stage.getStage().getName()+">",
96   	  new ProfilableIF() {
97   	  public int profileSize() {
98   	  return curThreshold;
99   	  }
100   	  });
101 
102       this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), MAX_THRESHOLD);
103       ((QueueThresholdPredicate)pred).setThreshold(INIT_THRESHOLD);
104       this.curThreshold = ((QueueThresholdPredicate)pred).getThreshold();
105       stage.getStage().getSink().setEnqueuePredicate(this.pred);
106 
107       System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ADJUST_THRESH enabled, MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", PROP_GAIN="+PROP_GAIN+", DERIV_GAIN="+DERIV_GAIN+", INTR_GAIN="+INTR_GAIN);
108 
109     } else if (ADJUST_RATE) {
110 
111       this.pred = new RateLimitingPredicate(stage.getStage().getSink(), INIT_RATE, INIT_DEPTH);
112       this.curRate = ((RateLimitingPredicate)pred).getTargetRate();
113       stage.getStage().getSink().setEnqueuePredicate(pred);
114 
115       System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ADJUST_RATE enabled, MEASUREMENT_SIZE="+MEASUREMENT_SIZE+", SMOOTH_CONST="+SMOOTH_CONST+", PROP_GAIN="+PROP_GAIN+", DERIV_GAIN="+DERIV_GAIN+", INTR_GAIN="+INTR_GAIN);
116     }
117 
118     this.enabled = true;
119   }
120 
enable()121   public synchronized void enable() {
122     if (enabled) return;
123 
124     System.err.println("RTControllerPID <"+stage.getStage().getName()+">: Enabling");
125     if (ADJUST_THRESHOLD) {
126       this.pred = new QueueThresholdPredicate(stage.getStage().getSink(), curThreshold);
127 
128     } else if (ADJUST_RATE) {
129       this.pred = new RateLimitingPredicate(stage.getStage().getSink(), curRate, INIT_DEPTH);
130     }
131 
132     stage.getStage().getSink().setEnqueuePredicate(this.pred);
133     enabled = true;
134   }
135 
disable()136   public synchronized void disable() {
137     if (!enabled) return;
138     System.err.println("RTControllerPID <"+stage.getStage().getName()+">: Disabling");
139     this.pred = null;
140     stage.getStage().getSink().setEnqueuePredicate(null);
141     enabled = false;
142   }
143 
adjustThreshold(QueueElementIF fetched[], long procTime)144   public synchronized void adjustThreshold(QueueElementIF fetched[], long procTime) {
145     long curtime = System.currentTimeMillis();
146     boolean adjust = false;
147 
148     for (int i = 0; i < fetched.length; i++) {
149       if (fetched[i] instanceof TimeStampedEvent) {
150 	TimeStampedEvent ev = (TimeStampedEvent)fetched[i];
151 	long time = ev.timestamp;
152 	if (time != 0) {
153 	  measurements[cur_measurement] = curtime - time;
154 	  cur_measurement++;
155 	  if (cur_measurement == MEASUREMENT_SIZE) {
156 	    cur_measurement = 0;
157 	    adjust = true;
158 	  }
159 	}
160       }
161     }
162 
163     int numsort = MEASUREMENT_SIZE;
164     long elapsed = curtime - adjtime;
165     if (elapsed >= MEASUREMENT_TIME) {
166       adjust = true;
167       numsort = cur_measurement;
168       cur_measurement = 0;
169     }
170 
171     if (!adjust) return;
172     System.arraycopy(measurements, 0, sortedmeasurements, 0, numsort);
173     Arrays.sort(sortedmeasurements, 0, numsort);
174     long cur = sortedmeasurements[ (int)(0.9 * (double)numsort) ];
175     ninetiethRT = (SMOOTH_CONST * (double)ninetiethRT*1.0) + ((1.0 - SMOOTH_CONST) * ((double)cur * 1.0));
176     adjtime = curtime;
177     stage.getStats().record90thRT(ninetiethRT);
178 
179     int numReceived = sinkProxy.enqueueSuccessCount;
180     sinkProxy.enqueueSuccessCount = 0;
181     double cur_lambda = (numReceived * 1.0) / (elapsed * 1.0e-3);
182     lambda = (SMOOTH_CONST * lambda) + ((1.0 - SMOOTH_CONST) * cur_lambda);
183 
184     // Apply PID control
185     double err = (targetRT - ninetiethRT) / targetRT;
186     double derr = (err - lasterr) / (double)(elapsed * 1.0e-3);
187     double interr = (((lasterr + err)/2) * (double)((elapsed) * 1.0e-3));
188     lasterr = err; adjtime = curtime;
189 
190     totalinterr -= errors[cur_error];
191     totalinterr += interr;
192     errors[cur_error] = interr;
193     cur_error++; if (cur_error == MEASUREMENT_SIZE) cur_error = 0;
194 
195 //    interr -= errors[cur_error];
196 //    errors[cur_error] = interr;
197 //    cur_error++; if (cur_error == MEASUREMENT_SIZE) cur_error = 0;
198 //    lasterr = err; lastinterr = interr; adjtime = curtime;
199 
200     double out;
201     if (BE_CREATIVE) {
202       out = (PROP_GAIN * err * err);
203       if (err < 0) { out *= -1; }
204     } else {
205       out = ((PROP_GAIN * err) + (DERIV_GAIN * derr) + (INTR_GAIN*totalinterr));
206     }
207 
208     if (DEBUG) System.err.println("RTControllerPID <"+stage.getStage().getName()+">: lambda "+MDWUtil.format(lambda)+" 90th "+MDWUtil.format(ninetiethRT)+" err "+MDWUtil.format(err)+" derr "+MDWUtil.format(derr)+" interr "+MDWUtil.format(totalinterr)+" out "+MDWUtil.format(out));
209 
210     if (!enabled) return;
211 
212     if (ADJUST_THRESHOLD) {
213       curThreshold += out;
214       //curThreshold = (int)((MIN_THRESHOLD) + ((MAX_THRESHOLD - MIN_THRESHOLD) * out));
215       if (curThreshold < MIN_THRESHOLD) curThreshold = MIN_THRESHOLD;
216       if (curThreshold > MAX_THRESHOLD) curThreshold = MAX_THRESHOLD;
217 
218       if (DEBUG) System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" threshold "+curThreshold);
219       ((QueueThresholdPredicate)pred).setThreshold(curThreshold);
220 
221     } else if (ADJUST_RATE) {
222 
223       if (BE_CREATIVE) {
224 	if (out < 0) {
225 	  //curRate /= (out * -1);
226 	  curRate /= 2;
227 	} else {
228 	  curRate += out;
229 	}
230       } else {
231 	curRate += out;
232       }
233 
234       curRate = Math.max(MIN_RATE, curRate);
235       curRate = Math.min(MAX_RATE, curRate);
236       ((RateLimitingPredicate)pred).setTargetRate(this.curRate);
237 
238       if (DEBUG) System.err.println("RTControllerPID <"+stage.getStage().getName()+">: ninetiethRT "+ninetiethRT+" target "+targetRT+" rate now "+curRate);
239 
240     }
241 
242   }
243 
244 }
245