1 /*
2  * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
3  * California. All rights reserved.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation for any purpose, without fee, and without written agreement is
7  * hereby granted, provided that the above copyright notice and the following
8  * two paragraphs appear in all copies of this software.
9  *
10  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
11  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
12  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
13  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
14  *
15  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
16  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
17  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
18  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
19  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
20  *
21  * Author: Matt Welsh <mdw@cs.berkeley.edu>
22  *
23  */
24 
25 package seda.sandStorm.internal;
26 
27 import seda.sandStorm.api.*;
28 import seda.sandStorm.api.internal.*;
29 import seda.sandStorm.core.*;
30 import seda.sandStorm.main.*;
31 import java.util.*;
32 
33 /**
34  * The ThreadPoolController is responsible for dynamically adusting the
35  * size of a given ThreadPool.
36  *
37  * @author   Matt Welsh
38  */
39 
40 public class ThreadPoolController {
41 
42   private static final boolean DEBUG = false;
43 
44   // Multiple of standard controller delay
45   private static final int CONTROLLER_DELAY = 4;
46 
47   // Multiple of standard controller delay
48   private static final int THROUGHPUT_MEASUREMENT_DELAY = 1;
49 
50   // Multiple of standard controller delay
51   private static final int AUTO_MAX_DETECT_DELAY = 10;
52 
53   // Size of random jump down in number of threads
54   private static final int AUTO_MAX_DETECT_RANDOM_JUMP = 4;
55 
56   private static final double SMOOTH_CONST = 0.3;
57 
58   private ManagerIF mgr;
59   private Vector tpvec;
60 
61   private boolean autoMaxDetect;
62   private Thread controller;
63   private int controllerDelay, controllerThreshold;
64 
ThreadPoolController(ManagerIF mgr)65   public ThreadPoolController(ManagerIF mgr) {
66     this.mgr = mgr;
67     tpvec = new Vector();
68 
69     SandstormConfig config = mgr.getConfig();
70     this.controllerDelay = config.getInt("global.threadPool.sizeController.delay");
71     this.controllerThreshold = config.getInt("global.threadPool.sizeController.threshold");
72     this.autoMaxDetect = config.getBoolean("global.threadPool.sizeController.autoMaxDetect");
73 
74     start();
75   }
76 
ThreadPoolController(ManagerIF mgr, int delay, int threshold)77   public ThreadPoolController(ManagerIF mgr, int delay, int threshold) {
78     this.mgr = mgr;
79     tpvec = new Vector();
80     this.controllerDelay = delay;
81     SandstormConfig config = mgr.getConfig();
82     if (this.controllerDelay == -1) {
83       this.controllerDelay = config.getInt("global.threadPool.sizeController.delay");
84     }
85     this.controllerThreshold = threshold;
86     if (this.controllerThreshold == -1) {
87       this.controllerThreshold = config.getInt("global.threadPool.sizeController.threshold");
88     }
89 
90     this.autoMaxDetect = config.getBoolean("global.threadPool.sizeController.autoMaxDetect");
91     start();
92   }
93 
94   /**
95    * Register a thread pool with this controller, using the queue threshold
96    * specified by the system configuration.
97    */
register(StageWrapperIF stage, ThreadPool tp)98   public void register(StageWrapperIF stage, ThreadPool tp) {
99     SandstormConfig config = mgr.getConfig();
100     int thresh = config.getInt("stages."+stage.getStage().getName()+".threadPool.sizeController.threshold", controllerThreshold);
101     tpvec.addElement(new tpcClient(stage, tp, null, thresh));
102   }
103 
104   /**
105    * Register a thread pool with this controller, using the queue threshold
106    * specified by the system configuration.
107    */
register(StageWrapperIF stage, ThreadPool tp, ProfilableIF metric)108   public void register(StageWrapperIF stage, ThreadPool tp, ProfilableIF metric) {
109     tpvec.addElement(new tpcClient(stage, tp, metric, controllerThreshold));
110   }
111 
start()112   private void start() {
113     System.err.println("ThreadPoolController: Started, delay "+controllerDelay+" ms, threshold "+controllerThreshold+", autoMaxDetect "+autoMaxDetect);
114     controller = new Thread(new controllerThread(), "TPC");
115     controller.start();
116   }
117 
118   /**
119    * Internal class representing a single TPC-controlled thread pool.
120    */
121   class tpcClient {
122     private StageWrapperIF stage;
123     private ThreadPool tp;
124     private int threshold;
125     private ProfilableIF metric;
126 
127     int savedThreads, avgThreads;
128     long savedTotalEvents;
129     double savedThroughput, avgThroughput;
130     long last_time, reset_time;
131 
tpcClient(final StageWrapperIF stage, ThreadPool tp, ProfilableIF metric, int threshold)132     tpcClient(final StageWrapperIF stage, ThreadPool tp, ProfilableIF metric, int threshold) {
133       this.stage = stage;
134       this.tp = tp;
135       this.threshold = threshold;
136       this.metric = metric;
137       if (this.metric == null) {
138 	this.metric = new ProfilableIF() {
139 	  public int profileSize() {
140 	    return stage.getSource().size();
141 	  }
142 	};
143       }
144 
145       savedThreads = tp.numThreads();
146       reset_time = last_time = System.currentTimeMillis();
147 
148       mgr.getProfiler().add("TPController savedThreads <"+stage.getStage().getName()+">",
149 	  new ProfilableIF() {
150 	  public int profileSize() {
151   	  return (int)savedThreads;
152 	  }
153 	  });
154 
155       mgr.getProfiler().add("TPController avgThreads <"+stage.getStage().getName()+">",
156 	  new ProfilableIF() {
157 	  public int profileSize() {
158   	  return (int)avgThreads;
159 	  }
160 	  });
161 
162       mgr.getProfiler().add("TPController savedThroughput <"+stage.getStage().getName()+">",
163 	  new ProfilableIF() {
164 	  public int profileSize() {
165   	  return (int)savedThroughput;
166 	  }
167 	  });
168 
169       mgr.getProfiler().add("TPController avgThroughput <"+stage.getStage().getName()+">",
170 	  new ProfilableIF() {
171 	  public int profileSize() {
172   	  return (int)avgThroughput;
173 	  }
174 	  });
175     }
176   }
177 
178   /**
179    * Internal class implementing the controller.
180    */
181   class controllerThread implements Runnable {
182 
183     int adjust_count = 0;
184     Random rand;
185 
controllerThread()186     controllerThread() {
187       rand = new Random();
188     }
189 
run()190     public void run() {
191       if (DEBUG) System.err.println("TP size controller: starting");
192 
193       while (true) {
194 	adjustThreadPools();
195 	try {
196 	  Thread.currentThread().sleep(controllerDelay);
197 	} catch (InterruptedException ie) {
198 	  // Ignore
199 	}
200       }
201     }
202 
adjustThreadPools()203     private void adjustThreadPools() {
204 
205       adjust_count++;
206 
207       if ((adjust_count % CONTROLLER_DELAY) == 0) {
208 
209 	for (int i = 0; i < tpvec.size(); i++) {
210 	  tpcClient tpc = (tpcClient)tpvec.elementAt(i);
211 
212 	  //if (DEBUG) System.err.println("TP controller: Inspecting "+tpc.tp);
213 
214 	  int sz = tpc.metric.profileSize();
215 	  //if (DEBUG) System.err.println("TP controller: "+tpc.tp+" has size "+sz+", threshold "+tpc.threshold);
216 	  boolean addThread = false;
217 	  if (sz >= tpc.threshold) addThread = true;
218 
219 	  if (addThread) {
220 	    tpc.tp.addThreads(1, true);
221 	  }
222 	}
223       }
224 
225       if ((DEBUG || autoMaxDetect) &&
226 	  (adjust_count % THROUGHPUT_MEASUREMENT_DELAY) == 0) {
227 
228 	long curTime = System.currentTimeMillis();
229 
230 	for (int i = 0; i < tpvec.size(); i++) {
231 	  tpcClient tpc = (tpcClient)tpvec.elementAt(i);
232 
233 	  StageWrapper sw;
234 	  try {
235 	    sw = (StageWrapper)tpc.stage;
236 	  } catch (ClassCastException se) {
237 	    // Skip this one
238 	    continue;
239 	  }
240 
241 	  long events = sw.getStats().getTotalEvents();
242 	  long curEvents = events - tpc.savedTotalEvents;
243 	  tpc.savedTotalEvents = events;
244 	  if (DEBUG) System.err.println("TP <"+tpc.stage.getStage().getName()+"> events "+events+" curEvents "+curEvents);
245 
246 	  int curThreads = tpc.tp.numThreads();
247 	  tpc.avgThreads = (int)((SMOOTH_CONST * curThreads) + ((1.0 - SMOOTH_CONST) * (double)(tpc.avgThreads * 1.0)));
248 
249 	  //double throughput = (sw.getStats().getServiceRate() * curThreads);
250 	  double throughput = (curEvents * 1.0) / ((curTime - tpc.last_time) * 1.0e-3);
251 	  tpc.avgThroughput = (SMOOTH_CONST * throughput) + ((1.0 - SMOOTH_CONST) * (double)(tpc.avgThroughput * 1.0));
252 	  if (DEBUG) System.err.println("TP <"+tpc.stage.getStage().getName()+"> throughput "+tpc.avgThroughput);
253 	  tpc.last_time = curTime;
254 	}
255       }
256 
257       if (autoMaxDetect && (adjust_count % AUTO_MAX_DETECT_DELAY) == 0) {
258 
259 	for (int i = 0; i < tpvec.size(); i++) {
260 	  tpcClient tpc = (tpcClient)tpvec.elementAt(i);
261 
262 	  // Periodically override saved values
263 	  //long tr = curTime - tpc.reset_time;
264 	  //if (rand.nextDouble() < 1.0 - Math.exp(-1.0 * (tr / 1e5)))
265 	  //  System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Resetting saved values");
266 	  //  tpc.reset_time = curTime;
267 	  //  tpc.savedThreads = tpc.avgThreads;
268 	  //  tpc.savedThroughput = tpc.avgThroughput;
269 
270 	  // Make random jump down
271 	  // int nt = (int)(rand.nextDouble() * AUTO_MAX_DETECT_RANDOM_JUMP);
272 	  // tpc.tp.removeThreads(nt);
273 	  //
274 
275 	  //continue;
276 
277 	  if (tpc.avgThroughput >= (1.0 * tpc.savedThroughput)) {
278 	    // Accept new state
279 
280 	    tpc.savedThreads = tpc.tp.numThreads();
281 	    tpc.savedThroughput = tpc.avgThroughput;
282 	    if (DEBUG) System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Setting new state to threads="+tpc.savedThreads+" tp="+tpc.savedThroughput);
283 
284 	    //	  else if (tpc.avgThroughput <= (1.2 * tpc.savedThroughput))
285 	    // We are degrading: halve the number of threads
286 
287 	    //	    int numThreads = tpc.tp.numThreads();
288 	    //	    int newThreads = Math.max(1, numThreads / 2);
289 	    //	    System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Degrading (tp="+tpc.avgThroughput+") Reverting to threads="+tpc.savedThreads+"/"+newThreads+" stp="+tpc.savedThroughput);
290 	    //     	    if (newThreads < numThreads)
291 	    //	      tpc.tp.removeThreads(numThreads - newThreads);
292 	    //	    tpc.savedThroughput = tpc.avgThroughput;
293 	    //	    tpc.savedThreads = newThreads;
294 
295 	  } else if (tpc.avgThroughput <= (1.2 * tpc.savedThroughput)) {
296 	    // Otherwise reset to savedThreads (minus random jump down)
297 	    // as long as the number of threads is different
298 
299 	    if (tpc.savedThreads != tpc.tp.numThreads()) {
300 	      int numThreads = tpc.tp.numThreads();
301 	      int nt = (int)(rand.nextDouble() * AUTO_MAX_DETECT_RANDOM_JUMP);
302 	      int newThreads = Math.max(1, tpc.savedThreads - nt);
303 
304 	      if (DEBUG || autoMaxDetect) System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Reverting to threads="+tpc.savedThreads+"/"+newThreads+" stp="+tpc.savedThroughput);
305 
306 	      if (newThreads < numThreads) {
307 		// Remove threads
308 		tpc.tp.removeThreads(numThreads - newThreads);
309 	      } else if (newThreads > numThreads) {
310 		// Add threads
311 		tpc.tp.addThreads(newThreads - numThreads, true);
312 	      }
313 	    }
314 	  }
315 	}
316 	return;
317       }
318 
319     }
320   }
321 }
322 
323 
324