1 /* 2 * Copyright (c) 2001 by Matt Welsh and The Regents of the University of 3 * California. All rights reserved. 4 * 5 * Permission to use, copy, modify, and distribute this software and its 6 * documentation for any purpose, without fee, and without written agreement is 7 * hereby granted, provided that the above copyright notice and the following 8 * two paragraphs appear in all copies of this software. 9 * 10 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR 11 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT 12 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF 13 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 14 * 15 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 16 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY 17 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS 18 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO 19 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 20 * 21 * Author: Matt Welsh <mdw@cs.berkeley.edu> 22 * 23 */ 24 25 package seda.sandStorm.internal; 26 27 import seda.sandStorm.api.*; 28 import seda.sandStorm.api.internal.*; 29 import seda.sandStorm.core.*; 30 import seda.sandStorm.main.*; 31 import java.util.*; 32 33 /** 34 * ThreadPool is a generic class which provides a thread pool. 35 * 36 * @author Matt Welsh 37 */ 38 39 public class ThreadPool implements ProfilableIF { 40 41 private static final boolean DEBUG = false; 42 43 private StageWrapperIF stage; 44 private ManagerIF mgr; 45 private String poolname; 46 private ThreadGroup pooltg; 47 private Runnable runnable; 48 private Vector threads, stoppedThreads; 49 50 int minThreads, maxThreads; 51 52 private int maxAggregation; 53 private int blockTime = 1000; 54 private int idleTimeThreshold; 55 private AggThrottle aggThrottle; 56 57 /** 58 * Create a thread pool for the given stage, manager and runnable, 59 * with the thread pool controller determining the number of threads 60 * used. 61 */ ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable)62 public ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable) { 63 this.stage = stage; 64 this.poolname = stage.getStage().getName(); 65 this.mgr = mgr; 66 this.runnable = runnable; 67 68 SandstormConfig config = mgr.getConfig(); 69 if (config.getBoolean("global.batchController.enable")) { 70 aggThrottle = new AggThrottle(stage, mgr); 71 } else { 72 this.maxAggregation = config.getInt("global.batchController.maxBatch"); 73 } 74 75 threads = new Vector(); 76 stoppedThreads = new Vector(); 77 78 // First look for stages.[stageName] options, then global options 79 String tag = "stages."+(stage.getStage().getName())+".threadPool."; 80 String globaltag = "global.threadPool."; 81 82 int initialSize = config.getInt(tag+"initialThreads"); 83 if (initialSize < 1) { 84 initialSize = config.getInt(globaltag+"initialThreads"); 85 if (initialSize < 1) initialSize = 1; 86 } 87 minThreads = config.getInt(tag+"minThreads"); 88 if (minThreads < 1) { 89 minThreads = config.getInt(globaltag+"minThreads"); 90 if (minThreads < 1) minThreads = 1; 91 } 92 maxThreads = config.getInt(tag+"maxThreads"); 93 if (maxThreads < 1) { 94 maxThreads = config.getInt(globaltag+"maxThreads"); 95 if (maxThreads < 1) maxThreads = -1; // Infinite 96 } 97 98 this.blockTime = config.getInt(tag+"blockTime", 99 config.getInt(globaltag+"blockTime", blockTime)); 100 this.idleTimeThreshold = config.getInt(tag+"sizeController.idleTimeThreshold", 101 config.getInt(globaltag+"sizeController.idleTimeThreshold", blockTime)); 102 103 System.err.println("TP <"+poolname+">: initial "+initialSize+", min "+minThreads+", max "+maxThreads+", blockTime "+blockTime+", idleTime "+idleTimeThreshold); 104 105 addThreads(initialSize, false); 106 mgr.getProfiler().add("ThreadPool <"+poolname+">", this); 107 pooltg = new ThreadGroup("TP <"+poolname+">"); 108 } 109 110 /** 111 * Create a thread pool with the given name, manager, runnable, 112 * and thread sizing parameters. 113 */ ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable, int initialThreads, int minThreads, int maxThreads, int blockTime, int idleTimeThreshold)114 public ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable, 115 int initialThreads, int minThreads, int maxThreads, int blockTime, int idleTimeThreshold) { 116 this.stage = stage; 117 this.poolname = stage.getStage().getName(); 118 this.mgr = mgr; 119 this.runnable = runnable; 120 121 SandstormConfig config = mgr.getConfig(); 122 if (config.getBoolean("global.batchController.enable")) { 123 aggThrottle = new AggThrottle(stage, mgr); 124 } else { 125 this.maxAggregation = config.getInt("global.batchController.maxBatch"); 126 } 127 128 threads = new Vector(); 129 stoppedThreads = new Vector(); 130 if (initialThreads < 1) initialThreads = 1; 131 this.minThreads = minThreads; 132 if (this.minThreads < 1) this.minThreads = 1; 133 this.maxThreads = maxThreads; 134 //if (this.maxThreads < 1) this.maxThreads = initialThreads; 135 this.blockTime = blockTime; 136 this.idleTimeThreshold = idleTimeThreshold; 137 138 addThreads(initialThreads, false); 139 mgr.getProfiler().add("ThreadPool <"+poolname+">", this); 140 pooltg = new ThreadGroup("TP <"+poolname+">"); 141 } 142 143 /** 144 * Create a thread pool with the given name, manager, runnable, 145 * and a fixed number of threads. 146 */ ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable, int numThreads)147 public ThreadPool(StageWrapperIF stage, ManagerIF mgr, Runnable runnable, 148 int numThreads) { 149 this.stage = stage; 150 this.poolname = stage.getStage().getName(); 151 this.mgr = mgr; 152 this.runnable = runnable; 153 154 SandstormConfig config = mgr.getConfig(); 155 if (config.getBoolean("global.batchController.enable")) { 156 aggThrottle = new AggThrottle(stage, mgr); 157 } else { 158 this.maxAggregation = config.getInt("global.batchController.maxBatch"); 159 } 160 161 threads = new Vector(); 162 stoppedThreads = new Vector(); 163 maxThreads = minThreads = numThreads; 164 addThreads(numThreads, false); 165 mgr.getProfiler().add("ThreadPool <"+poolname+">", this); 166 pooltg = new ThreadGroup("TP <"+poolname+">"); 167 } 168 169 /** 170 * Start the thread pool. 171 */ start()172 public void start() { 173 System.err.print("TP <"+poolname+">: Starting "+numThreads()+" threads"); 174 if (aggThrottle != null) { 175 System.err.println(", batchController enabled"); 176 } else { 177 System.err.println(", maxBatch="+maxAggregation); 178 } 179 for (int i = 0; i < threads.size(); i++) { 180 Thread t = (Thread)threads.elementAt(i); 181 t.start(); 182 } 183 } 184 185 /** 186 * Stop the thread pool. 187 */ stop()188 public void stop() { 189 pooltg.stop(); 190 } 191 192 /** 193 * Add threads to this pool. 194 */ addThreads(int num, boolean start)195 void addThreads(int num, boolean start) { 196 synchronized (this) { 197 int numToAdd; 198 if (maxThreads < 0) { 199 numToAdd = num; 200 } else { 201 int numTotal = Math.min(maxThreads, numThreads()+num); 202 numToAdd = numTotal - numThreads(); 203 } 204 if ((maxThreads < 0) || (numToAdd < maxThreads)) { 205 System.err.println("TP <"+poolname+">: Adding "+numToAdd+" threads to pool, size "+(numThreads()+numToAdd)); 206 } 207 for (int i = 0; i < numToAdd; i++) { 208 String name = "TP-"+numThreads()+" <"+poolname+">"; 209 Thread t = new Thread(pooltg, runnable, name); 210 threads.addElement(t); 211 mgr.getProfiler().getGraphProfiler().addThread(t, stage); 212 if (start) t.start(); 213 } 214 } 215 } 216 217 /** 218 * Remove threads from pool. 219 */ removeThreads(int num)220 void removeThreads(int num) { 221 System.err.print("TP <"+poolname+">: Removing "+num+" threads from pool, "); 222 synchronized (this) { 223 for (int i = 0; (i < num) && (numThreads() > minThreads); i++) { 224 Thread t = (Thread)threads.firstElement(); 225 stopThread(t); 226 } 227 } 228 System.err.println("size "+numThreads()); 229 } 230 231 /** 232 * Cause the given thread to stop execution. 233 */ stopThread(Thread t)234 void stopThread(Thread t) { 235 synchronized (this) { 236 threads.removeElement(t); 237 stoppedThreads.addElement(t); 238 } 239 System.err.println("TP <"+poolname+">: stopping thread, size "+numThreads()); 240 } 241 242 /** 243 * Return the number of threads in this pool. 244 */ numThreads()245 int numThreads() { 246 synchronized (this) { 247 return threads.size(); 248 } 249 } 250 251 /** 252 * Used by a thread to determine its queue block time. 253 */ getBlockTime()254 public long getBlockTime() { 255 return blockTime; 256 } 257 258 /** 259 * Used by a thread to request its aggregation target from the pool. 260 */ getAggregationTarget()261 public synchronized int getAggregationTarget() { 262 if (aggThrottle != null) { 263 return aggThrottle.getAggTarget(); 264 } else { 265 return maxAggregation; 266 } 267 } 268 269 /** 270 * Used by a thread to determine whether it should exit. 271 */ timeToStop(long idleTime)272 public boolean timeToStop(long idleTime) { 273 synchronized (this) { 274 if ((idleTime > idleTimeThreshold) && (numThreads() > minThreads)) { 275 stopThread(Thread.currentThread()); 276 } 277 if (stoppedThreads.contains(Thread.currentThread())) return true; 278 } 279 return false; 280 } 281 toString()282 public String toString() { 283 return "TP (size="+numThreads()+") for <"+poolname+">"; 284 } 285 getName()286 public String getName() { 287 return "ThreadPool <"+poolname+">"; 288 } 289 profileSize()290 public int profileSize() { 291 return numThreads(); 292 } 293 294 } 295 296