1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import java.util.*; 32 33 /** 34 * The ThreadPoolController is responsible for dynamically adusting the 35 * size of a given ThreadPool. 36 * 37 * @author Matt Welsh 38 */ 39 40 public class ThreadPoolController { 41 42 private static final boolean DEBUG = false; 43 44 // Multiple of standard controller delay 45 private static final int CONTROLLER_DELAY = 4; 46 47 // Multiple of standard controller delay 48 private static final int THROUGHPUT_MEASUREMENT_DELAY = 1; 49 50 // Multiple of standard controller delay 51 private static final int AUTO_MAX_DETECT_DELAY = 10; 52 53 // Size of random jump down in number of threads 54 private static final int AUTO_MAX_DETECT_RANDOM_JUMP = 4; 55 56 private static final double SMOOTH_CONST = 0.3; 57 58 private ManagerIF mgr; 59 private Vector tpvec; 60 61 private boolean autoMaxDetect; 62 private Thread controller; 63 private int controllerDelay, controllerThreshold; 64 ThreadPoolController(ManagerIF mgr)65 public ThreadPoolController(ManagerIF mgr) { 66 this.mgr = mgr; 67 tpvec = new Vector(); 68 69 SandstormConfig config = mgr.getConfig(); 70 this.controllerDelay = config.getInt("global.threadPool.sizeController.delay"); 71 this.controllerThreshold = config.getInt("global.threadPool.sizeController.threshold"); 72 this.autoMaxDetect = config.getBoolean("global.threadPool.sizeController.autoMaxDetect"); 73 74 start(); 75 } 76 ThreadPoolController(ManagerIF mgr, int delay, int threshold)77 public ThreadPoolController(ManagerIF mgr, int delay, int threshold) { 78 this.mgr = mgr; 79 tpvec = new Vector(); 80 this.controllerDelay = delay; 81 SandstormConfig config = mgr.getConfig(); 82 if (this.controllerDelay == -1) { 83 this.controllerDelay = config.getInt("global.threadPool.sizeController.delay"); 84 } 85 this.controllerThreshold = threshold; 86 if (this.controllerThreshold == -1) { 87 this.controllerThreshold = config.getInt("global.threadPool.sizeController.threshold"); 88 } 89 90 this.autoMaxDetect = config.getBoolean("global.threadPool.sizeController.autoMaxDetect"); 91 start(); 92 } 93 94 /** 95 * Register a thread pool with this controller, using the queue threshold 96 * specified by the system configuration. 97 */ register(StageWrapperIF stage, ThreadPool tp)98 public void register(StageWrapperIF stage, ThreadPool tp) { 99 SandstormConfig config = mgr.getConfig(); 100 int thresh = config.getInt("stages."+stage.getStage().getName()+".threadPool.sizeController.threshold", controllerThreshold); 101 tpvec.addElement(new tpcClient(stage, tp, null, thresh)); 102 } 103 104 /** 105 * Register a thread pool with this controller, using the queue threshold 106 * specified by the system configuration. 107 */ register(StageWrapperIF stage, ThreadPool tp, ProfilableIF metric)108 public void register(StageWrapperIF stage, ThreadPool tp, ProfilableIF metric) { 109 tpvec.addElement(new tpcClient(stage, tp, metric, controllerThreshold)); 110 } 111 start()112 private void start() { 113 System.err.println("ThreadPoolController: Started, delay "+controllerDelay+" ms, threshold "+controllerThreshold+", autoMaxDetect "+autoMaxDetect); 114 controller = new Thread(new controllerThread(), "TPC"); 115 controller.start(); 116 } 117 118 /** 119 * Internal class representing a single TPC-controlled thread pool. 120 */ 121 class tpcClient { 122 private StageWrapperIF stage; 123 private ThreadPool tp; 124 private int threshold; 125 private ProfilableIF metric; 126 127 int savedThreads, avgThreads; 128 long savedTotalEvents; 129 double savedThroughput, avgThroughput; 130 long last_time, reset_time; 131 tpcClient(final StageWrapperIF stage, ThreadPool tp, ProfilableIF metric, int threshold)132 tpcClient(final StageWrapperIF stage, ThreadPool tp, ProfilableIF metric, int threshold) { 133 this.stage = stage; 134 this.tp = tp; 135 this.threshold = threshold; 136 this.metric = metric; 137 if (this.metric == null) { 138 this.metric = new ProfilableIF() { 139 public int profileSize() { 140 return stage.getSource().size(); 141 } 142 }; 143 } 144 145 savedThreads = tp.numThreads(); 146 reset_time = last_time = System.currentTimeMillis(); 147 148 mgr.getProfiler().add("TPController savedThreads <"+stage.getStage().getName()+">", 149 new ProfilableIF() { 150 public int profileSize() { 151 return (int)savedThreads; 152 } 153 }); 154 155 mgr.getProfiler().add("TPController avgThreads <"+stage.getStage().getName()+">", 156 new ProfilableIF() { 157 public int profileSize() { 158 return (int)avgThreads; 159 } 160 }); 161 162 mgr.getProfiler().add("TPController savedThroughput <"+stage.getStage().getName()+">", 163 new ProfilableIF() { 164 public int profileSize() { 165 return (int)savedThroughput; 166 } 167 }); 168 169 mgr.getProfiler().add("TPController avgThroughput <"+stage.getStage().getName()+">", 170 new ProfilableIF() { 171 public int profileSize() { 172 return (int)avgThroughput; 173 } 174 }); 175 } 176 } 177 178 /** 179 * Internal class implementing the controller. 180 */ 181 class controllerThread implements Runnable { 182 183 int adjust_count = 0; 184 Random rand; 185 controllerThread()186 controllerThread() { 187 rand = new Random(); 188 } 189 run()190 public void run() { 191 if (DEBUG) System.err.println("TP size controller: starting"); 192 193 while (true) { 194 adjustThreadPools(); 195 try { 196 Thread.currentThread().sleep(controllerDelay); 197 } catch (InterruptedException ie) { 198 // Ignore 199 } 200 } 201 } 202 adjustThreadPools()203 private void adjustThreadPools() { 204 205 adjust_count++; 206 207 if ((adjust_count % CONTROLLER_DELAY) == 0) { 208 209 for (int i = 0; i < tpvec.size(); i++) { 210 tpcClient tpc = (tpcClient)tpvec.elementAt(i); 211 212 //if (DEBUG) System.err.println("TP controller: Inspecting "+tpc.tp); 213 214 int sz = tpc.metric.profileSize(); 215 //if (DEBUG) System.err.println("TP controller: "+tpc.tp+" has size "+sz+", threshold "+tpc.threshold); 216 boolean addThread = false; 217 if (sz >= tpc.threshold) addThread = true; 218 219 if (addThread) { 220 tpc.tp.addThreads(1, true); 221 } 222 } 223 } 224 225 if ((DEBUG || autoMaxDetect) && 226 (adjust_count % THROUGHPUT_MEASUREMENT_DELAY) == 0) { 227 228 long curTime = System.currentTimeMillis(); 229 230 for (int i = 0; i < tpvec.size(); i++) { 231 tpcClient tpc = (tpcClient)tpvec.elementAt(i); 232 233 StageWrapper sw; 234 try { 235 sw = (StageWrapper)tpc.stage; 236 } catch (ClassCastException se) { 237 // Skip this one 238 continue; 239 } 240 241 long events = sw.getStats().getTotalEvents(); 242 long curEvents = events - tpc.savedTotalEvents; 243 tpc.savedTotalEvents = events; 244 if (DEBUG) System.err.println("TP <"+tpc.stage.getStage().getName()+"> events "+events+" curEvents "+curEvents); 245 246 int curThreads = tpc.tp.numThreads(); 247 tpc.avgThreads = (int)((SMOOTH_CONST * curThreads) + ((1.0 - SMOOTH_CONST) * (double)(tpc.avgThreads * 1.0))); 248 249 //double throughput = (sw.getStats().getServiceRate() * curThreads); 250 double throughput = (curEvents * 1.0) / ((curTime - tpc.last_time) * 1.0e-3); 251 tpc.avgThroughput = (SMOOTH_CONST * throughput) + ((1.0 - SMOOTH_CONST) * (double)(tpc.avgThroughput * 1.0)); 252 if (DEBUG) System.err.println("TP <"+tpc.stage.getStage().getName()+"> throughput "+tpc.avgThroughput); 253 tpc.last_time = curTime; 254 } 255 } 256 257 if (autoMaxDetect && (adjust_count % AUTO_MAX_DETECT_DELAY) == 0) { 258 259 for (int i = 0; i < tpvec.size(); i++) { 260 tpcClient tpc = (tpcClient)tpvec.elementAt(i); 261 262 // Periodically override saved values 263 //long tr = curTime - tpc.reset_time; 264 //if (rand.nextDouble() < 1.0 - Math.exp(-1.0 * (tr / 1e5))) 265 // System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Resetting saved values"); 266 // tpc.reset_time = curTime; 267 // tpc.savedThreads = tpc.avgThreads; 268 // tpc.savedThroughput = tpc.avgThroughput; 269 270 // Make random jump down 271 // int nt = (int)(rand.nextDouble() * AUTO_MAX_DETECT_RANDOM_JUMP); 272 // tpc.tp.removeThreads(nt); 273 // 274 275 //continue; 276 277 if (tpc.avgThroughput >= (1.0 * tpc.savedThroughput)) { 278 // Accept new state 279 280 tpc.savedThreads = tpc.tp.numThreads(); 281 tpc.savedThroughput = tpc.avgThroughput; 282 if (DEBUG) System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Setting new state to threads="+tpc.savedThreads+" tp="+tpc.savedThroughput); 283 284 // else if (tpc.avgThroughput <= (1.2 * tpc.savedThroughput)) 285 // We are degrading: halve the number of threads 286 287 // int numThreads = tpc.tp.numThreads(); 288 // int newThreads = Math.max(1, numThreads / 2); 289 // System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Degrading (tp="+tpc.avgThroughput+") Reverting to threads="+tpc.savedThreads+"/"+newThreads+" stp="+tpc.savedThroughput); 290 // if (newThreads < numThreads) 291 // tpc.tp.removeThreads(numThreads - newThreads); 292 // tpc.savedThroughput = tpc.avgThroughput; 293 // tpc.savedThreads = newThreads; 294 295 } else if (tpc.avgThroughput <= (1.2 * tpc.savedThroughput)) { 296 // Otherwise reset to savedThreads (minus random jump down) 297 // as long as the number of threads is different 298 299 if (tpc.savedThreads != tpc.tp.numThreads()) { 300 int numThreads = tpc.tp.numThreads(); 301 int nt = (int)(rand.nextDouble() * AUTO_MAX_DETECT_RANDOM_JUMP); 302 int newThreads = Math.max(1, tpc.savedThreads - nt); 303 304 if (DEBUG || autoMaxDetect) System.err.println("TP controller <"+tpc.stage.getStage().getName()+"> Reverting to threads="+tpc.savedThreads+"/"+newThreads+" stp="+tpc.savedThroughput); 305 306 if (newThreads < numThreads) { 307 // Remove threads 308 tpc.tp.removeThreads(numThreads - newThreads); 309 } else if (newThreads > numThreads) { 310 // Add threads 311 tpc.tp.addThreads(newThreads - numThreads, true); 312 } 313 } 314 } 315 } 316 return; 317 } 318 319 } 320 } 321 } 322 323 324